IT小栈

  • 主页
  • Java基础
  • RocketMQ
  • Kafka
  • Redis
  • Shiro
  • Spring
  • Spring Boot
  • Spring Cloud
  • 资料链接
  • 关于
所有文章 友链

IT小栈

  • 主页
  • Java基础
  • RocketMQ
  • Kafka
  • Redis
  • Shiro
  • Spring
  • Spring Boot
  • Spring Cloud
  • 资料链接
  • 关于

Kafka认证授权

2020-04-03

kafka作为一款优秀的消息中间件,安全必然是很重要的一个特性,很多应用场景下我们需要控制生产、消费端、乃至IP进行相关的控制,操作粒度达到主题级别的消息的相关的操作,本节我们将重点介绍kafka的认证和授权。

1、kafka认证

Kafka 认证机制

自 0.9.0.0 版本开始,Kafka 正式引入了认证机制,用于实现基础的安全用户认证,这是将 Kafka 上云或进行多租户管理的必要步骤。截止到当前最新的 2.3 版本,Kafka 支持基于 SSL 和基于 SASL 的安全认证机制。

基于 SSL 的认证主要是指 Broker 和客户端的双路认证(2-way authentication)。通常来说,SSL 加密(Encryption)已经启用了单向认证,即客户端认证 Broker 的证书(Certificate)。如果要做 SSL 认证,那么我们要启用双路认证,也就是说 Broker 也要认证客户端的证书。

对了,你可能会说,SSL 不是已经过时了吗?现在都叫 TLS(Transport Layer Security)了吧?但是,Kafka 的源码中依然是使用 SSL 而不是 TLS 来表示这类东西的。不过,今天出现的所有 SSL 字眼,你都可以认为它们是和 TLS 等价的。

Kafka 还支持通过 SASL 做客户端认证。SASL 是提供认证和数据安全服务的框架。Kafka 支持的 SASL 机制有 5 种,它们分别是在不同版本中被引入的,你需要根据你自己使用的 Kafka 版本,来选择该版本所支持的认证机制。

  1. GSSAPI:也就是 Kerberos 使用的安全接口,是在 0.9 版本中被引入的。
  2. PLAIN:是使用简单的用户名 / 密码认证的机制,在 0.10 版本中被引入。
  3. SCRAM:主要用于解决 PLAIN 机制安全问题的新机制,是在 0.10.2 版本中被引入的。
  4. OAUTHBEARER:是基于 OAuth 2 认证框架的新机制,在 2.0 版本中被引进。
  5. Token:补充现有 SASL 机制的轻量级认证机制,是在 1.1.0 版本被引入的。认证机制的比较

认证机制的比较

SASL/SCRAM认证机制支持动态增减认证用户,我们就以 SASL/SCRAM 的一个配置实例,来说明一下如何在 Kafka 集群中开启认证。其他认证机制的设置方法也是类似的,比如它们都涉及认证用户的创建、Broker 端以及 Client 端特定参数的配置等。

SASL/SCRAM认证

环境描述

主机/IP 部署服务
10.10.15.235 Kafka2.3、zookeeper3.5.5
10.10.15.236 Kafka2.3、zookeeper3.5.5
10.10.15.237 Kafka2.3、zookeeper3.5.5

1、创建用户

创建broker建通信用户(或称超级用户)

1
bin/kafka-configs.sh --zookeeper 10.10.15.235:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret],SCRAM-SHA-512=[password=admin-secret]' --entity-type users --entity-name admin

创建客户端用户gumx

1
bin/kafka-configs.sh --zookeeper 10.10.15.235:2181 --alter --add-config 'SCRAM-SHA-256=[password=123456],SCRAM-SHA-512=[password=123456]' --entity-type users --entity-name gumx

查看SCRAM证书

1
bin/kafka-configs.sh --zookeeper 10.10.15.235:2181 --describe --entity-type users --entity-name gumx

2、配置Kafka Brokers

创建 JAAS 文件

1
2
[root@localhost config]# cd /opt/kafka/config
[root@localhost config]# vi kafka_server_jaas.conf

文件内容

1
2
3
4
5
KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin-secret";
};

将JAAS配置文件位置作为JVM参数传递给每个Kafka broker:
修改 $KAFKA-HOME/bin/kafka-server-start.sh
将exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka “$@” 注释,最后一行, 增加下面的内容

1
2
#exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=$base_dir/../config/kafka_server_jaas.conf kafka.Kafka "$@"

配置 Broker 的 server.properties 文件

1
2
3
4
5
6
7
8
9
# 认证配置
listeners=SASL_PLAINTEXT://10.10.15.236:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
sasl.enabled.mechanisms=SCRAM-SHA-256
# ACL配置
allow.everyone.if.no.acl.found=false
super.users=User:admin
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

第 1 项设置 listeners 使用 SASL_PLAINTEXT,依然是不使用 SSL

第 2 项表示 Broker 间通信不配置 SSL,本例中我们不演示 SSL 的配置;

第 3 项的意思是为 Broker 间通信也开启 SCRAM 认证,同样使用 SHA-256 算法;

第 4 项内容表明开启 SCRAM 认证机制,并启用 SHA-256 算法;

3、重启 ZK/Broker

停止ZK/停止Broker

1
2
/usr/local/zookeeper/bin/zkServer.sh stop
bin/kafka-server-stop.sh

启动ZK/启动Broker

1
2
/usr/local/zookeeper/bin/zkServer.sh start
nohup bin/kafka-server-start.sh config/server.properties >kafka_start.log 2>&1 &

4、客户端配置

先使用kafka-console-producer 和 kafka-console-consumer 测试一下

kafka-console-producer

1、创建 config/client-sasl.properties 文件

1
2
3
[root@localhost kafka]# vim config/client-sasl.properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256

2、创建config/kafka_client_jaas_admin.conf文件

1
2
3
4
5
6
[root@localhost kafka]# vim config/kafka_client_jaas_admin.conf 
KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin-secret";
};

3、修改kafka-console-producer.sh脚本,这里我一份,再改

1
2
3
4
cp bin/kafka-console-producer.sh bin/kafka-console-producer-admin.sh
vim bin/kafka-console-producer-admin.sh
#exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=$(dirname $0)/../config/kafka_client_jaas_admin.conf kafka.tools.ConsoleProducer "$@"

4、创建测试topic

1
./kafka-topics.sh --create --zookeeper 10.10.15.236:2181 --replication-factor 1 --partitions 1 --topic test

5、测试生产消息

1
2
3
4
[root@localhost kafka]# bin/kafka-console-producer-admin.sh --broker-list 10.10.15.235:9092 --topic test --producer.config config/client-sasl.properties
>1
>2
>3

6、测试“gumx”用户

我们创建一个bin/kafka-console-producer-gumx.sh文件, 只是修改其中的kafka_client_jaas_admin.conf 为 kafka_client_jaas_gumx.conf

1
2
3
4
5
6
7
8
9
10
vim config/kafka_client_jaas_gumx.conf 
KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="gumx"
password="123456";
};

cp bin/kafka-console-producer-admin.sh bin/kafka-console-producer-gumx.sh
vi bin/kafka-console-producer-gumx.sh
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=$(dirname $0)/../config/kafka_client_jaas_gumx.conf kafka.tools.ConsoleProducer "$@"

生产消息

1
bin/kafka-console-producer-gumx.sh --broker-list 10.10.15.235:9092 --topic test --producer.config config/client-sasl.properties

可以看到报错了, 因为fanboshi用户还没有权限

kafka-console-consumer

1、创建 config/consumer-admin.properties 文件

1
2
3
4
vim config/consumer-admin.properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
group.id=admin-test-group

2、创建 bin/kafka-console-consumer-admin.sh 文件

1
2
3
4
cp bin/kafka-console-consumer.sh bin/kafka-console-consumer-admin.sh
vim bin/kafka-console-consumer-admin.sh
#exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=$(dirname $0)/../config/kafka_client_jaas_admin.conf kafka.tools.ConsoleConsumer "$@"

3、测试消费者

1
bin/kafka-console-consumer-admin.sh --bootstrap-server 10.10.15.235:9092 --topic test --consumer.config config/consumer-admin.properties --from-beginning

我们看到了从头开始消费,都消费下来了

4、测试“gumx”用户

我们创建一个config/consumer-gumx.properties文件和bin/kafka-console-consumer-gumx.sh文件

1
2
3
4
5
6
7
8
9
vim config/consumer-gumx.properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
group.id=gumx-test-group

cp bin/kafka-console-consumer.sh bin/kafka-console-consumer-gumx.sh
vim bin/kafka-console-consumer-gumx.sh
#exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=$(dirname $0)/../config/kafka_client_jaas_gumx.conf kafka.tools.ConsoleConsumer "$@"

5、测试“gumx”用户的消费者

1
bin/kafka-console-consumer-gumx.sh --bootstrap-server 10.10.15.235:9092 --topic test --consumer.config config/consumer-gumx.properties --from-beginning

我们可以看到没有权限。

2、kafka授权

Kafka授权机制

Kafka 的授权机制(Authorization)。所谓授权,一般是指对与信息安全或计算机安全相关的资源授予访问权限,特别是存取控制。

具体到权限模型,常见的有四种。

ACL:Access-Control List,访问控制列表。

RBAC:Role-Based Access Control,基于角色的权限控制。

ABAC:Attribute-Based Access Control,基于属性的权限控制。

PBAC:Policy-Based Access Control,基于策略的权限控制

ACL授权

当前,Kafka 提供了一个可插拔的授权实现机制。该机制会将你配置的所有 ACL 项保存在 ZooKeeper 下的 /kafka-acl 节点中。你可以通过 Kafka 自带的 kafka-acls 脚本动态地对 ACL 项进行增删改查,并让它立即生效。

1、如何开启 ACL

在 Kafka 中,开启 ACL 的方法特别简单,你只需要在 Broker 端的配置文件中增加一行设置即可,也就是在 server.properties 文件中配置下面这个参数值:

1
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

authorizer.class.name 参数指定了 ACL 授权机制的实现类。当前 Kafka 提供了 Authorizer 接口,允许你实现你自己的授权机制,但更常见的做法,还是直接使用 Kafka 自带的 SimpleAclAuthorizer 实现类。一旦设置好这个参数的值,并且启动 Broker 后,该 Broker 就默认开启了 ACL 授权验证。在实际生产环境中,你需要为集群中的每台 Broker 都做此设置。

2、超级用户设置

超级用户(Super User)在开启了 ACL 授权之后,你还必须显式地为不同用户设置访问某项资源的权限,否则,在默认情况下,没有配置任何 ACL 的资源是不能被访问的。不过,这里也有一个例外:超级用户能够访问所有的资源,即使你没有为它们设置任何 ACL 项。那么,我们如何在一个 Kafka 集群中设置超级用户呢?方法很简单,只需要在 Broker 端的配置文件 server.properties 中,设置 super.users 参数即可。

1
super.users=User:superuser1;User:superuser2

注意,如果你要一次性指定多个超级用户,那么分隔符是分号而不是逗号,这是为了避免出现用户名中包含逗号从而无法分割的问题。除了设置 super.users 参数,Kafka 还支持将所有用户都配置成超级用户的用法。如果我们在 server.properties 文件中设置 allow.everyone.if.no.acl.found=true,那么所有用户都可以访问没有设置任何 ACL 的资源。不过,我不太建议进行这样的设置。毕竟,在生产环境中,特别是在那些对安全有较高要求的环境中,采用白名单机制要比黑名单机制更加令人放心。

3、kafka-acls 脚本

在了解了 Kafka 的 ACL 概念之后,我们来看一下如何设置它们。当前在 Kafka 中,配置授权的方法是通过 kafka-acls 脚本。举个例子,如果我们要为用户 Alice 增加了集群级别的所有权限,那么我们可以使用下面这段命令。

1
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=10.10.15.235:2181 --add --allow-principal User:gumx --operation All --topic '*' --cluster

这段代码的含义就是给gumx用户所有主题的所有操作的是集群权限

另一个常见用法

1
bin/kafka-acls --authorizer-properties zookeeper.connect=10.10.15.235:2181 --add --allow-principal User:'*' --allow-host '*' --deny-principal User:BadUser --deny-host 10.205.96.119 --operation Read --topic test

User 后面的星号表示所有用户,allow-host 后面的星号则表示所有 IP 地址。这个命令的意思是,允许所有的用户使用任意的 IP 地址读取名为 test 的主题数据,同时也禁止 BadUser 用户和 10.205.96.119 的 IP 地址访问 test 下的消息

4、ACL 权限列表

Kafka 当前提供的授权机制是非常细粒度的。现在,我来跟你分享一下这个表格的使用方法。

举个例子,假如你要为你的生产者程序赋予写权限,那么首先,你要在 Resource 列找到 Topic 类型的权限,然后在 Operation 列寻找 WRITE 操作权限。这个 WRITE 权限是限制 Producer 程序能否向对应主题发送消息的关键。通常情况下,Producer 程序还可能有创建主题、获取主题数据的权限,所以 Kafka 为 Producer 需要的这些常见权限创建了快捷方式,即 –producer。也就是说,在执行 kafka-acls 命令时,直接指定 –producer 就能同时获得这三个权限了。 –consumer 也是类似的,指定 –consumer 可以同时获得 Consumer 端应用所需的权限。

5、授权

我们根据在认证中使用的用户“gumx”授予权限

授予gumx用户对test topic 写权限, 只允许 10.10.15.* 网段

1
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=10.10.15.235:2181 --add --allow-principal User:gumx --operation Write --topic test --allow-host 10.10.15.*

授予gumx用户对test topic 读权限, 只允许 10.10.15.* 网段

1
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=10.10.15.235:2181 --add --allow-principal User:gumx --operation Read --topic test --allow-host 10.10.15.*

授予gumx用户, gumx-test-group 消费者组 对test topic 读权限, 只允许 10.10.15.* 网段

1
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=10.10.15.235:2181 --add --allow-principal User:gumx --operation Read --group gumx-test-group --allow-host 10.10.15.*

查看acl配置

1
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=10.10.15.235:2181 --list

删除配置

1
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=10.10.15.235:2181 --remove --allow-principal User:gumx --operation Read --topic test --allow-host 10.10.15.*

6、测试

生产者

1
bin/kafka-console-producer-gumx.sh --broker-list 10.10.15.235:9092 --topic test --producer.config config/client-sasl.properties

消费者

1
bin/kafka-console-consumer-gumx.sh --bootstrap-server 10.10.15.235:9092 --topic test --consumer.config config/consumer-gumx.properties --from-beginning

3、代码实现

我们先看看gumx这个用户所有什么权限

生产者producer

根据上面配置的“gumx”用户我们使用代码实现,生产消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class ACLProducerTest {

static {
System.setProperty("java.security.auth.login.config", "D://JCZ//workspace_zxgk_master//kafka_test//src//main//resources//producer_jaas.conf");
}

public static void main(String[] args) {
final Properties props = new Properties();
props.put("bootstrap.servers","10.10.15.235:9092;10.10.15.236:9092;10.10.15.237:9092");
props.put(ProducerConfig.ACKS_CONFIG,"all");
props.put(ProducerConfig.RETRIES_CONFIG,3);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "SCRAM-SHA-256");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>("test", "hello");
Future<RecordMetadata> future = producer.send(record);
try {
RecordMetadata recordMetadata = future.get();
System.out.println(recordMetadata.offset()+"=="+recordMetadata.topic());
} catch (Exception e) {
e.printStackTrace();
}
producer.flush();
producer.close();
}
}

引用的配置文件

1
2
3
4
5
KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="gumx"
password="123456";
};

如果我们改变主题为“gumx”,因为这个主题我们没有授权就会报错

消费者consumer

注意使用的消费组需要进行授权

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class ACLConsumerTest {

static {
System.setProperty("java.security.auth.login.config", "D://JCZ//workspace_zxgk_master//kafka_test//src//main//resources//consumer_jaas.conf");
}

public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.10.15.235:9092;10.10.15.236:9092;10.10.15.237:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "gumx-test-group");

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "SCRAM-SHA-256");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singleton("test"));
while (true) {
ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(2000));
records.forEach(record ->{
System.out.println(record.key()+"========="+record.value()+"========"+record.offset()+"========"+record.partition());
});
}
}
}

引用的配置文件

1
2
3
4
5
KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="gumx"
password="123456";
};

更换消费组为data_record,就没有权限

本文作者: 顾 明 训
本文链接: https://www.itzones.cn/2020/04/03/Kafka认证授权/
版权声明: 本作品采用 知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议 进行许可。转载请注明出处!
  • kafka SASL/SCRAM
  • kafka ACL
  • kafka

扫一扫,分享到微信

微信分享二维码
Redis哨兵模式安装部署
Kafka无消息丢失配置
  1. 1. 1、kafka认证
    1. 1.1. Kafka 认证机制
    2. 1.2. 认证机制的比较
    3. 1.3. SASL/SCRAM认证
      1. 1.3.1. 1、创建用户
      2. 1.3.2. 2、配置Kafka Brokers
      3. 1.3.3. 3、重启 ZK/Broker
      4. 1.3.4. 4、客户端配置
        1. 1.3.4.1. kafka-console-producer
        2. 1.3.4.2. kafka-console-consumer
  2. 2. 2、kafka授权
    1. 2.1. Kafka授权机制
    2. 2.2. ACL授权
      1. 2.2.1. 1、如何开启 ACL
      2. 2.2.2. 2、超级用户设置
      3. 2.2.3. 3、kafka-acls 脚本
      4. 2.2.4. 4、ACL 权限列表
      5. 2.2.5. 5、授权
      6. 2.2.6. 6、测试
  3. 3. 3、代码实现
    1. 3.1. 生产者producer
    2. 3.2. 消费者consumer
© 2020 IT小栈
载入天数...载入时分秒... || 本站总访问量次 || 本站访客数人次
Hexo Theme Yilia by Litten
  • 所有文章
  • 友链

tag:

  • jvm
  • Java基础
  • kafka HW
  • kafka Leader Epoch
  • kafka
  • kafka位移主题
  • kafka位移提交
  • kafka副本机制
  • kafka ISR
  • zookeeper
  • kafka消息丢失
  • kafka日志存储
  • kafka Log Clean
  • kafka Log Compaction
  • kafka消费位移设置
  • kafka Rebalance
  • kafka分区算法
  • kafka生产者拦截器
  • kafka SASL/SCRAM
  • kafka ACL
  • redis
  • redis Ziplist
  • redis Hashtable
  • redis LinkedList
  • redis QuickList
  • redis intset
  • redis String
  • redis SDS
  • redis SkipList
  • redisDb
  • redisServer
  • redis 简介
  • Redis Cluster
  • 主从同步
  • RocketMQ高可用HA
  • 事务消息
  • 内存映射
  • MMAP
  • 同步刷盘
  • 异步刷盘
  • 消息存储文件
  • RocketMQ安装
  • 延迟消息
  • RocketMQ入门
  • 推拉模式
  • PushConsumer
  • 消费结果处理
  • rebalance
  • RocketMQ权限控制
  • RocketMQ ACL
  • 消息过滤
  • 消息重试
  • 消费位置
  • 集群消费
  • 广播消费
  • 运维命令
  • shiro源码分析
  • shiro入门
  • IOC和DI
  • Spring创建Bean
  • Bean生命周期
  • Sping属性注入
  • 异常
  • SpringMVC
  • springCloud
  • Eureka

    缺失模块。
    1、请确保node版本大于6.2
    2、在博客根目录(注意不是yilia根目录)执行以下命令:
    npm i hexo-generator-json-content --save

    3、在根目录_config.yml里添加配置:

      jsonContent:
        meta: false
        pages: false
        posts:
          title: true
          date: true
          path: true
          text: false
          raw: false
          content: false
          slug: false
          updated: false
          comments: false
          link: false
          permalink: false
          excerpt: false
          categories: false
          tags: true
    

  • 我的OSCHINA
  • 我的CSDN
  • 我的GITHUB
  • 一生太水