摘要
简单记录使用Kafka的KRaft模式来配置PLAIN/SASL_PLAINTEXT的认证模式。
正文
需要有KRaft相关的基础,才行。可参阅之前学习记录Kafka
一、PLAIN/SASL_PLAINTEXT认证 Kafka 使用 Java Authentication and Authorization Service (JAAS ) 来配置SASL。
首先需要了解SASL的含义,SASL全称为Simple Authentication and Security Layer,它主要是用于在客户端和服务器之间提供安全的身份验证机制。
Kafka支持不同的通信协议,如下
安全协议 数据加密 认证机制 适用场景 PLAINTEXT —— —— 内网环境、安全性有保障 SSL/TLS SSL/TLS —— 在公网传输敏感数据 SASL_PLAINTEXT —— SASL 需要身份验证但不需要数据加密的环境 SASL_SSL SSL/TLS SASL 安全性要求极高的环境
Kafka 的 SASL 认证机制如下
其中,PLAIN相对来说更简单,本文就记录PLAIN/SASL_PLAINTEXT的配置与使用。
1.1 单机配置 安装的过程参考一键安装的脚本 ,安装好Kafka后,按照如下操作进行配置
1.) 在config/kraft目录下创建kafka_server_jaas.conf
1
2
3
4
5
6
7
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret"
user_alice="alice-secret";
};
user_admin="admin-secret"表示用户名admin,对应的密码为admin-secret。user_alice同理。
username和password表示节点建立集群时,需要验证的身份信息,只有验证通过的节点,方能成功建立集群。
2.)进入到kafka的根路径下, 复制一份kafka-server-start.sh出来。
1
cp ./bin/kafka-server-start.sh ./bin/kafka-server-start-jaas.sh
并修改复制出来的文件,添加三行。
1
2
3
4
# 若不存在KAFKA_OPTS环境变量,则设置一个
if [ "x $KAFKA_OPTS " = "x" ] ; then
export KAFKA_OPTS = "-Djava.security.auth.login.config= $base_dir /../config/kraft/kafka_server_jaas.conf"
fi
此处如果想要自定义日志配置文件的话,可以添加环境变量KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:/config/log4j.properties",也可以直接在环境变量KAFKA_OPTS追加-Dlog4j.configuration
展开
3.) 将config/kraft/server.properties再复制一份。
1
cp ./config/kraft/server.properties ./config/kraft/server.jaas.properties
并修改复制出来的文件,修改参数如下。其中advertised.listeners中的ip要改为实际使用的ip地址。
1
2
3
4
5
6
7
8
9
10
11
# 控制客户端与服务端通信时的认证机制,默认为GSSAPI
sasl.enabled.mechanisms = PLAIN
# 控制Kafka broker内部服务通信时的认证机制,默认为GSSAPI
sasl.mechanism.inter.broker.protocol = PLAIN
# 控制Kafka controller内部通信时的认证机制,默认为GSSAPI
sasl.mechanism.controller.protocol = PLAIN
listeners = BROKER://:9092,CONTROLLER://:9093
inter.broker.listener.name = BROKER
advertised.listeners = BROKER://10.0.0.10:9092
listener.security.protocol.map = BROKER:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
其中,listener.security.protocol.map的格式为{监听器名称}:{使用的安全协议}。
4.) 启动
进入kafka根目录,格式化集群信息,并启动
1
2
./bin/kafka-storage.sh format -t T1CYXg2DQPmdSYSUI-FNFw -c ./config/kraft/server.jaas.properties
./bin/kafka-server-start-jaas.sh ./config/kraft/server.jaas.properties
展开
1.2 集群配置 准备三台机器
10.0.0.101 10.0.0.102 10.0.0.102 按照单机Kafka的配置步骤进行配置,但是在第三步时,略有调整。
10.0.0.101的示例配置文件如下。每个节点中,需要单独修改三个参数,分别为node.id、controller.quorum.voters、advertised.listeners。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
process.roles = broker,controller
node.id = 1
controller.quorum.voters = 1@10.0.0.101:9093,2@10.0.0.102:9093,3@10.0.0.103:9093
listeners = BROKER://:9092,CONTROLLER://:9093
# 控制客户端与服务端通信时的认证机制,默认为GSSAPI
sasl.enabled.mechanisms = PLAIN
# 控制Kafka broker内部服务通信时的认证机制,默认为GSSAPI
sasl.mechanism.inter.broker.protocol = PLAIN
# 控制Kafka controller内部通信时的认证机制,默认为GSSAPI
sasl.mechanism.controller.protocol = PLAIN
inter.broker.listener.name = BROKER
advertised.listeners = BROKER://10.0.0.101:9092
controller.listener.names = CONTROLLER
listener.security.protocol.map = BROKER:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
其中node.id表示节点的编号,controller.quorum.voters表示集群中的所有具有controller角色的节点,其配置格式为{node.id}@{ip}:{port}。
在三节点上均执行如下命令
1
2
./bin/kafka-storage.sh format -t T1CYXg2DQPmdSYSUI-FNFw -c ./config/kraft/server.jaas.properties
./bin/kafka-server-start-jaas.sh ./config/kraft/server.jaas.properties
启动后日志如图。
展开
1.3 客户端接入认证 1.) 访问无授权的Kafka服务
1
./kafka-consumer-groups.sh --bootstrap-server 10.0.0.101:9092 --describe --all-groups
2.) 访问开启授权的Kafka服务,则需要创建存储身份认证的文件kafka.properties
1
2
3
security.protocol = SASL_PLAINTEXT
sasl.mechanism = PLAIN
sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username="alice" password="alice-secret";
通过代码接入时,也是在properties里面添加该参数即可。
之后再执行命令
1
./kafka-consumer-groups.sh --bootstrap-server 10.0.0.101:9092 --describe --all-groups --command-config ./kafka.properties
1.4 集群为何是奇数 常见的分布式一致性算法
Paxos Raft KRaft全称为Kafka Raft,是基于Raft算法实现的分布式副本管理协议。
Raft的多数派原则 ,规定了集群的建立无需全员存活,只要存活多数(n个节点,存活节点数>n/2)即可视为集群为正常状态。
这个规定解决了集群中出现的脑裂问题。
比如4台节点组成的集群,其中2台为上海机房,另外2台为北京机房,当上海与北京的网络出现故障,那么一个集群就会分裂为2个集群,这就是脑裂现象。
由于这个规定,导致集群中n台和n+1台节点,他们的容灾能力是一样的(n为奇数),都只能坏一台。使用奇数个节点反而能节省资源。
1.5 开启授权之后连接变慢 Kafka开启了PLAIN/SASL_PLAINTEXT授权之后,通过Kafka的IP连入,KafkaConsumer认证时变得巨慢,将Consumer日志等级设置成TRACE,观察日志输出。发现中间有段耗时特别长。
展开
直接断点跟入org.apache.kafka.clients.NetworkClient#initiateConnect
展开
根源是org.apache.kafka.common.network.SaslChannelBuilder#buildChannel内部为了记录日志,通过InetAddress的getHostName()获取主机名,导致经过了DNS服务,内网DNS服务又存在问题,因此出现慢的情况。
解决办法是在内网机器上直接配置对应hosts即可 。
参考
二、ACL授权 ACL (Access Control List)用于控制“谁能访问哪些资源”,细化到 topic、consumer group、cluster 等资源类型。
ACL 授权既可以在开启认证之后使用,也可以在明文访问时使用 。如果在明文访问时使用,客户端连入的用户名为ANONYMOUS
2.1 配置 下面使用PLAIN/SASL_PLAINTEXT进行认证、ACL进行授权。
将上面的config/kraft/server.jaas.properties复制一份,命名为server.acl.properties,根据上文PLAIN/SASL_PLAINTEXT的单机配置,添加或者修改如下内容
1
2
3
4
5
6
7
8
9
# 使用Kraft 默认授权器 StandardAuthorizer
authorizer.class.name = org.apache.kafka.metadata.authorizer.StandardAuthorizer
# 配置超级管理员为admin和Admin
super.users = User:admin;User:Admin
# true表示如果对应请求的ACL规则,则允许所有用户访问该资源。这相当于默认开放访问权限
allow.everyone.if.no.acl.found = false
# 将controller修改为SASL_PLAINTEXT
listener.security.protocol.map = BROKER:SASL_PLAINTEXT,CONTROLLER:SASL_PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
切记一定要将CONTROLLER对应的协议类型修改为SASL_PLAINTEXT,否则broker与controller之间通信会使用用户ANONYMOUSE,都开启了SASL_PLAINTEXT之后,才会使用配置文件kafka_server_jaas.conf里面配置的username和password
2.2 管理命令 命令格式
1
kafka-acls.sh [ 通用参数] [ 操作类型] [ 资源类型参数] [ 权限配置参数]
通用参数(连接 Kafka)
参数 说明 示例 --bootstrap-server指定 Kafka 地址 --bootstrap-server localhost:9092--command-config客户端配置文件 --command-config /etc/kafka/client.properties
操作类型参数(操作 ACL)
参数 说明 示例 --add添加 ACL --add--remove删除 ACL --remove--list列出现有 ACL --list
注意:三选一,每次只能执行一种操作。
资源类型参数(指定作用对象)
参数 说明 示例 --topic <name>指定 Topic 名称 --topic my-topic--group <id>指定消费组 ID --group my-group--cluster指向 Kafka 集群本身 --cluster--transactional-id <id>指定事务 ID --transactional-id my-tx
权限配置参数(授权规则)
参数 说明 示例 --allow-principal <User:user>允许指定用户 --allow-principal User:alice--deny-principal <User:user>拒绝指定用户 --deny-principal User:bob--allow-host <host>允许访问的主机(默认 *) --allow-host 192.168.0.1--deny-host <host>拒绝访问的主机 --deny-host 10.0.0.5--operation <op>指定操作类型。Read、Write、Create、Delete、Alter、Describe、ClusterAction、DescribeConfigs、AlterConfigs、IdempotentWrite、CreateTokens、DescribeTokens、All --operation Read
示例
创建客户端配置文件config/kraft/client.properties
1
2
3
security.protocol = SASL_PLAINTEXT
sasl.mechanism = PLAIN
sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret";
执行命令添加授权
1
2
bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:alice --allow-principal User:A
lice --operation Read --operation Write --topic meethigher --command-config config/kraft/client.properties
展开
三、监控 3.1 监控组件比较 常见的Kafka监控如下
CMAK (previously known as Kafka Manager) kafdrop 优点:a. 轻量,开箱即用 b. 支持KRaft 缺点:a. 实时性监控并不好 b. 交互做得并不好,当节点开启身份验证时,会存在严重卡顿情况 3.2 kafdrop使用 首先下载Release 4.0.1 · obsidiandynamics/kafdrop
执行命令,启动监控
1
java - jar kafdrop - 4 . 0 . 1 . jar -- kafka . brokerConnect = 10 . 0 . 0 . 101 : 9092 , 10 . 0 . 0 . 102 : 9092 , 10 . 0 . 0 . 103 : 9092
kafdrop本身是一个springboot项目,对于javaer来说是很有好的。其他一些复杂的配置,可以直接下载源码查看application.yml
1
2
3
4
5
6
7
8
# 省略以上若干配置。详细内容可自行查看
kafka :
brokerConnect : localhost:9092
saslMechanism : "PLAIN"
securityProtocol : "SASL_PLAINTEXT"
truststoreFile : "${KAFKA_TRUSTSTORE_FILE:kafka.truststore.jks}"
propertiesFile : "${KAFKA_PROPERTIES_FILE:kafka.properties}"
keystoreFile : "${KAFKA_KEYSTORE_FILE:kafka.keystore.jks}"
"${KAFKA_PROPERTIES_FILE:kafka.properties}"
表示获取环境变量KAFKA_PROPERTIES_FILE,如果存在则使用环境变量值,否则使用默认值kafka.properties
比如我开启了授权,那么需要再kafdrop的同级目录下创建kafka.properties,然后再次启动。
1
2
3
security.protocol : SASL_PLAINTEXT
sasl.mechanism : PLAIN
sasl.jaas.config : org.apache.kafka.common.security.plain.PlainLoginModule required username="alice" password="alice-secret";
最终kafdrop监控效果如图。
展开
四、使用 源码参考kafka-cluster-demo/spring-kafka-demo at master · meethigher/kafka-cluster-demo
4.1 KafkaClient 添加依赖
1
2
3
4
5
<dependency>
<groupId> org.apache.kafka</groupId>
<artifactId> kafka-clients</artifactId>
<version> 3.3.1</version>
</dependency>
4.1.1 生产者(轮询策略bug) 生产者示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import org.apache.kafka.clients.producer.* ;
import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;
import java.util.Properties ;
public class Producer {
private static Logger log = LoggerFactory . getLogger ( Producer . class );
public static void main ( String [] args ) throws Exception {
// 配置生产者属性
Properties props = new Properties ();
props . put ( ProducerConfig . BOOTSTRAP_SERVERS_CONFIG , "10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092" );
props . put ( ProducerConfig . KEY_SERIALIZER_CLASS_CONFIG , "org.apache.kafka.common.serialization.StringSerializer" );
props . put ( ProducerConfig . VALUE_SERIALIZER_CLASS_CONFIG , "org.apache.kafka.common.serialization.StringSerializer" );
// 配置 SASL_PLAINTEXT 认证
props . put ( "security.protocol" , "SASL_PLAINTEXT" );
props . put ( "sasl.mechanism" , "PLAIN" );
props . put ( "sasl.jaas.config" , "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"alice\" password=\"alice-secret\";" );
// 创建生产者实例
KafkaProducer < String , String > producer = new KafkaProducer <> ( props );
// 发送消息到指定主题
String topic = "meethigher" ;
String key = "timestamp" ;
while ( true ) {
// 发送消息
String value = String . valueOf ( System . currentTimeMillis ());
ProducerRecord < String , String > record = new ProducerRecord <> ( topic , key , value );
producer . send ( record , ( metadata , exception ) -> log . info ( "sent key={},value={} to topic={},partition={}" , record . key (), record . value (), metadata . topic (), metadata . partition ()));
System . in . read ();
}
// 关闭生产者
// producer.close();
}
}
生产者的分区轮询策略有BUG,会导致发送一条消息时,触发两次计算分区。因此如果想要实现在分区间均衡发送消息,则需要重写官方提供的轮询策略。具体内容在我的源码里有记录。参考
4.1.2 消费者 消费者示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import org.apache.kafka.clients.consumer.ConsumerConfig ;
import org.apache.kafka.clients.consumer.ConsumerRecords ;
import org.apache.kafka.clients.consumer.KafkaConsumer ;
import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;
import java.time.Duration ;
import java.util.Collections ;
import java.util.Properties ;
public class Consumer {
private static final Logger log = LoggerFactory . getLogger ( Consumer . class );
public static void main ( String [] args ) {
// 配置消费者属性
Properties props = new Properties ();
props . put ( ConsumerConfig . BOOTSTRAP_SERVERS_CONFIG , "10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092" );
props . put ( ConsumerConfig . GROUP_ID_CONFIG , "your_consumer_group" );
props . put ( ConsumerConfig . KEY_DESERIALIZER_CLASS_CONFIG , "org.apache.kafka.common.serialization.StringDeserializer" );
props . put ( ConsumerConfig . VALUE_DESERIALIZER_CLASS_CONFIG , "org.apache.kafka.common.serialization.StringDeserializer" );
// 配置 SASL_PLAINTEXT 认证
props . put ( "security.protocol" , "SASL_PLAINTEXT" );
props . put ( "sasl.mechanism" , "PLAIN" );
props . put ( "sasl.jaas.config" , "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"alice\" password=\"alice-secret\";" );
// 创建消费者实例
KafkaConsumer < String , String > consumer = new KafkaConsumer <> ( props );
// 订阅主题
String topic = "meethigher" ;
consumer . subscribe ( Collections . singletonList ( topic ));
// 消费消息
while ( true ) {
ConsumerRecords < String , String > records = consumer . poll ( Duration . ofMillis ( 100 ));
records . forEach ( record -> {
log . info ( "Consumed record with key {} and value {}" , record . key (), record . value ());
});
}
}
}
4.2 SpringKafka 4.2.1 使用 创建springboot项目,添加依赖
1
2
3
4
<dependency>
<groupId> org.springframework.kafka</groupId>
<artifactId> spring-kafka</artifactId>
</dependency>
生产者application.yml配置
1
2
3
4
5
6
7
8
9
10
11
12
spring :
kafka :
# bootstrap-servers: 10.0.0.101:9092
bootstrap-servers : 10.0.0.101 : 9092 , 10.0.0.102 : 9092 , 10.0.0.103 : 9092
# 接入授权
properties :
security.protocol : SASL_PLAINTEXT
sasl.mechanism : PLAIN
sasl.jaas.config : org.apache.kafka.common.security.plain.PlainLoginModule required username="alice" password="alice-secret";
producer :
key-serializer : org.apache.kafka.common.serialization.StringSerializer
value-serializer : org.apache.kafka.common.serialization.StringSerializer
消费者application.yml配置
1
2
3
4
5
6
7
8
9
10
11
12
13
spring :
kafka :
# bootstrap-servers: 10.0.0.10:9092
bootstrap-servers : 10.0.0.101 : 9092 , 10.0.0.102 : 9092 , 10.0.0.103 : 9092
# 接入授权
properties :
security.protocol : SASL_PLAINTEXT
sasl.mechanism : PLAIN
sasl.jaas.config : org.apache.kafka.common.security.plain.PlainLoginModule required username="alice" password="alice-secret";
consumer :
key-deserializer : org.apache.kafka.common.serialization.StringDeserializer
value-deserializer : org.apache.kafka.common.serialization.StringDeserializer
group-id : group # 默认的分组名,消费者都会有所属组
4.2.2 重试机制 关于SpringKafka的重试机制,未进行深入探索,只参考了文章Spring Kafka:Retry Topic、DLT 的使用与原理 - 知乎 。
感觉这篇文章讲得很透彻了。按照该文章的说明,验证了以下两种重试策略
默认重试策略 自定义重试策略 默认重试策略 快速重试10次,无间隔时间,如果最后还是失败,则自动commit。
1
2
3
4
@KafkaListener ( topics = "test-retry" )
public void test ( ConsumerRecord <? , ?> record , Consumer <? , ?> consumer ) {
int i = 1 / 0 ;
}
不需要额外配置,这是默认的重试策略。
自定义重试策略 Spring单独提供了RetryableTopic注解,及重试后的回调注解DltHandler。底层逻辑是新建了topic对这些失败的数据进行存储,以及监听这些新建的topic再进行消费,细节的话还是参考文章Spring Kafka:Retry Topic、DLT 的使用与原理 - 知乎 。
1
2
3
4
5
6
7
8
9
10
11
12
@KafkaListener ( topics = "test-retry" )
@org.springframework.kafka.annotation.RetryableTopic ( attempts = "5" , backoff = @org.springframework.retry.annotation.Backoff ( delay = 5000 , maxDelay = 10000 , multiplier = 2 ))
public void test ( ConsumerRecord <? , ?> record , Consumer <? , ?> consumer ) {
int i = 1 / 0 ;
}
@DltHandler
public void listenDlt ( String in , @Header ( KafkaHeaders . RECEIVED_TOPIC ) String topic ,
@Header ( KafkaHeaders . OFFSET ) long offset ) {
log . error ( "DLT Received: {} from {} @ {}" , in , topic , offset );
}
更多的案例可以参考spring-kafka/samples at main · spring-projects/spring-kafka
五、参考致谢 Apache Kafka
spring-projects/spring-kafka: Provides Familiar Spring Abstractions for Apache Kafka
Listeners in KAFKA
kafka+Kraft模式集群+安全认证_kafka认证机制_鸢尾の的博客-CSDN博客
我的 Kafka 旅程 - SASL+ACL 认证授权 · 配置 · 创建账号 · 用户授权 · 应用接入 - Sol·wang - 博客园
从k8s集群主节点数量为什么是奇数来聊聊分布式系统 - 知乎
万字长文解析raft算法原理 - 知乎
Spring Kafka:Retry Topic、DLT 的使用与原理 - 知乎