言成言成啊 | Kit Chen's Blog

Kafka

发布于2021-05-03 17:35:02,更新于2023-11-17 20:10:14,标签:java kafka  文章会持续修订,转载请注明来源地址:https://meethigher.top/blog

老规矩,学东西先学文档

一、Kafka概述

1.1 定义

kafka是一个分布式的基于发布/订阅模式消息队列Message Queue,主要应用于大数据实时处理领域。

1.2 消息队列

MQ的传统应用场景的异步处理

使用消息队列的好处

  1. 解耦:允许你独立的扩展或者修改两边的处理过程,只要确保它们遵守同样的接口约束
  2. 可恢复性:系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可在系统恢复后被处理
  3. 缓冲:有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息处理速度不一致的情况。
  4. 灵活性&峰值处理能力:突发流量平时不常见,如果用硬件来解决,会浪费大量资源。使用消息队列能使关键组件顶住突发的访问压力,而不会因为突发的超负荷请求而完全崩溃。
  5. 异步通信:消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理。想向队列中放入多少消息就放多少,然后在需要的时候再处理它。

消息队列模式

  1. 点对点模式(一对一,消息消费后,消息被清除,不可重复消费)
    • 生产者生产消息发送到queue中,然后消费者从queue中取出并且消费信息。
    • 消息被消费以后,queue中不再存储,所以消费者不可能消费到已经被消费的消息
    • queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费
  2. 发布订阅模式(一对多,消费者消费数据后,不会清除消息,可以重复消费)
    • 生产者生产消息到topic中,同时有多个消费者消费该消息。类似于一个发布/订阅,比如微信公众号的推送

发布订阅模式有两种类型

  1. 消费者主动拉取数据
    • 缺点:消费者端需要维护一个长轮询,一直向生产者请求是否有最新数据。
  2. 消息队列推送数据
    • 缺点:资源浪费

1.3 kafka架构

整个架构可以参照下面这张图

仔细看下面这个图,工作中我对kafka集群不了解,看博客也没仔细看,其实仔细看看就会了。就看下面这张图!

让所有的Kafka是一个集群,只需要使用同一套ZK即可,即可以是一个zk,也可以是zk集群,不过都需要在kafka配置文件里面配置。

Producer :消息生产者,就是向 kafka broker 发消息的客户端;

Consumer :消息消费者,向 kafka broker 取消息的客户端;

Consumer Group(CG):消费者组,由多个consumer 组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。

Broker :一台 kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker 可以容纳多个topic。

Topic :可以理解为一个队列,生产者和消费者面向的都是一个 topic;

Partition:为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上, 一个 topic 可以分为多个 partition,每个partition 是一个有序的队列;

Replica:副本,为保证集群中的某个节点发生故障时,该节点上的 partition 数据不丢失且 kafka 仍然能够继续工作,kafka 提供了副本机制,一个 topic 的每个分区都有若干个副本, 一个 leader 和若干个 follower。

leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是 leader。

follower:每个分区多个副本中的“从”,实时从 leader 中同步数据,保持和 leader 数据的同步。leader 发生故障时,某个 follower 会成为新的 follower。

二、Kafka快速入门

2.1 使用ZK常规安装

centos8安装kafka

首先进入官网下载

我使用的是kafka_2.12-2.8.0,2.12指的是Scala的版本,2.8.0才是kafka的版本。

安装kafka

进入服务器的/opt/software目录下,执行命令进行下载

1
wget https://downloads.apache.org/kafka/2.8.0/kafka_2.12-2.8.0.tgz

如果wget不存在的话,那么就安装wget,-y表示yes or no时,选择yes

1
sudo yum -y install wget

解压到/opt/module下面

1
tar -zxvf kafka_2.12-2.8.0.tgz -C /opt/module

进入module下面,重命名文件夹

1
mv kafka_2.12-2.8.0.tgz kafka-2.12-2.8.0

将该kafka的文件夹分发到该集群下的别的机器上,scp详细教程参考

1
scp -r kafka-2.12-2.8.0 root@hadoop102:/opt/module

修改kafka配置文件server.properties中的brokerId

1
2
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1

配置kafka环境变量,参考配置环境变量的三种方式

1
2
3
4
5
6
7
cd /etc/profile.d #第一步
vim kafka_env.sh #第二步
#第三步 下面是添加到kafka_env.sh里面的内容
KAFKA_HOME=/opt/module/kafka-2.12-2.8.0
PATH=$PATH:$KAFKA_HOME
export KAFKA_HOME
source /etc/profile #第四步 刷新环境变量

安装zookeeper

下载安装Zookeeper,注意要与kafka中的libs下的zookeeper的jar包版本一致。

1
wget https://downloads.apache.org/zookeeper/zookeeper-3.5.9/apache-zookeeper-3.5.9-bin.tar.gz

解压操作类似于安装kafka、配置环境变量也一样。

进入zk的conf下,复制一份配置文件

1
cp zoo_sample.cfg zoo.cfg

启动zk

1
zkServer.sh start

配置Kafka集群

配置文件详解

配置kafka集群,最关键的还是broker.id、zookeeper.connect

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# 节点的id,每个节点的id必须是唯一的
broker.id=1

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://192.168.10.102:9092

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=192.168.10.101:2181,192.168.10.102:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000


############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0

2.2 使用Kraft

配置单节点

帅呆了!Kafka移除了Zookeeper! - 知乎

启动kafka2.8报afka.common.KafkaException: No meta.properties found in /tmp/kraft-combined-logs_wobenqinren的博客-CSDN博客

kafka官网没有对不使用zk的安装做说明,明明支持kraft,但还是用的zk。不知道是因为不稳定不推荐还是维护文档的人给忘记更新了。

进入kafka根目录

1
2
3
./bin/kafka-storage.sh random-uuid
./bin/kafka-storage.sh format -t TBYU7WMiREexuZqrjKG60g -c ./config/kraft/server.properties
./bin/kafka-server-start.sh ./config/kraft/server.properties

注意

这个uuid可以不用生成,直接自己随便填一个也可以。这个uuid主要用来做kafka集群(或单节点)的cluster_id

这样能启动成功了,但是还是不能远程访问,修改kraft的配置文件,将localhost都改为你的ip地址。

再次启动即可。

简单写一个后台启动kafka的shell脚本

1
2
3
4
rand=`./bin/kafka-storage.sh random-uuid`
echo $rand
./bin/kafka-storage.sh format -t $rand -c ./config/kraft/server.properties
nohup ./bin/kafka-server-start.sh ./config/kraft/server.properties >/dev/null 2>&1 &

配置kafka集群

Kafka3.0集群搭建(Kraft模式)-尝鲜 - 知乎

Kafka高可用 — KRaft集群搭建_WonderThink的博客-CSDN博客_kafka kraft集群

推荐一套三个节点下的kraft集群配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# 此配置文件用于KRaft模式,其中Apache ZooKeeper不存在。
# 此配置适用于Scala 2.13 - kafka_2.13-3.3.1
# https://meethigher.top

############################# Server配置 #############################

# 该服务的角色(对应kraft中角色),controller表示拥有选举功能,对应下方voters
process.roles=broker,controller

# 节点编号(表示broker与controller的节点编号)
node.id=10

# 进行选举领导人(controller)的节点
# 单节点
# controller.quorum.voters=10@10.0.0.10:9093
# 集群
controller.quorum.voters=10@10.0.0.10:9093,11@10.0.0.11:9093,12@10.0.0.12:9093

############################# Socket配置 #############################

# 监听地址
# 组合的节点 (比如 `process.roles=broker,controller`) 至少要列出 controller listener
# 如果 broker listener 未指定, 默认的host_name是 java.net.InetAddress.getCanonicalHostName(), listener_name是 PLAINTEXT, port是 9092
# 格式如下:
# listeners = listener_name://host_name:port
# 示例:
# listeners = PLAINTEXT://10.0.0.10:9092
listeners=PLAINTEXT://:9092,CONTROLLER://:9093

# broker之间的监听器名称
inter.broker.listener.name=PLAINTEXT

# 客户端监听的地址
# 如果没有配置,他会使用 "listeners".
# 如果不配置ip地址,消费端拿到的是主机名,会有问题
advertised.listeners=PLAINTEXT://10.0.0.10:9092

# 控制器使用的监听器名称(以逗号分隔)
# 如果没有在 `listener.security.protocol.map` 配置, 默认使用 PLAINTEXT 协议
# 在KRaft模式下运行,这是必需的。
controller.listener.names=CONTROLLER

# 将监听器名称映射到安全协议,默认情况下是相同的。有关详细信息,请参阅配置文档
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# 接收请求、返回响应时的线程数量
num.network.threads=3

# 服务器用于处理请求的线程数,其中可能包括磁盘IO
num.io.threads=8

# socket发送数据缓冲区大小 (SO_SNDBUF)
socket.send.buffer.bytes=102400

# socket接收数据缓冲区大小 (SO_RCVBUF)
socket.receive.buffer.bytes=102400

# socket接收数据的最大大小 (防止内存溢出)
socket.request.max.bytes=104857600


############################# Log(以下日志是指消息数据,非服务器日志)配置 #############################

# 以逗号分隔的目录列表,其中存储日志文件(消息数据日志,并非服务器日志。服务器日志在kafka服务根目录的logs下)
log.dirs=/kraft/kraft-combined-logs

# 每个主题的默认日志分区数。
# 如果是多消费者的情况下,建议修改成与实际消费者数量一致。这样就能保证每个消费者在拉取时,拉取的分区各不相同,实现负载均衡
num.partitions=3

# 启动时用于日志恢复、关闭时用于刷新每个数据目录的线程数。
# 如果数据在独立磁盘组成的磁盘组中(RAID array), 该线程数要增加
num.recovery.threads.per.data.dir=1

############################# InternalTopic配置 #############################
# 组元数据内部主题的复制因子(备份) "__consumer_offsets" and "__transaction_state"
# 集群环境下,备份配置与节点数量保持一致。单节点时,备份为 1 即可
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
default.replication.factor=3

############################# 日志刷新策略 #############################


# 消息会立即写入文件系统,但默认情况下我们只使用fsync()来惰性地使用操作系统缓存。以下配置控制将数据刷新到磁盘。
# 这里有一些重要的权衡策略:
# 1. Durability: 如果不使用复制,未刷新的数据可能会丢失。
# 2. Latency: 非常大的刷新间隔可能会导致延迟峰值,当确实发生刷新时,将有大量数据要刷新。
# 3. Throughput: 刷新通常是最昂贵的操作,刷新间隔小可能导致过多的请求。
# 下面的设置允许将刷新策略配置为在一段时间后或每N条消息(或两者同时)刷新数据。这可以在全局范围内完成,也可以在每个主题的基础上重写。


# 强制将数据刷新到磁盘之前要接受的消息数
#log.flush.interval.messages=10000

# 在强制刷新之前,消息在日志中可驻留的最大时间
#log.flush.interval.ms=1000

############################# 日志保留策略 #############################

# 以下配置控制日志分块的处理。该策略可以设置为在一段时间后删除分块,或在给定大小累积后删除分块。只要满足其中一个条件,分块就会被删除。删除总是发生在日志的末尾。

# 在删除日志文件之前保留日志文件的小时数(以小时为单位), 默认一周
log.retention.hours=168

# 日志删除前的最大大小, 默认 1GB
#log.retention.bytes=1073741824

# 单个日志文件的最大大小, 默认 1GB
log.segment.bytes=1073741824

# 日志清理器进行清理的时间间隔,默认 5min
log.retention.check.interval.ms=300000

2.3 基础维护命令

基础维护命令|官网

查看topic分区情况

1
2
3
4
# 查看所有topic
./kafka-topics.sh --bootstrap-server 10.0.0.11:9092 --describe
# 查看指定topic
./kafka-topics.sh --bootstrap-server 10.0.0.11:9092 --describe --topic interface_monitoring_dynamic_allocation

查看所有组的消费情况

1
./kafka-consumer-groups.sh --bootstrap-server 192.168.110.67:9092 --describe --all-groups

查看某个消费者组中所有消费者的消费情况,WORKING_AREA是组名

1
./kafka-consumer-groups.sh --bootstrap-server 192.168.110.67:9092 --describe --group WORKING_AREA

查看该组所有的消费者

1
./kafka-consumer-groups.sh --bootstrap-server 192.168.110.67:9092 --describe --group WORKING_AREA --members

查看该组所有消费者及消费时分配的分区

1
./kafka-consumer-groups.sh --bootstrap-server 192.168.110.67:9092 --describe --group WORKING_AREA --members --verbose

手动删除一个或者多个消费者组

1
./kafka-consumer-groups.sh --bootstrap-server 192.168.110.67:9092 --delete --group 组1 --group 组2

重置消费者从最新位置开始消费

1
./kafka-consumer-groups.sh --bootstrap-server 192.168.110.67:9092 --reset-offsets --group WORKING_AREA --topic topic1 --to-latest

kafka清空数据时,需要查看启动命令使用的配置文件。

比如

1
nohup ./bin/kafka-server-start.sh ./config/kraft/server.properties >/dev/null 2>&1 &

清空server.properties里面的log.dirs文件夹即可

1
2
3
4
############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kraft-combined-logs
发布:2021-05-03 17:35:02
修改:2023-11-17 20:10:14
链接:https://meethigher.top/blog/2021/kafka/
标签:java kafka 
付款码 打赏 分享
Shift+Ctrl+1 可控制工具栏