简介

消息队列简介

什么是消息队列

消息队列,英文名:Message Queue,经常缩写为MQ。从字面上来理解,消息队列是一种用来存储消息的队列。来看一下下面的代码:

1
2
3
4
5
6
7
8
9
// 1. 创建一个保存字符串的队列
Queue<String> stringQueue = new LinkedList<String>();

// 2. 往消息队列中放入消息
stringQueue.offer("hello");

// 3. 从消息队列中取出消息并打印
System.out.println(stringQueue.poll());

上述代码,创建了一个队列,先往队列中添加了一个消息,然后又从队列中取出了一个消息。这说明了队列是可以用来存取消息的。我们可以简单理解消息队列就是将需要传输的数据存放在队列中

消息队列中间件

消息队列中间件就是用来存储消息的软件(组件)。举个例子来理解,为了分析网站的用户行为,我们需要记录用户的访问日志。这些一条条的日志,可以看成是一条条的消息,我们可以将它们保存到消息队列中。将来有一些应用程序需要处理这些日志,就可以随时将这些消息取出来处理。

目前市面上的消息队列有很多,例如:Kafka、RabbitMQ、ActiveMQ、RocketMQ、ZeroMQ等。

为什么叫Kafka呢

Kafka的架构师jay kreps非常喜欢franz kafka(弗兰兹·卡夫卡),并且觉得kafka这个名字很酷,因此取了个和消息传递系统完全不相干的名称kafka,该名字并没有特别的含义。

「也就是说,你特别喜欢尼古拉斯赵四,将来你做一个项目,也可以把项目的名字取名为:尼古拉斯赵四,然后这个项目就火了」

消息队列的应用场景

异步处理

电商网站中,新的用户注册时,需要将用户的信息保存到数据库中,同时还需要额外发送注册的邮件通知、以及短信注册码给用户。但因为发送邮件、发送注册短信需要连接外部的服务器,需要额外等待一段时间,此时,就可以使用消息队列来进行异步处理,从而实现快速响应。

image-20230220204957771

系统解耦

image-20230220205100592

流量削峰

image-20230220205155405

日志处理(大数据领域常见)

大型电商网站(淘宝、京东、国美、苏宁…)、App(抖音、美团、滴滴等)等需要分析用户行为,要根据用户的访问行为来发现用户的喜好以及活跃情况,需要在页面上收集大量的用户访问信息。

image-20230220205338436

生产者、消费者模型

我们之前学习过Java的服务器开发,Java服务器端开发的交互模型是这样的:

image-20230220205417595

我们之前也学习过使用Java JDBC来访问操作MySQL数据库,它的交互模型是这样的:

image-20230220205444286

它也是一种请求响应模型,只不过它不再是基于http协议,而是基于MySQL数据库的通信协议。

而如果我们基于消息队列来编程,此时的交互模式成为:生产者、消费者模型。

image-20230220205531093

消息队列的两种模式

点对点模式

image-20230220205611571

消息发送者生产消息发送到消息队列中,然后消息接收者从消息队列中取出并且消费消息。消息被消费以后,消息队列中不再有存储,所以消息接收者不可能消费到已经被消费的消息。

点对点模式特点:

  • 每个消息只有一个接收者(Consumer)(即一旦被消费,消息就不再在消息队列中)
  • 发送者和接收者间没有依赖性,发送者发送消息之后,不管有没有接收者在运行,都不会影响到发送者下次发送消息;
  • 接收者在成功接收消息之后需向队列应答成功,以便消息队列删除当前接收的消息;
发布订阅模式

image-20230220205821230

发布/订阅模式特点:

  • 每个消息可以有多个订阅者;
  • 发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息。
  • 为了消费消息,订阅者需要提前订阅该角色主题,并保持在线运行;

Kafka简介

什么是Kafka

image-20230220205954332

Kafka是由Apache软件基金会开发的一个开源流平台,由Scala和Java编写。Kafka的Apache官网是这样介绍Kakfa的。

Apache Kafka是一个分布式流平台。一个分布式的流平台应该包含3点关键的能力:

  1. 发布和订阅流数据流,类似于消息队列或者是企业消息传递系统
  2. 以容错的持久化方式存储数据流
  3. 处理数据流

Kafka的应用场景

我们通常将Apache Kafka用在两类程序:

  1. 建立实时数据管道,以可靠地在系统或应用程序之间获取数据

  2. 构建实时流应用程序,以转换或响应数据流

image-20230220210211153

上图,我们可以看到:

  1. Producers:可以有很多的应用程序,将消息数据放入到Kafka集群中。

  2. Consumers:可以有很多的应用程序,将消息数据从Kafka集群中拉取出来。

  3. Connectors:Kafka的连接器可以将数据库中的数据导入到Kafka,也可以将Kafka的数据导出到

数据库中。

  1. Stream Processors:流处理器可以Kafka中拉取数据,也可以将数据写入到Kafka中。

Kafka诞生背景

kafka的诞生,是为了解决linkedin的数据管道问题,起初linkedin采用了ActiveMQ来进行数据交换,大约是在2010年前后,那时的ActiveMQ还远远无法满足linkedin对数据传递系统的要求,经常由于各种缺陷而导致消息阻塞或者服务无法正常访问,为了能够解决这个问题,linkedin决定研发自己的消息传递系统,当时linkedin的首席架构师jay kreps便开始组织团队进行消息传递系统的研发。

提示:

  1. Linkedin还是挺牛逼的
  2. Kafka比ActiveMQ牛逼得多

Kafka的优势

前面我们了解到,消息队列中间件有很多,为什么我们要选择Kafka?

特性 ActiveMQ RabbitMQ Kafka RocketMQ
所属社区/公司 Apache Mozilla Public License Apache Apache/Ali
成熟度 成熟 成熟 成熟 比较成熟
生产者-消费者模式 支持 支持 支持 支持
发布-订阅 支持 支持 支持 支持
REQUEST-REPLY 支持 支持 - 支持
API完备性 低(静态配置)
多语言支持 支持JAVA优先 语言无关 支持,JAVA优先 支持
单机呑吐量 万级(最差) 万级 十万级 十万级(最高)
消息延迟 - 微秒级 毫秒级 -
可用性 高(主从) 高(主从) 非常高(分布式)
消息丢失 - 理论上不会丢失 -
消息重复 - 可控制 理论上会有重复 -
事务 支持 不支持 支持 支持
文档的完备性
提供快速入门
首次部署难度 -

在大数据技术领域,一些重要的组件、框架都支持Apache Kafka,不论成成熟度、社区、性能、可靠性,Kafka都是非常有竞争力的一款产品。

哪些公司在使用Kafka

image-20230220210752801

image-20230220211406169

Kafka生态圈介绍

Apache Kafka这么多年的发展,目前也有一个较庞大的生态圈。

Kafka生态圈官网地址:https://cwiki.apache.org/confluence/display/KAFKA/Ecosystem

image-20230220211450569

Kafka版本

本次课程使用的Kafka版本为2.4.1,是2020年3月12日发布的版本。

可以注意到Kafka的版本号为:kafka_2.12-2.4.1,因为kafka主要是使用scala语言开发的,2.12为scala的版本号。http://kafka.apache.org/downloads可以查看到每个版本的发布时间。

环境搭建

搭建Kafka集群

以下基于ubuntu22.04.1

  1. 将Kafka的安装包上传到虚拟机,并解压

image-20230228164225219

1
2
3
4
5
6
7
8
sudo mkdir export
cd /export
sudo mkdir server
sudo mkdir software
sudo chmod 777 software/
sudo chmod 777 server/
cd /export/software/
tar -xvzf kafka_2.12-2.4.1.tgz -C ../server/
  1. 修改 server.properties
1
2
3
4
5
6
7
8
9
# 创建Kafka数据的位置
mkdir /export/server/kafka_2.12-2.4.1/data
vim /export/server/kafka_2.12-2.4.1/config/server.properties
# 指定broker的id
broker.id=0
# 指定Kafka数据的位置
log.dirs=/export/server/kafka_2.12-2.4.1/data
# 配置zk的三个节点
zookeeper.connect=10.211.55.8:2181,10.211.55.9:2181,10.211.55.7:2181

其余两台服务器重复以上步骤,仅修改broker.id

  1. 配置KAFKA_HOME环境变量
1
2
3
4
5
6
7
8
9
sudo su
vim /etc/profile
export KAFKA_HOME=/export/server/kafka_2.12-2.4.1
export PATH=:$PATH:${KAFKA_HOME}
#源文件无下面这条需手动添加
export PATH

每个节点加载环境变量
source /etc/profile
  1. 启动服务器
1
2
3
4
5
6
# 启动ZooKeeper 见黑马zookeeper集群搭建
# 启动Kafka,需要在kafka根目录下启动
cd /export/server/kafka_2.12-2.4.1
nohup bin/kafka-server-start.sh config/server.properties &
# 测试Kafka集群是否启动成功
bin/kafka-topics.sh --bootstrap-server 10.211.55.8:9092 --list

image-20230301160650636

无任何报错即成功。

目录结构分析

目录名称 说明
bin Kafka的所有执行脚本都在这里。例如:启动Kafka服务器、创建Topic、生产者、消费者程序等等
config Kafka的所有配置文件
libs 运行Kafka所需要的所有JAR包
logs Kafka的所有日志文件,如果Kafka出现一些问题,需要到该目录中去查看异常信息
site-docs Kafka的网站帮助文件

Kafka一键启动/关闭脚本

为了方便将来进行一键启动、关闭Kafka,我们可以编写一个shell脚本来操作。将来只要执行一次该脚本就可以快速启动/关闭Kafka。

  1. 在节点1中创建 /export/onekey 目录
1
sudo mkdir onekey
  1. 准备slave配置文件,用于保存要启动哪几个节点上的kafka
1
2
3
4
5
6
7
8
9
cd /export/onekey
sudo su
#新建slave文件
touch slave

#slave中写入以下内容
10.211.55.8
10.211.55.9
10.211.55.7
  1. 编写start-kafka.sh脚本
1
2
3
4
5
6
7
8
9
10
11
vim start-kafka.sh

cat /export/onekey/slave | while read line
do
{
echo $line
ssh $line "source /etc/profile;export JMX_PORT=9988;nohup ${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties >/dev/nul* 2>&1 & "
wait
}&
done

  1. 编写stop-kafka.sh脚本
1
2
3
4
5
6
7
8
9
10
11
vim stop-kafka.sh

cat /export/onekey/slave | while read line
do
{
echo $line
ssh $line "source /etc/profile;jps |grep Kafka |cut -d' ' -f1 |xargs kill -s 9"
wait
}&
done

  1. 给start-kafka.sh、stop-kafka.sh配置执行权限
1
2
chmod u+x start-kafka.sh
chmod u+x stop-kafka.sh
  1. 执行一键启动、一键关闭

    执行shell脚本需实现服务期间ssh免密登录

    (69条消息) Ubuntu开启SSH免密登录_ubuntu配置ssh免密登录_天雪浪子的博客-CSDN博客

1
2
./start-kafka.sh
./stop-kafka.sh

当查看日志发生Error connecting to node ubuntu2:9092错误时需在三台服务器上配置如下命令

以ubuntu2为例,另外两台同样的规则配置

1
2
3
4
sudo vim /etc/hosts

10.211.55.8 ubuntu1
10.211.55.7 ubuntu3

基础操作

image-20230302165113380

创建topic

创建一个topic(主题)。Kafka中所有的消息都是保存在主题中,要生产消息到Kafka,首先必须要有一个确定的主题。

1
2
3
4
# 创建名为test的主题
bin/kafka-topics.sh --create --bootstrap-server 10.211.55.8:9092 --topic test
# 查看目前Kafka中的主题
bin/kafka-topics.sh --list --bootstrap-server 10.211.55.8:9092

image-20230302165400395

生产消息到Kafka

使用Kafka内置的测试程序,生产一些消息到Kafka的test主题中。

1
bin/kafka-console-producer.sh --broker-list 10.211.55.8:9092 --topic test

image-20230302165621183

“>”表示等待输入

从Kafka消费消息

使用下面的命令来消费 test 主题中的消息。

1
bin/kafka-console-consumer.sh --bootstrap-server 10.211.55.8:9092 --topic test --from-beginning

生产者发送消息

image-20230302170001013

消费者接受消息

image-20230302170027164

使用Kafka Tools操作Kafka

连接Kafka集群

image-20230302170825532

image-20230302170838039

image-20230302170847729

创建topic

image-20230302170951476

mac系统需要修改本地host,否则topic节点打不开

1
2
3
4
5
sudo vim /etc/hosts

10.211.55.9 ubuntu1 #prl_hostonly shared
10.211.55.8 ubuntu2 #prl_hostonly shared
10.211.55.7 ubuntu3 #prl_hostonly shared

image-20230302193124105

image-20230302193259933

Kafka基准测试

基准测试

基准测试(benchmark testing)是一种测量和评估软件性能指标的活动。我们可以通过基准测试,了解到软件、硬件的性能水平。主要测试负载的执行时间、传输速度、吞吐量、资源占用率等。

基于1个分区1个副本的基准测试

  1. 测试步骤:
  2. 启动Kafka集群
  3. 创建一个1个分区1个副本的topic: benchmark
  4. 同时运行生产者、消费者基准测试程序
  5. 观察结果
创建topic
1
bin/kafka-topics.sh --zookeeper ubuntu1:2181 --create --topic benchmark --partitions 1 --replication-factor 1

image-20230302211748409

生产消息基准测试

在生产环境中,推荐使用生产5000W消息,这样会性能数据会更准确些。为了方便测试,课程上演示测试500W的消息作为基准测试。

1
bin/kafka-producer-perf-test.sh --topic benchmark --num-records 5000000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=ubuntu1:9092,ubuntu2:9092,ubuntu3:9092 acks=1
1
2
3
4
5
6
bin/kafka-producer-perf-test.sh 
--topic topic的名字
--num-records 总共指定生产数据量(默认5000W)
--throughput 指定吞吐量——限流(-1不指定)
--record-size record数据大小(字节)
--producer-props bootstrap.servers=192.168.1.20:9092,192.168.1.21:9092,192.168.1.22:9092 acks=1 指定Kafka集群地址,ACK模式

测试结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
root@ubuntu1:/export/server/kafka_2.bin/kafka-producer-perf-test.sh --topic benchmark --num-records 5000000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=ubuntu1:9092,ubuntu2:9092,ubuntu3:9092 acks=1092 acks=1
237649 records sent, 47529.8 records/sec (45.33 MB/sec), 597.6 ms avg latency, 819.0 ms max latency.
221936 records sent, 44387.2 records/sec (42.33 MB/sec), 738.2 ms avg latency, 805.0 ms max latency.
309552 records sent, 61910.4 records/sec (59.04 MB/sec), 540.4 ms avg latency, 785.0 ms max latency.
291008 records sent, 58062.3 records/sec (55.37 MB/sec), 572.4 ms avg latency, 760.0 ms max latency.
289072 records sent, 57814.4 records/sec (55.14 MB/sec), 562.1 ms avg latency, 782.0 ms max latency.
450960 records sent, 90192.0 records/sec (86.01 MB/sec), 368.3 ms avg latency, 546.0 ms max latency.
417168 records sent, 83433.6 records/sec (79.57 MB/sec), 392.7 ms avg latency, 646.0 ms max latency.
443296 records sent, 88659.2 records/sec (84.55 MB/sec), 369.7 ms avg latency, 475.0 ms max latency.
455920 records sent, 91184.0 records/sec (86.96 MB/sec), 354.5 ms avg latency, 640.0 ms max latency.
476384 records sent, 95276.8 records/sec (90.86 MB/sec), 348.9 ms avg latency, 622.0 ms max latency.
464896 records sent, 92979.2 records/sec (88.67 MB/sec), 351.9 ms avg latency, 574.0 ms max latency.
454288 records sent, 90857.6 records/sec (86.65 MB/sec), 360.8 ms avg latency, 445.0 ms max latency.
478912 records sent, 95782.4 records/sec (91.35 MB/sec), 341.4 ms avg latency, 399.0 ms max latency.
5000000 records sent, 76782.505874 records/sec (73.23 MB/sec), 423.57 ms avg latency, 819.00 ms max latency, 354 ms 50th, 727 ms 95th, 779 ms 99th, 800 ms 99.9th.

测试结果:

吞吐量 76782.505874 records/sec 每秒9.3W条记录
吞吐速率 (73.23 MB/sec) 每秒约89MB数据
平均延迟时间 423.57 ms avg latency
最大延迟时间 819.00 ms max latency
消费消息基准测试
1
bin/kafka-consumer-perf-test.sh --broker-list ubuntu1:9092,ubuntu2:9092,ubuntu3:9092 --topic benchmark --fetch-size 1048576 --messages 5000000
1
2
3
4
5
bin/kafka-consumer-perf-test.sh
--broker-list 指定kafka集群地址
--topic 指定topic的名称
--fetch-size 每次拉取的数据大小
--messages 总共要消费的消息个数

测试结果

1
2
root@ubuntu1:/export/server/kafka_2.bin/kafka-consumer-perf-test.sh --broker-list ubuntu1:9092,ubuntu2:9092,ubuntu3:9092 --topic benchmark --fetch-size 1048576 --messages 5000000es 500start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2023-03-02 13:32:03:115, 2023-03-02 13:32:16:727, 4768.3716, 350.3065, 5000000, 367322.9503, 1677763923405, -1677763909793, -0.0000, -0.0030
data.consumed.in.MB 共计消费的数据 4768.3716MB
MB.sec 每秒消费的数量 350.3065 每秒350MB
data.consumed.in.nMsg 共计消费的数量 5000000
nMsg.sec 每秒的数量 367322.9503 每秒36.7W条

基于3个分区1个副本的基准测试

创建topic
1
bin/kafka-topics.sh --zookeeper ubuntu1:2181 --create --topic benchmark --partitions 3 --replication-factor 1
生产消息基准测试
1
bin/kafka-producer-perf-test.sh --topic benchmark --num-records 5000000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=ubuntu1:9092,ubuntu2:9092,ubuntu3:9092 acks=1

测试结果

1
2
3
4
5
6
7
8
9
root@ubuntu1:/export/server/kafka_2.bin/kafka-producer-perf-test.sh --topic benchmark --num-records 5000000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=ubuntu1:9092,ubuntu2:9092,ubuntu3:9092 acks=1092 acks=1
484241 records sent, 96848.2 records/sec (92.36 MB/sec), 288.7 ms avg latency, 703.0 ms max latency.
450722 records sent, 90144.4 records/sec (85.97 MB/sec), 351.9 ms avg latency, 1714.0 ms max latency.
812375 records sent, 162475.0 records/sec (154.95 MB/sec), 208.1 ms avg latency, 1160.0 ms max latency.
934223 records sent, 186844.6 records/sec (178.19 MB/sec), 176.9 ms avg latency, 752.0 ms max latency.
691132 records sent, 138226.4 records/sec (131.82 MB/sec), 237.0 ms avg latency, 839.0 ms max latency.
706355 records sent, 141073.5 records/sec (134.54 MB/sec), 225.9 ms avg latency, 858.0 ms max latency.
592279 records sent, 118455.8 records/sec (112.97 MB/sec), 277.3 ms avg latency, 1594.0 ms max latency.
5000000 records sent, 133911.832450 records/sec (127.71 MB/sec), 240.60 ms avg latency, 1714.00 ms max latency, 18 ms 50th, 797 ms 95th, 1493 ms 99th, 1694 ms 99.9th.

测试结果:

指标 3分区1个副本 单分区单副本
吞吐量 133911.832450 records/sec 76782.505874 records/sec 每秒9.3W条记录
吞吐速率 127.71 MB/sec (73.23 MB/sec) 每秒约89MB数据
平均延迟时间 240.60 ms avg latency 423.57 ms avg latency
最大延迟时间 1714.00 ms max latency 819.00 ms max latency

在虚拟机上,因为都是共享笔记本上的CPU、内存、网络,所以分区越多,反而效率越低。但如果是真实的服务器,分区多效率是会有明显提升的。

消费消息基准测试
1
bin/kafka-consumer-perf-test.sh --broker-list ubuntu1:9092,ubuntu2:9092,ubuntu3:9092 --topic benchmark --fetch-size 1048576 --messages 5000000

测试结果

1
2
root@ubuntu1:/export/server/kafka_2.bin/kafka-consumer-perf-test.sh --broker-list ubuntu1:9092,ubuntu2:9092,ubuntu3:9092 --topic benchmark --fetch-size 1048576 --messages 5000000es 500start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2023-03-02 13:44:10:033, 2023-03-02 13:44:22:711, 4768.3716, 376.1139, 5000000, 394383.9722, 1677764650317, -1677764637639, -0.0000, -0.0030
指标 单分区3个副本 单分区单副本
data.consumed.in.MB 共计消费的数据 4768.3716 MB 4768.3716MB 4768.3716MB
MB.sec 每秒消费的数量 376.1139 350.3065 每秒350MB 445.6006 每秒445MB
data.consumed.in.nMsg 共计消费的数量 5000000 5000000
nMsg.sec 每秒的数量 394383.9722 367322.9503 每秒36.7W条

虽然是虚拟机 mac就是牛依然是提升的

基于1个分区3个副本的基准测试

创建topic
1
bin/kafka-topics.sh --zookeeper ubuntu1:2181 --create --topic benchmark --partitions 1 --replication-factor 3
生产消息基准测试
1
bin/kafka-producer-perf-test.sh --topic benchmark --num-records 5000000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=ubuntu1:9092,ubuntu2:9092,ubuntu3:9092 acks=1

测试结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
root@ubuntu1:/export/server/kafka_2.bin/kafka-producer-perf-test.sh --topic benchmark --num-records 5000000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=ubuntu1:9092,ubuntu2:9092,ubuntu3:9092 acks=1092 acks=1
145345 records sent, 29069.0 records/sec (27.72 MB/sec), 926.1 ms avg latency, 1293.0 ms max latency.
242256 records sent, 48451.2 records/sec (46.21 MB/sec), 684.4 ms avg latency, 907.0 ms max latency.
163568 records sent, 32713.6 records/sec (31.20 MB/sec), 979.1 ms avg latency, 1229.0 ms max latency.
248480 records sent, 49696.0 records/sec (47.39 MB/sec), 675.7 ms avg latency, 971.0 ms max latency.
229616 records sent, 45923.2 records/sec (43.80 MB/sec), 706.5 ms avg latency, 868.0 ms max latency.
171936 records sent, 34387.2 records/sec (32.79 MB/sec), 840.2 ms avg latency, 1756.0 ms max latency.
186592 records sent, 37318.4 records/sec (35.59 MB/sec), 982.8 ms avg latency, 1830.0 ms max latency.
120368 records sent, 24064.0 records/sec (22.95 MB/sec), 1030.1 ms avg latency, 2565.0 ms max latency.
187792 records sent, 37558.4 records/sec (35.82 MB/sec), 1073.2 ms avg latency, 2860.0 ms max latency.
226480 records sent, 45296.0 records/sec (43.20 MB/sec), 748.3 ms avg latency, 1209.0 ms max latency.
167728 records sent, 33545.6 records/sec (31.99 MB/sec), 948.7 ms avg latency, 1645.0 ms max latency.
159312 records sent, 31862.4 records/sec (30.39 MB/sec), 1037.4 ms avg latency, 1410.0 ms max latency.
134400 records sent, 26880.0 records/sec (25.63 MB/sec), 1225.2 ms avg latency, 2018.0 ms max latency.
208432 records sent, 41678.1 records/sec (39.75 MB/sec), 779.9 ms avg latency, 886.0 ms max latency.
185632 records sent, 37119.0 records/sec (35.40 MB/sec), 862.7 ms avg latency, 1511.0 ms max latency.
202528 records sent, 40505.6 records/sec (38.63 MB/sec), 842.9 ms avg latency, 1159.0 ms max latency.
237680 records sent, 47536.0 records/sec (45.33 MB/sec), 691.4 ms avg latency, 865.0 ms max latency.
223792 records sent, 44678.0 records/sec (42.61 MB/sec), 715.1 ms avg latency, 863.0 ms max latency.
229936 records sent, 45987.2 records/sec (43.86 MB/sec), 730.5 ms avg latency, 1010.0 ms max latency.
260544 records sent, 52108.8 records/sec (49.69 MB/sec), 626.3 ms avg latency, 686.0 ms max latency.
211824 records sent, 42364.8 records/sec (40.40 MB/sec), 779.5 ms avg latency, 1792.0 ms max latency.
294144 records sent, 58828.8 records/sec (56.10 MB/sec), 561.3 ms avg latency, 661.0 ms max latency.
322064 records sent, 64412.8 records/sec (61.43 MB/sec), 508.8 ms avg latency, 570.0 ms max latency.
5000000 records sent, 41859.219074 records/sec (39.92 MB/sec), 777.09 ms avg latency, 2860.00 ms max latency, 705 ms 50th, 1245 ms 95th, 1978 ms 99th, 2840 ms 99.9th.
指标 单分区3个副本 单分区单副本
吞吐量 41859.219074 records/sec 76782.505874 records/sec 每秒9.3W条记录
吞吐速率 39.92 MB/sec (73.23 MB/sec) 每秒约89MB数据
平均延迟时间 777.09 ms avg latency 423.57 ms avg latency
最大延迟时间 2860.00 ms max latency 819.00 ms max latency

同样的配置,副本越多速度越慢。

消费消息基准测试
1
bin/kafka-consumer-perf-test.sh --broker-list buntu1:9092,ubuntu2:9092,ubuntu3:9092 --topic benchmark --fetch-size 1048576 --messages 5000000

测试结果

1
2
3
root@ubuntu1:/export/server/kafka_2.bin/kafka-consumer-perf-test.sh --broker-list buntu1:9092,ubuntu2:9092,ubuntu3:9092 --topic benchmark --fetch-size 1048576 --messages 5000000es 5000start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
[2023-03-02 13:55:39,901] WARN Couldn't resolve server buntu1:9092 from bootstrap.servers as DNS resolution failed for buntu1 (org.apache.kafka.clients.ClientUtils)
2023-03-02 13:55:39:945, 2023-03-02 13:55:55:180, 4768.3716, 312.9880, 5000000, 328191.6639, 1677765340146, -1677765324911, -0.0000, -0.0030
指标 单分区3个副本 单分区单副本
data.consumed.in.MB 共计消费的数据 4768.3716 MB 4768.3716MB 4768.3716MB
MB.sec 每秒消费的数量 312.9880 350.3065 每秒350MB 445.6006 每秒445MB
data.consumed.in.nMsg 共计消费的数量 5000000 5000000
nMsg.sec 每秒的数量 328191.6639 367322.9503 每秒36.7W条

Java编程操作Kafka

同步生产消息到Kafka中

需求

接下来,我们将编写Java程序,将1-100的数字消息写入到Kafka中。

准备工作

导入Maven Kafka POM依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
<repositories><!-- 代码库 -->
<repository>
<id>central</id>
<url>http://maven.aliyun.com/nexus/content/groups/public//</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
<updatePolicy>always</updatePolicy>
<checksumPolicy>fail</checksumPolicy>
</snapshots>
</repository>
</repositories>

<dependencies>
<!-- kafka客户端工具 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>

<!-- 工具类 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-io</artifactId>
<version>1.3.2</version>
</dependency>

<!-- SLF桥接LOG4J日志 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.6</version>
</dependency>

<!-- SLOG4J日志 -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.16</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.7.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>

导入log4j.properties

将log4j.properties配置文件放入到resources文件夹中

1
2
3
4
log4j.rootLogger=INFO,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p - %m%n
创建包和类

创建包cn.itcast.kafka,并创建KafkaProducerTest类。

image-20230303164943795

代码开发

可以参考以下方式来编写第一个Kafka示例程序

参考以下文档:http://kafka.apache.org/24/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html

  1. 创建用于连接Kafka的Properties配置
1
2
3
4
5
6
7
8
9
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.88.100:9092");
//这个配置是 Kafka 生产者和消费者必须要指定的一个配置项,它用于指定 Kafka 集群中的一个或多个 broker 地址,生产者和消费者将使用这些地址与 Kafka 集群建立连接。
props.put("acks", "all");
//这行代码将 acks 配置设置为 all。acks 配置用于指定消息确认的级别。在此配置下,生产者将等待所有副本都成功写入后才会认为消息发送成功。这种配置级别可以确保数据不会丢失,但可能会影响性能。
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//这行代码将键(key)序列化器的类名设置为 org.apache.kafka.common.serialization.StringSerializer。键和值都需要被序列化以便于在网络上传输。这里使用的是一个字符串序列化器,它将字符串序列化为字节数组以便于发送到 Kafka 集群。
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//这行代码将值(value)序列化器的类名设置为 org.apache.kafka.common.serialization.StringSerializer。这里同样使用的是一个字符串序列化器,它将字符串序列化为字节数组以便于发送到 Kafka 集群。
  1. 创建一个生产者对象KafkaProducer
  2. 调用send发送1-100消息到指定Topic test,并获取返回值Future,该对象封装了返回值
  3. 再调用一个Future.get()方法等待响应
  4. 关闭生产者

参考代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class KafkaProducerTest {
public static void main(String[] args) {
// 1. 创建用于连接Kafka的Properties配置
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.88.100:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// 2. 创建一个生产者对象KafkaProducer
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

// 3. 调用send发送1-100消息到指定Topic test
for(int i = 0; i < 100; ++i) {
try {
// 获取返回值Future,该对象封装了返回值
Future<RecordMetadata> future = producer.send(new ProducerRecord<String, String>("test", null, i + ""));
//"test":这个参数是指定 Kafka 主题(topic)的名称,表示这条记录将被发送到哪个主题中。
//null:这个参数表示记录的键(key)。在 Kafka 中,每条消息都可以有一个键值对,键是一个可选参数,如果没有设置,则为 null。
//i + "":这个参数表示记录的值(value)。这里的 i 是一个整数,通过将它转换为字符串来设置记录的值。这个值将被序列化为字节数组并被发送到 Kafka 集群。

综上所述,这行代码的含义是:创建一个 Kafka 生产者记录对象,将该记录的值设置为 i 的字符串形式,并指定该记录将被发送到名为 "test" 的主题中,键为 null
// 调用一个Future.get()方法等待响应
future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}

// 5. 关闭生产者
producer.close();
}
}

测试:

image-20230303165351697

image-20230303165432474

从Kafka的topic中消费消息

需求

从 test topic中,将消息都消费,并将记录的offset、key、value打印出来

准备工作

在cn.itcast.kafka包下创建KafkaConsumerTest

开发步骤

  1. 创建Kafka消费者配置
1
2
3
4
5
6
7
8
9
10
11
12
Properties props = new Properties();
props.setProperty("bootstrap.servers", "node1.itcast.cn:9092");
//这一行将属性"bootstrap.servers"的值设置为"node1.itcast.cn:9092"。这是Kafka生产者和消费者所需的Kafka集群地址和端口号。
props.setProperty("group.id", "test");
//这一行将属性"group.id"的值设置为"test"。这是消费者组的唯一标识符。所有属于同一组的消费者将共享一个消费者组ID。
props.setProperty("enable.auto.commit", "true");
//这一行将属性"enable.auto.commit"的值设置为"true"。这表示消费者是否应该自动提交偏移量。
props.setProperty("auto.commit.interval.ms", "1000");
//这一行将属性"auto.commit.interval.ms"的值设置为"1000"。这是消费者自动提交偏移量的时间间隔,以毫秒为单位。
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//这两行将属性"key.deserializer"和"value.deserializer"的值都设置为"org.apache.kafka.common.serialization.StringDeserializer"。这是用于反序列化Kafka消息的Java类的名称。在这种情况下,消息的键和值都是字符串类型,因此使用了StringDeserializer类来反序列化它们。
  1. 创建Kafka消费者
  2. 订阅要消费的主题
  3. 使用一个while循环,不断从Kafka的topic中拉取消息
  4. 将将记录(record)的offset、key、value都打印出来

参考代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package cn.itcast.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class KafkaConsumerTest {
public static void main(String[] args) {
Properties props = new Properties();
props.setProperty("bootstrap.servers", "10.211.55.8:9092");
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

//创建kafka消费者
KafkaConsumer kafkaConsumer = new KafkaConsumer<>(props);
//订阅要消费的主题
//指定消费者从哪个topic中拉取数据
kafkaConsumer.subscribe(Arrays.asList("test"));
//使用一个while循环,不断从kafka的topic中拉取消息
while(true){
//kafka一次拉取一批数据
ConsumerRecords<String,String> poll = kafkaConsumer.poll(Duration.ofSeconds(5));
//将记录record的offset、key、value都打印出来
for (ConsumerRecord<String,String>consumerRecord:poll){
//主题
String topic = consumerRecord.topic();
//offset:这条消息处于kafka分区中的哪个位置
long offset=consumerRecord.offset();
//key\value
String key=consumerRecord.key();
String value=consumerRecord.value();
System.out.println("topic:"+topic+"offset:"+offset+"key:"+key+"value:"+value);
}
}
}
}

启动消费者,定位到最新的offset

image-20230303174132218

生产者再次发送消息,观察消费者变化

image-20230303174248747

异步使用带有回调函数方法生产消息

如果我们想获取生产者消息是否成功,或者成功生产消息到Kafka中后,执行一些其他动作。此时,可以很方便地使用带有回调函数来发送消息。

需求:

  1. 在发送消息出现异常时,能够及时打印出异常信息
  2. 在发送消息成功时,打印Kafka的topic名字、分区id、offset
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package cn.itcast.kafka;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class KafkaProducerTest {
public static void main(String[] args) {
// 1. 创建用于连接Kafka的Properties配置
Properties props = new Properties();
props.put("bootstrap.servers", "node1.itcast.cn:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// 2. 创建一个生产者对象KafkaProducer
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

// 3. 调用send发送1-100消息到指定Topic test
for(int i = 0; i < 100; ++i) {
// 一、同步方式
// 获取返回值Future,该对象封装了返回值
// Future<RecordMetadata> future = producer.send(new ProducerRecord<String, String>("test", null, i + ""));
// 调用一个Future.get()方法等待响应
// future.get();

// 二、带回调函数异步方式
producer.send(new ProducerRecord<String, String>("test", null, i + ""), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if(exception != null) {
System.out.println("发送消息出现异常");
}
else {
String topic = metadata.topic();
int partition = metadata.partition();
long offset = metadata.offset();

System.out.println("发送消息到Kafka中的名字为" + topic + "的主题,第" + partition + "分区,第" + offset + "条数据成功!");
}
}
});
}

// 5. 关闭生产者
producer.close();
}
}


image-20230304125250709

架构

Kafka重要概念

broker

image-20230304125417711

  1. 一个Kafka的集群通常由多个broker组成,这样才能实现负载均衡、以及容错
  2. broker无状态(Sateless的,它们是通过ZooKeeper来维护集群状态
  3. 一个Kafka的broker每秒可以处理数十万次读写,每个broker都可以处理TB消息而不影响性能

zookeeper

image-20230304125745530

  1. ZK用来管理和协调broker,并且存储了Kafka的元数据(例如:有多少topic、partition、consumer)
  2. ZK服务主要用于通知生产者和消费者Kafka集群中有新的broker加入、或者Kafka集群中出现故障的broker

PS:Kafka正在逐步想办法将ZooKeeper剥离,维护两套集群成本较高,社区提出KIP-500就是要替换掉ZooKeeper的依赖。“Kafka on Kafka”——Kafka自己来管理自己的元数据

producer(生产者)

生产者负责将数据推送给brokertopic

consumer(消费者)

消费者负责从brokertopic中拉取数据,并自己进行处理

consumer group(消费者组)

image-20230304125938966

  • consumer group是kafka提供的可扩展且具有容错性的消费者机制
  • 一个消费者组可以包含多个消费者
  • 一个消费者组有一个唯一的ID(group Id)
  • 组内的消费者一起消费主题的所有分区数据

分区(Partitions)

image-20230304130211916

在Kafka集群中,主题被分为多个分区

在 Kafka 中,同一个 topic 的消息可以被分配到不同的分区中,具体分配规则取决于 partitioner。

Kafka 提供了默认的 partitioner 实现,称为 DefaultPartitioner,其将消息的 key(如果存在)进行哈希,然后根据哈希值确定该消息应该被分配到哪个分区。如果消息没有 key,则采用轮询的方式将消息分配到不同的分区中。

除了默认的 partitioner,用户还可以自定义 partitioner 实现,以满足不同的需求。自定义 partitioner 实现需要实现 Kafka 提供的 Partitioner 接口,并在生产者配置中指定使用该 partitioner。

无论是使用默认的 partitioner 还是自定义 partitioner,都需要遵循以下规则:

  • 对于同一个 key,始终分配到同一个分区中。
  • 对于没有 key 的消息,应该采用随机或轮询的方式将消息分配到不同的分区中。

需要注意的是,分区数的变化也可能导致消息分配到不同的分区中。例如,当某个 topic 的分区数发生变化时,之前已经写入的消息可能会被重新分配到不同的分区中。因此,在生产者代码中应该谨慎处理分区数的变化,以避免数据丢失或重复。

副本(Replicas)

image-20230304130602885

  • 副本可以确保某个服务器出现故障时,确保数据依然可用
  • 在Kafka中,一般都会设计副本的个数>1

主题(Topic)

image-20230304130757615

  • 主题是一个逻辑概念,用于生产者发布数据,消费者拉取数据
  • Kafka中的主题必须要有标识符,而且是唯一的,Kafka中可以有任意数量的主题,没有数量上的限制
  • 在主题中的消息是有结构的,一般一个主题包含某一类消息
  • 一旦生产者发送消息到主题中,这些消息就不能被更新(更改)

偏移量(offset)

image-20230304133000845

offset记录着下一条将要发送给Consumer的消息的序号

默认Kafka将offset存储在ZooKeeper中

在一个分区中,消息是有顺序的方式存储着,每个在分区的消费都是有一个递增的id。这个就是偏移量offset

偏移量在分区中才是有意义的。在分区之间,offset是没有任何意义的

消费者组

Kafka支持有多个消费者同时消费一个主题中的数据。我们接下来,给大家演示,启动两个消费者共同来消费 test 主题的数据。

  1. 首先,修改生产者程序,让生产者不停地每3秒生产1-100个数字。
1
2
3
4
5
6
7
8
9
10
// 3. 发送1-100数字到Kafka的test主题中
while(true) {
for (int i = 1; i <= 100; ++i) {
// 注意:send方法是一个异步方法,它会将要发送的数据放入到一个buffer中,然后立即返回
// 这样可以让消息发送变得更高效
producer.send(new ProducerRecord<>("test", i + ""));
}
Thread.sleep(3000);
}

  1. 接下来,同时运行两个消费者。

image-20230304214510037

image-20230304214737410

image-20230304214813494

  1. 同时运行两个消费者,我们发现,只有一个消费者程序能够拉取到消息。想要让两个消费者同时消费消息,必须要给test主题,添加一个分区。

# 设置 test topic为2个分区

1
bin/kafka-topics.sh --zookeeper 10.211.55.8:2181 -alter --partitions 2 --topic test
1
2
3
xuwei@ubuntu1:/export/server/kafka_2.12-2.4.1$ bin/kafka-topics.sh --zookeeper 10.211.55.8:2181 -alter --partitions 2 --topic test
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
  1. 重新运行生产者、两个消费者程序,我们就可以看到两个消费者都可以消费Kafka Topic的数据了

image-20230304215343589

image-20230304215357086

Kafka生产者幂等性与事务

幂等性

简介

拿http举例来说,一次或多次请求,得到地响应是一致的(网络超时等问题除外),换句话说,就是执行多次操作与执行一次操作的影响是一样的。

image-20230304221941602

如果,某个系统是不具备幂等性的,如果用户重复提交了某个表格,就可能会造成不良影响。例如:用户在浏览器上点击了多次提交订单按钮,会在后台生成多个一模一样的订单。

Kafka生产者幂等性

image-20230304222141263

在生产者生产消息时,如果出现retry时,有可能会一条消息被发送了多次,如果Kafka不具备幂等性的,就有可能会在partition中保存多条一模一样的消息。

配置幂等性

1
props.put("enable.idempotence",true);

幂等性原理

为了实现生产者的幂等性,Kafka引入了 Producer ID(PID)Sequence Number的概念。

PID:每个Producer在初始化时,都会分配一个唯一的PID,这个PID对用户来说,是透明的。

Sequence Number:针对每个生产者(对应PID)发送到指定主题分区的消息都对应一个从0开始递增的Sequence Number

image-20230304222724389

Kafka事务

简介

Kafka事务是2017年Kafka 0.11.0.0引入的新特性。类似于数据库的事务。Kafka事务指的是生产者生产消息以及消费者提交offset的操作可以在一个原子操作中,要么都成功,要么都失败。尤其是在生产者、消费者并存时,事务的保障尤其重要。(consumer-transform-producer模式)

image-20230304222942728

事务操作API

Producer接口中定义了以下5个事务相关方法:

  1. initTransactions(初始化事务):要使用Kafka事务,必须先进行初始化操作

  2. beginTransaction(开始事务):启动一个Kafka事务

  3. sendOffsetsToTransaction(提交偏移量):批量地将分区对应的offset发送到事务中,方便后续一块提交

  4. commitTransaction(提交事务):提交事务

  5. abortTransaction(放弃事务):取消事务

Kafka事务编程

事务相关属性配置

生产者
1
2
// 配置事务的id,开启了事务会默认开启幂等性
props.put("transactional.id", "first-transactional");
消费者
1
2
3
4
// 1. 消费者需要设置隔离级别
props.put("isolation.level","read_committed");
// 2. 关闭自动提交
props.put("enable.auto.commit", "false");

Kafka事务编程

完整代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package cn.itcast.kafka;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class AffairTest {
// 1. 创建消费者
public static Consumer<String, String> createConsumer() {
// 1. 创建Kafka消费者配置
Properties props = new Properties();
props.setProperty("bootstrap.servers", "10.211.55.8:9092");
props.setProperty("group.id", "ods_user");
props.put("isolation.level","read_committed");
props.setProperty("enable.auto.commit", "false");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

// 2. 创建Kafka消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

// 3. 订阅要消费的主题
consumer.subscribe(Arrays.asList("ods_user"));

return consumer;
}

public static Producer<String, String> createProducer() {
// 1. 创建生产者配置
Properties props = new Properties();
props.put("bootstrap.servers", "10.211.55.8:9092");
props.put("transactional.id", "dwd_user");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// 2. 创建生产者
Producer<String, String> producer = new KafkaProducer<>(props);
return producer;
}


public static void main(String[] args) {
Consumer<String, String> consumer = createConsumer();
Producer<String, String> producer = createProducer();
// 初始化事务
producer.initTransactions();

while(true) {
try {
// 1. 开启事务
producer.beginTransaction();
// 2. 定义Map结构,用于保存分区对应的offset
Map<TopicPartition, OffsetAndMetadata> offsetCommits = new HashMap<>();
// 2. 拉取消息
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
for (ConsumerRecord<String, String> record : records) {
// 3. 保存偏移量
offsetCommits.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1));
// 4. 进行转换处理
String[] fields = record.value().split(",");
fields[1] = fields[1].equalsIgnoreCase("1") ? "男":"女";
String message = fields[0] + "," + fields[1] + "," + fields[2];

// 5. 生产消息到dwd_user
producer.send(new ProducerRecord<>("dwd_user", message));
}
// 6. 提交偏移量到事务
producer.sendOffsetsToTransaction(offsetCommits, "ods_user");
// 7. 提交事务
producer.commitTransaction();
} catch (Exception e) {
// 8. 放弃事务
producer.abortTransaction();
}
}
}


}

需求

在Kafka的topic 「ods_user」中有一些用户数据,数据格式如下:

1
2
3
姓名,性别,出生日期
张三,1,1980-10-09
李四,0,1985-11-01

我们需要编写程序,将用户的性别转换为男、女(1-男,0-女),转换后将数据写入到topic 「dwd_user」中。要求使用事务保障,要么消费了数据同时写入数据到 topic,提交offset。要么全部失败。

启动生产者控制台程序模拟数据
1
2
3
4
5
6
7
8
# 创建名为ods_user和dwd_user的主题
bin/kafka-topics.sh --create --bootstrap-server 10.211.55.8:9092 --topic ods_user
bin/kafka-topics.sh --create --bootstrap-server 10.211.55.8:9092 --topic dwd_user
# 生产数据到 ods_user
bin/kafka-console-producer.sh --broker-list 10.211.55.8:9092 --topic ods_user
# 从dwd_user消费数据
bin/kafka-console-consumer.sh --bootstrap-server 10.211.55.8:9092 --topic dwd_user --from-beginning --isolation-level read_committed

image-20230304233913158

image-20230304234018515

编写创建消费者代码

编写一个方法 createConsumer,该方法中返回一个消费者,订阅「ods_user」主题。注意:需要配置事务隔离级别、关闭自动提交。

创建消费者,并订阅 ods_user 主题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 1. 创建消费者
public static Consumer<String, String> createConsumer() {
// 1. 创建Kafka消费者配置
Properties props = new Properties();
props.setProperty("bootstrap.servers", "10.211.55.8:9092");
props.setProperty("group.id", "ods_user");
props.put("isolation.level","read_committed");
props.setProperty("enable.auto.commit", "false");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

// 2. 创建Kafka消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

// 3. 订阅要消费的主题
consumer.subscribe(Arrays.asList("ods_user"));

return consumer;
}

编写创建生产者代码

编写一个方法 createProducer,返回一个生产者对象。注意:需要配置事务的id,开启了事务会默认开启幂等性。

1
2
3
4
5
6
7
8
9
10
11
12
13
public static Producer<String, String> createProduceer() {
// 1. 创建生产者配置
Properties props = new Properties();
props.put("bootstrap.servers", "10.211.55.8:9092");
props.put("transactional.id", "dwd_user");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// 2. 创建生产者
Producer<String, String> producer = new KafkaProducer<>(props);
return producer;
}

编写代码消费并生产数据

实现步骤:

  1. 调用之前实现的方法,创建消费者、生产者对象

  2. 生产者调用initTransactions初始化事务

  3. 编写一个while死循环,在while循环中不断拉取数据,进行处理后,再写入到指定的topic

(1) 生产者开启事务

(2) 消费者拉取消息

(3) 遍历拉取到的消息,并进行预处理(将1转换为男,0转换为女)

(4) 生产消息到dwd_user topic中

(5) 提交偏移量到事务中

(6) 提交事务

(7) 捕获异常,如果出现异常,则取消事务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public static void main(String[] args) {
Consumer<String, String> consumer = createConsumer();
Producer<String, String> producer = createProducer();
// 初始化事务
producer.initTransactions();

while(true) {
try {
// 1. 开启事务
producer.beginTransaction();
// 2. 定义Map结构,用于保存分区对应的offset
Map<TopicPartition, OffsetAndMetadata> offsetCommits = new HashMap<>();
// 2. 拉取消息
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
for (ConsumerRecord<String, String> record : records) {
// 3. 保存偏移量
offsetCommits.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1));
//将当前消息所属分区的偏移量保存到HashMap中,并且将偏移量加1,以便下次从此偏移量开始消费消息。
// 4. 进行转换处理
String[] fields = record.value().split(",");
fields[1] = fields[1].equalsIgnoreCase("1") ? "男":"女";
String message = fields[0] + "," + fields[1] + "," + fields[2];
// 5. 生产消息到dwd_user
producer.send(new ProducerRecord<>("dwd_user", message));
}
// 6. 提交偏移量到事务,
producer.sendOffsetsToTransaction(offsetCommits, "ods_user");
// 7. 提交事务
producer.commitTransaction();
} catch (Exception e) {
// 8. 放弃事务
producer.abortTransaction();
}
}
}

将已消费的消息的偏移量提交到生产者的事务中,是为了确保在生产者发送消息到新的主题之前,已经消费的消息的偏移量已经被记录下来并保存在事务中。如果不提交偏移量,则可能会导致已经消费的消息在下一次启动消费者时重复消费。因此,将偏移量提交到生产者的事务中是非常重要的,可以确保消费者在下一次启动时可以正确地从上次停止的位置继续消费。

测试

image-20230305000317993

image-20230305000350416

成功转化并消费

模拟异常测试事务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 3. 保存偏移量
offsetCommits.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1));
// 4. 进行转换处理
String[] fields = record.value().split(",");
fields[1] = fields[1].equalsIgnoreCase("1") ? "男":"女";
String message = fields[0] + "," + fields[1] + "," + fields[2];

// 模拟异常
int i = 1/0;

// 5. 生产消息到dwd_user
producer.send(new ProducerRecord<>("dwd_user", message));

启动程序一次,抛出异常。

再启动程序一次,还是抛出异常。

直到我们处理该异常为止。

image-20230305000812268

image-20230305000838852

我们发现,可以消费到消息,但如果中间出现异常的话,offset是不会被提交的,除非消费、生产消息都成功,才会提交事务。

Kafka高级

学习目标

  • 理解Kafka的分区副本机制
  • 能够搭建Kafka-eagle并查看Kafka集群状态
  • 理解分区leader和follower的职责
  • 理解分区的ISR
  • 理解Kafka消息不丢失机制
  • 理解Kafka中数据清理

分区和副本机制

生产者分区写入策略

生产者写入消息到topic,Kafka将依据不同的策略将数据分配到不同的分区中

  1. 轮询分区策略

  2. 随机分区策略

  3. 按key分区分配策略

  4. 自定义分区策略

轮询策略

image-20230305211026078

  • 默认的策略,也是使用最多的策略,可以最大限度保证所有消息平均分配到一个分区
  • 如果在生产消息时,key为null,则使用轮询算法均衡地分配分区

随机策略(不用)

随机策略,每次都随机地将消息分配到每个分区。在较早的版本,默认的分区策略就是随机策略,也是为了将消息均衡地写入到每个分区。但后续轮询策略表现更佳,所以基本上很少会使用随机策略。

image-20230305211117624

按key分配策略

image-20230305211144842

按key分配策略,有可能会出现「数据倾斜」,例如:某个key包含了大量的数据,因为key值一样,所有所有的数据将都分配到一个分区中,造成该分区的消息数量远大于其他的分区。

乱序问题

轮询策略、随机策略都会导致一个问题,生产到Kafka中的数据是乱序存储的。而按key分区可以一定程度上实现数据有序存储——也就是局部有序,但这又可能会导致数据倾斜,所以在实际生产环境中要结合实际情况来做取舍。

自定义分区策略

image-20230305211507494

实现步骤:

  1. 创建自定义分区器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class KeyWithRandomPartitioner implements Partitioner {

private Random r;

@Override
public void configure(Map<String, ?> configs) {
r = new Random();
}

@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// cluster.partitionCountForTopic 表示获取指定topic的分区数量r.nextInt(1000) 表示从随机数生成器 r 中随机生成一个小于1000的整数,其中参数1000指定了生成的随机数的范围,即生成的随机数是0到999之间的整数。在这段代码中,生成的随机数将被用于计算消息所在的分区编号。由于模运算 % cluster.partitionCountForTopic(topic) 的结果必须小于分区数量,因此这里对1000取模的目的是将随机数的范围缩小到分区数量内,以确保不会选择到超出范围的分区编号。
return r.nextInt(1000) % cluster.partitionCountForTopic(topic);
}

@Override
public void close() {
}
}

  1. 在Kafka生产者配置中,自定使用自定义分区器的类名
1
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, KeyWithRandomPartitioner.class.getName());

消费者组Rebalance机制

Rebalance再均衡

Kafka中的Rebalance称之为再均衡,是Kafka中确保Consumer group下所有的consumer如何达成一致,分配订阅的topic的每个分区的机制。

Rebalance触发的时机有:

  1. 消费者组中consumer的个数发生变化。例如:有新的consumer加入到消费者组,或者是某个consumer停止了。

image-20230305213339845

  1. 订阅的topic个数发生变化

消费者可以订阅多个主题,假设当前的消费者组订阅了三个主题,但有一个主题突然被删除了,此时也需要发生再均衡。

image-20230305213419840

  1. 订阅的topic分区数发生变化

image-20230305213443651

Rebalance的不良影响

  • 发生Rebalance时,consumer group下的所有consumer都会协调在一起共同参与,Kafka使用分配策略尽可能达到最公平的分配
  • Rebalance过程会对consumer group产生非常严重的影响,Rebalance的过程中所有的消费者都将停止工作,直到Rebalance完成

消费者分区分配策略

Range范围分配策略

Range范围分配策略是Kafka默认的分配策略,它可以确保每个消费者消费的分区数量是均衡的。

注意:Range范围分配策略是针对每个Topic的。

配置

配置消费者的partition.assignment.strategyorg.apache.kafka.clients.consumer.RangeAssignor

算法公式

n = 分区数量 / 消费者数量

m = 分区数量 % 消费者数量

前m个消费者消费n+1个

剩余消费者消费n个

image-20230305214111243

image-20230305214406647

RoundRobin轮询策略

RoundRobinAssignor轮询策略是将消费组内所有消费者以及消费者所订阅的所有topic的partition按照字典序排序(topic和分区的hashcode进行排序),然后通过轮询方式逐个将分区以此分配给每个消费者。

配置

配置消费者的partition.assignment.strategyorg.apache.kafka.clients.consumer.RoundRobinAssignor

image-20230305214555487

Stricky粘性分配策略

从Kafka 0.11.x开始,引入此类分配策略。主要目的:

  1. 分区分配尽可能均匀

  2. 在发生rebalance的时候,分区的分配尽可能与上一次分配保持相同

没有发生rebalance时,Striky粘性分配策略和RoundRobin分配策略类似。

image-20230305214816094

上面如果consumer2崩溃了,此时需要进行rebalance。如果是Range分配和轮询分配都会重新进行分配,例如:

image-20230305214922864

通过上图,我们发现,consumer0和consumer1原来消费的分区大多发生了改变。接下来我们再来看下粘性分配策略。

image-20230305214947583

我们发现,Striky粘性分配策略,保留rebalance之前的分配结果。这样,只是将原先consumer2负责的两个分区再均匀分配给consumer0、consumer1。这样可以明显减少系统资源的浪费,例如:之前consumer0、consumer1之前正在消费某几个分区,但由于rebalance发生,导致consumer0、consumer1需要重新消费之前正在处理的分区,导致不必要的系统开销。(例如:某个事务正在进行就必须要取消了)

副本机制

副本的目的就是冗余备份,当某个Broker上的分区数据丢失时,依然可以保障数据可用。因为在其他的Broker上的副本是可用的。

producer的ACKs参数

对副本关系较大的就是,producer配置的acks参数了,acks参数表示当生产者生产消息的时候,写入到副本的要求严格程度。它决定了生产者如何在性能和可靠性之间做取舍。

配置:

1
2
3
4
5
Properties props = new Properties();
props.put("bootstrap.servers", "node1.itcast.cn:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

acks配置为0

image-20230305215548012

ACK为0,基准测试:

1
bin/kafka-producer-perf-test.sh --topic benchmark --num-records 5000000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=10.211.55.8:9092 acks=0

测试结果:

指标 单分区单副本(ack=0) 单分区单副本(ack=1)
吞吐量 47359.248314 records/sec 每秒4.7W条记录 40763.417279 records/sec 每秒4W条记录
吞吐速率 45.17 MB/sec 每秒约45MB数据 38.88 MB/sec 每秒约89MB数据
平均延迟时间 686.49 ms avg latency 799.67 ms avg latency
最大延迟时间 1444.00 ms max latency 1961.00 ms max latency

acks配置为1

image-20230305220619546

当生产者的ACK配置为1时,生产者会等待leader副本确认接收后,才会发送下一条数据,性能中等。

acks配置为-1或者all

image-20230305220954110

1
bin/kafka-producer-perf-test.sh --topic benchmark --num-records 5000000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=10.211.55.8:9092,10.211.55.9:9092,10.211.55.7:9092 acks=0
指标 单分区单副本(ack=0) 单分区单副本(ack=1) 单分区单副本(ack=-1/all)
吞吐量 47359.248314 records/sec 每秒4.7W条记录 40763.417279 records/sec 每秒4W条记录 540.5 /s 每秒7.3W调记录
吞吐速率 45.17 MB/sec 每秒约45MB数据 38.88 MB/sec 每秒约89MB数据 0.52 MB/sec
平均延迟时间 686.49 ms avg latency 799.67 ms avg latency 120281.8 ms
最大延迟时间 1444.00 ms max latency 1961.00 ms max latency 1884.00 ms

高级(High Level)API与低级(Low Level)API

高级API

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* 消费者程序:从test主题中消费数据
*/
public class _2ConsumerTest {
public static void main(String[] args) {
// 1. 创建Kafka消费者配置
Properties props = new Properties();
props.setProperty("bootstrap.servers", "192.168.88.100:9092");
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

// 2. 创建Kafka消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

// 3. 订阅要消费的主题
consumer.subscribe(Arrays.asList("test"));

// 4. 使用一个while循环,不断从Kafka的topic中拉取消息
while (true) {
// 定义100毫秒超时
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}

  • 上面是之前编写的代码,消费Kafka的消息很容易实现,写起来比较简单
  • 不需要执行去管理offset,直接通过ZK管理;也不需要管理分区、副本,由Kafka统一管理
  • 消费者会自动根据上一次在ZK中保存的offset去接着获取数据
  • 在ZK中,不同的消费者组(group)同一个topic记录不同的offset,这样不同程序读取同一个topic,不会受offset的影响

高级API的缺点

  • 不能控制offset,例如:想从指定的位置读取
  • 不能细化控制分区、副本、ZK等

低级API

通过使用低级API,我们可以自己来控制offset,想从哪儿读,就可以从哪儿读。而且,可以自己控制连接分区,对分区自定义负载均衡。而且,之前offset是自动保存在ZK中,使用低级API,我们可以将offset不一定要使用ZK存储,我们可以自己来存储offset。例如:存储在文件、MySQL、或者内存中。但是低级API,比较复杂,需要执行控制offset,连接到哪个分区,并找到分区的leader。

手动消费分区数据

之前的代码,我们让Kafka根据消费组中的消费者动态地为topic分配要消费的分区。但在某些时候,我们需要指定要消费的分区,例如:

  • 如果某个程序将某个指定分区的数据保存到外部存储中,例如:Redis、MySQL,那么保存数据的时候,只需要消费该指定的分区数据即可
  • 如果某个程序是高可用的,在程序出现故障时将自动重启(例如:后面我们将学习的Flink、Spark程序)。这种情况下,程序将从指定的分区重新开始消费数据。

如何进行手动消费分区中的数据呢?

  1. 不再使用之前的 subscribe 方法订阅主题,而使用 「assign」方法指定想要消费的消息
1
2
3
4
String topic = "test";
TopicPartition partition0 = new TopicPartition(topic, 0);
TopicPartition partition1 = new TopicPartition(topic, 1);
consumer.assign(Arrays.asList(partition0, partition1));
  1. 一旦指定了分区,就可以就像前面的示例一样,在循环中调用「poll」方法消费消息

注意

  1. 当手动管理消费分区时,即使GroupID是一样的,Kafka的组协调器都将不再起作用

  2. 如果消费者失败,也将不再自动进行分区重新分配

监控工具Kafka-eagle介绍

image-20230306112953617

Kafka-Eagle简介

在开发工作中,当业务前提不复杂时,可以使用Kafka命令来进行一些集群的管理工作。但如果业务变得复杂,例如:我们需要增加group、topic分区,此时,我们再使用命令行就感觉很不方便,此时,如果使用一个可视化的工具帮助我们完成日常的管理工作,将会大大提高对于Kafka集群管理的效率,而且我们使用工具来监控消费者在Kafka中消费情况。

早期,要监控Kafka集群我们可以使用Kafka Monitor以及Kafka Manager,但随着我们对监控的功能要求、性能要求的提高,这些工具已经无法满足。

Kafka Eagle是一款结合了目前大数据Kafka监控工具的特点,重新研发的一块开源免费的Kafka集群优秀的监控工具。它可以非常方便的监控生产环境中的offset、lag变化、partition分布、owner等。

官网地址:https://www.kafka-eagle.org/

安装Kafka-Eagle

开启Kafka JMX端口

JMX接口

JMX(Java Management Extensions)是一个为应用程序植入管理功能的框架。JMX是一套标准的代理和服务,实际上,用户可以在任何Java应用程序中使用这些代理和服务实现管理。很多的一些软件都提供了JMX接口,来实现一些管理、监控功能。

开启Kafka JMX

在启动Kafka的脚本前,添加:

1
2
3
cd ${KAFKA_HOME}
export JMX_PORT=9988
nohup bin/kafka-server-start.sh config/server.properties &

安装Kafka-Eagle

需提前准备好mysql数据库并创建ke数据库

  1. 安装JDK,并配置好JAVA_HOME

  2. 将kafka_eagle上传,并解压到 /export/server 目录中

1
2
3
4
5
cd cd /export/software/
tar -xvzf kafka-eagle-bin-3.0.1.tar.gz -C ../server/
cd /export/server/kafka-eagle-bin-3.0.1/
tar -xvzf efak-web-3.0.1-bin.tar.gz
cd /export/server/kafka-eagle-bin-3.0.1/efak-web-3.0.1
  1. 配置 kafka_eagle 环境变量。
1
2
3
4
vim /etc/profile
export KE_HOME=/export/server/kafka-eagle-bin-1.4.6/kafka-eagle-web-1.4.6
export PATH=$PATH:$KE_HOME/bin
source /etc/profile
  1. 配置 kafka_eagle。使用vi打开conf目录下的system-config.properties
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
vim conf/system-config.properties
# 修改第4行,配置kafka集群别名
kafka.eagle.zk.cluster.alias=cluster1
# 修改第5行,配置ZK集群地址
cluster1.zk.list=node1.itcast.cn:2181,node2.itcast.cn:2181,node3.itcast.cn:2181
# 注释第6行
#cluster2.zk.list=xdn10:2181,xdn11:2181,xdn12:2181
# 修改第32行,打开图标统计
kafka.eagle.metrics.charts=true
kafka.eagle.metrics.retain=30

# 注释第69行,取消sqlite数据库连接配置
#kafka.eagle.driver=org.sqlite.JDBC
#kafka.eagle.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.db
#kafka.eagle.username=root
#kafka.eagle.password=www.kafka-eagle.org

# 修改第77行,开启mys
kafka.eagle.driver=com.mysql.jdbc.Driver
kafka.eagle.url=jdbc:mysql://10.211.55.8:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
kafka.eagle.username=root
kafka.eagle.password=52809329
  1. 配置JAVA_HOME
1
2
3
4
cd /export/server/kafka-eagle-bin-3.0.1/efak-web-3.0.1/bin
vim ke.sh
# 在第24行添加JAVA_HOME环境配置
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-arm64
  1. 修改Kafka eagle可执行权限
1
2
cd /export/server/kafka-eagle-bin-3.0.1/efak-web-3.0.1/bin
chmod +x ke.sh
  1. 启动 kafka_eagle
1
./ke.sh start
  1. 访问Kafka eagle,默认用户为admin,密码为:123456
1
http://10.211.55.8:8048/ke

Kafka度量指标

topic list

点击Topic下的List菜单,就可以展示当前Kafka集群中的所有topic。

image-20230306161949098

image-20230306162019519

指标 意义
Brokers Spread broker使用率
Brokers Skew 分区是否倾斜
Brokers Leader Skew leader partition是否存在倾斜

生产者消息总计

image-20230306162329012

Kafka原理

分区的leader与follower

Leader和Follower

在Kafka中,每个topic都可以配置多个分区以及多个副本。每个分区都有一个leader以及0个或者多个follower,在创建topic时,Kafka会将每个分区的leader均匀地分配在每个broker上。我们正常使用kafka是感觉不到leaderfollower的存在的。但其实,所有的读写操作都是由leader处理,而所有的follower都复制leader的日志数据文件,如果leader出现故障时,follower就会被选举为leader所以,可以这样说:

  • Kafka中的leader负责处理读写操作,而follower只负责副本数据的同步
  • 如果leader出现故障,其他follower会被重新选举为leader
  • follower像一个consumer一样,拉取leader对应分区的数据,并保存到日志数据文件中

image-20230306200630181

查看某个partition的leader

使用Kafka-eagle查看某个topicpartitionleader在哪个服务器中。为了方便观察,我们创建一个名为test的3个分区、3个副本的topic

image-20230306201257833

image-20230306201415040

AR、ISR、OSR

在实际环境中,leader有可能会出现一些故障,所以Kafka一定会选举出新的leader。在讲解leader选举之前,我们先要明确几个概念。Kafka中,把follower可以按照不同状态分为三类——ARISROSR

  • 分区的所有副本称为 「AR」(Assigned Replicas——已分配的副本)
  • 所有与leader副本保持一定程度同步的副本(包括 leader 副本在内)组成 「ISR」(In-Sync Replicas——在同步中的副本)
  • 由于follower副本同步滞后过多的副本(不包括 leader 副本)组成 「OSR」(Out-of-Sync Replias)
  • AR = ISR + OSR
  • 正常情况下,所有的follower副本都应该与leader副本保持同步,即AR = ISROSR集合为空。

image-20230306202506289

查看分区的ISR

  1. 使用Kafka Eagle查看某个Topic的partition的ISR有哪几个节点。

image-20230306202600726

  1. 尝试关闭id为3的broker(杀掉该broker的进程),参看topic的ISR情况。

image-20230306202749974

Leader选举

leader对于消息的写入以及读取是非常关键的,此时有两个疑问:

  1. Kafka如何确定某个partition是leader、哪个partition是follower呢?

  2. 某个leader崩溃了,如何快速确定另外一个leader呢?因为Kafka的吞吐量很高、延迟很低,所以选举leader必须非常快

如果leader崩溃,Kafka会如何?

使用Kafka Eagle找到某个partition的leader,再找到leader所在的broker。在Linux中强制杀掉该Kafka的进程,然后观察leader的情况。

image-20230306204137651

杀死broker2后partition0重新选举broker3为leader

image-20230306204406984

Controller介绍
  • Kafka启动时,会在所有的broker中选择一个controller
  • 前面leaderfollower是针对partition,而controller是针对broker
  • 创建topic、或者添加分区修改副本数量之类的管理任务都是由controller完成的
  • Kafka分区leader的选举,也是由controller决定的
Controller的选举
  • 在Kafka集群启动的时候,每个broker都会尝试去ZooKeeper上注册成为Controller(ZK临时节点)
  • 但只有一个竞争成功,其他的broker会注册该节点的监视器
  • 一点该临时节点状态发生变化,就可以进行相应的处理
  • Controller也是高可用的,一旦某个broker崩溃,其他的broker会重新注册为Controller
找到当前Kafka集群的controller
  1. 点击Kafka Tools的「Tools」菜单,找到「ZooKeeper Brower…」

  2. 点击左侧树形结构的controller节点,就可以查看到哪个broker是controller了。

image-20230306205729450

测试controller选举

通过kafka tools找到controller所在的broker对应的kafka进程,杀掉该进程,重新打开ZooKeeper brower,观察kafka是否能够选举出来新的Controller。

image-20230306210201988

Controller选举partition leader
  • 所有Partition的leader选举都由controller决定
  • controller会将leader的改变直接通过RPC的方式通知需为此作出响应的Broker
  • controller读取到当前分区的ISR,只要有一个Replica还幸存,就选择其中一个作为leader否则,则任意选这个一个Replica作为leader
  • 如果该partition的所有Replica都已经宕机,则新的leader为-1

具体来说,当一个分区的 leader 副本失效时,follower 副本会发现并向其它 broker 节点发送请求,申请成为该分区的新 leader。同时,每个 broker 节点会周期性地向 controller 节点发送心跳请求,汇报自己当前的状态和可用性信息。controller 节点会根据这些信息,选择一个健康的、可用的 broker 节点作为该分区的新 leader。

在选举新 leader 的过程中,controller 节点会参考如下因素:

  1. 副本状态:只有处于 ISR(in-sync replicas)列表中的 follower 副本才有资格成为新 leader,因为它们的数据已经与 leader 同步。
  2. 副本位置:controller 节点会选择与原 leader 副本相同或更靠前的位置作为新 leader 的位置,以确保最小化数据丢失。
  3. 副本健康状况:controller 节点会优先选择健康的、可用的 broker 节点作为新 leader,以确保高可用性和服务质量。

总之,controller 节点会综合考虑多个因素,选出一个最适合成为新 leader 的 broker 节点,从而保障 Kafka 集群的高可用性和稳定性。

为什么不能通过ZK的方式来选举partition的leader?

  • Kafka集群如果业务很多的情况下,会有很多的partition
  • 假设某个broker宕机,就会出现很多的partiton都需要重新选举leader
  • 如果使用zookeeper选举leader,会给zookeeper带来巨大的压力。所以,kafka中leader的选举不能使用ZK来实现

leader负载均衡

Preferred Replica
  • Kafka中引入了一个叫做「preferred-replica」的概念,意思就是:优先的Replica
  • 在ISR列表中,第一个replica就是preferred-replica
  • 第一个分区存放的broker,肯定就是preferred-replica
  • 执行以下脚本可以将preferred-replica设置为leader,均匀分配每个分区的leader。
1
./kafka-leader-election.sh --bootstrap-server node1.itcast.cn:9092 --topic 主题 --partition=1 --election-type preferred
确保leader在broker中负载均衡

杀掉test主题的某个broker,这样kafka会重新分配leader。等到Kafka重新分配leader之后,再次启动kafka进程。此时:观察test主题各个分区leader的分配情况。

image-20230306213503968

此时,会造成leader分配是不均匀的,所以可以执行以下脚本来重新分配leader:

1
bin/kafka-leader-election.sh --bootstrap-server 10.211.55.8:9092 --topic test --partition=1 --election-type preferred

–partition:指定需要重新分配leader的partition编号

1
2
root@ubuntu3:/export/server/kafka_2.12-2.4.1# bin/kafka-leader-election.sh --bootstrap-server 10.211.55.8:9092 --topic test --partition=1 --election-type preferred
Successfully completed leader election (PREFERRED) for partitions test-1

image-20230306213706090

Kafka生产、消费数据工作流程

Kafka数据写入流程

image-20230306214611502

  • 生产者先从 zookeeper 的 “/brokers/topics/主题名/partitions/分区名/state”节点找到该 partition 的leader

image-20230306214903945

  • 生产者在ZK中找到该ID找到对应的broker

image-20230306215050562

  • broker进程上的leader将消息写入到本地log中
  • follower从leader上拉取消息,写入到本地log,并向leader发送ACK
  • leader接收到所有的ISR中的Replica的ACK后,并向生产者返回ACK。

Kafka数据消费流程

两种消费模式

image-20230306215312539

  • kafka采用拉取模型,由消费者自己记录消费状态,每个消费者互相独立地顺序拉取每个分区的消息
  • 消费者可以按照任意的顺序消费消息。比如,消费者可以重置到旧的偏移量,重新处理之前已经消费过的消息;或者直接跳到最近的位置,从当前的时刻开始消费。
Kafka消费数据流程

image-20230306215711730

  • 每个consumer都可以根据分配策略(默认RangeAssignor),获得要消费的分区
  • 获取到consumer对应的offset(默认从ZK中获取上一次消费的offset)
  • 找到该分区的leader,拉取数据
  • 消费者提交offset

Kafka的数据存储形式

image-20230306221446868

  • 一个topic由多个分区组成
  • 一个分区(partition)由多个segment(段)组成
  • 一个segment(段)由多个文件组成(log、index、timeindex)

存储日志

接下来,我们来看一下Kafka中的数据到底是如何在磁盘中存储的。

  • Kafka中的数据是保存在 /export/server/kafka_2.12-2.4.1/data中
  • 消息是保存在以:「主题名-分区ID」的文件夹中的
  • 数据文件夹中包含以下内容:

image-20230306221727670

这些分别对应:

文件名 说明
00000000000000000000.index 索引文件,根据offset查找数据就是通过该索引文件来操作的
00000000000000000000.log 日志数据文件
00000000000000000000.timeindex 时间索引
leader-epoch-checkpoint 持久化每个partition leader对应的LEO (log end offset、日志文件中下一条待写入消息的offset)

每个日志文件的文件名为起始偏移量,因为每个分区的起始偏移量是0,所以,分区的日志文件都以0000000000000000000.log开始

默认的每个日志文件最大为「log.segment.bytes =102410241024」1G

为了简化根据offset查找消息,Kafka日志文件名设计为开始的偏移量

观察测试

为了方便测试观察,新创建一个topic:「test_10m」,该topic每个日志数据文件最大为10M

1
bin/kafka-topics.sh --create --zookeeper 10.211.55.8 --topic test_10m --replication-factor 2 --partitions 3 --config segment.bytes=10485760

使用之前的生产者程序往「test_10m」主题中生产数据,可以观察到如下:

image-20230307164926265

写入消息

新的消息总是写入到最后的一个日志文件中

该文件如果到达指定的大小(默认为:1GB)时,将滚动到一个新的文件中

image-20230307165109262

读取消息

image-20230307165236902

根据「offset」首先需要找到存储数据的 segment 段(注意:offset指定分区的全局偏移量)

然后根据这个「全局分区offset」找到相对于文件的「segment段offset」

image-20230307165349810

  • 最后再根据 「segment段offset」读取消息
  • 为了提高查询效率,每个文件都会维护对应的范围内存,查找的时候就是使用简单的二分查找
删除消息
  • 在Kafka中,消息是会被定期清理的。一次删除一个segment段的日志文件
  • Kafka的日志管理器,会根据Kafka的配置,来决定哪些文件可以被删除

消息不丢失机制

broker数据不丢失

生产者通过分区的leader写入数据后,所有在ISR中follower都会从leader中复制数据,这样,可以确保即使leader崩溃了,其他的follower的数据仍然是可用的

1.1.1 生产者数据不丢失

  • 生产者连接leader写入数据时,可以通过ACK机制来确保数据已经成功写入。ACK机制有三个可选配置
  1. 配置ACK响应要求为 -1 时 —— 表示所有的节点都收到数据(leader和follower都接

收到数据)

  1. 配置ACK响应要求为 1 时 —— 表示leader收到数据

  2. 配置ACK影响要求为 0 时 —— 生产者只负责发送数据,不关心数据是否丢失(这种情

况可能会产生数据丢失,但性能是最好的)

  • 生产者可以采用同步和异步两种方式发送数据
  1. 同步:发送一批数据给kafka后,等待kafka返回结果
  2. 异步:发送一批数据给kafka,只是提供一个回调函数。

说明:如果broker迟迟不给ack,而buffer又满了,开发者可以设置是否直接清空buffer中的数据。

消费者数据不丢失

在消费者消费数据的时候,只要每个消费者记录好offset值即可,就能保证数据不丢失。

数据积压

Kafka消费者消费数据的速度是非常快的,但如果由于处理Kafka消息时,由于有一些外部IO、或者是产生网络拥堵,就会造成Kafka中的数据积压(或称为数据堆积)。如果数据一直积压,会导致数据出来的实时性受到较大影响。

使用Kafka-Eagle查看数据积压情况

image-20230307173313570

image-20230307173543138

image-20230307173531368

解决数据积压问题

当Kafka出现数据积压问题时,首先要找到数据积压的原因。以下是在企业中出现数据积压的几个类场景。

数据写入MySQL失败

问题描述

某日运维人员找到开发人员,说某个topic的一个分区发生数据积压,这个topic非常重要,而且开始有用户投诉。运维非常紧张,赶紧重启了这台机器。重启之后,还是无济于事。

问题分析

消费这个topic的代码比较简单,主要就是消费topic数据,然后进行判断在进行数据库操作。运维通过kafka-eagle找到积压的topic,发现该topic的某个分区积压了几十万条的消息。

最后,通过查看日志发现,由于数据写入到MySQL中报错,导致消费分区的offset一自没有提交,所以数据积压严重。

因为网络延迟消费失败

问题描述

基于Kafka开发的系统平稳运行了两个月,突然某天发现某个topic中的消息出现数据积压,大概有几万条消息没有被消费。

问题分析

通过查看应用程序日志发现,有大量的消费超时失败。后查明原因,因为当天网络抖动,通过查看Kafka的消费者超时配置为50ms,随后,将消费的时间修改为500ms后问题解决。

Kafka中数据清理(Log Deletion)

Kafka的消息存储在磁盘中,为了控制磁盘占用空间,Kafka需要不断地对过去的一些消息进行清理工作。Kafka的每个分区都有很多的日志文件,这样也是为了方便进行日志的清理。在Kafka中,提供两种日志清理方式:

日志删除(Log Deletion):按照指定的策略直接删除不符合条件的日志。

日志压缩(Log Compaction):按照消息的key进行整合,有相同key的但有不同value值,只保留最后一个版本。

在Kafka的broker或topic配置中:

配置项 配置值 说明
log.cleaner.enable true(默认) 开启自动清理日志功能
log.cleanup.policy delete(默认) 删除日志
log.cleanup.policy compaction 压缩日志
log.cleanup.policy delete,compact 同时支持删除、压缩

日志删除

日志删除是以段(segment日志)为单位来进行定期清理的。

定时日志删除任务

Kafka日志管理器中会有一个专门的日志删除任务来定期检测和删除不符合保留条件的日志分段文件,这个周期可以通过broker端参数log.retention.check.interval.ms来配置,默认值为300,000,即5分钟。当前日志分段的保留策略有3种:

  1. 基于时间的保留策略

  2. 基于日志大小的保留策略

  3. 基于日志起始偏移量的保留策略

image-20230307194628338

基于时间的保留策略

以下三种配置可以指定如果Kafka中的消息超过指定的阈值,就会将日志进行自动清理:

  • log.retention.hours
  • log.retention.minutes
  • log.retention.ms

其中,优先级为 log.retention.ms > log.retention.minutes > log.retention.hours。默认情况,在broker中,配置如下:

log.retention.hours=168

也就是,默认日志的保留时间为168小时,相当于保留7天。

删除日志分段时:

  1. 从日志文件对象中所维护日志分段的跳跃表中移除待删除的日志分段,以保证没有线程对这些日志分段进行读取操作

  2. 将日志分段文件添加上“.deleted”的后缀(也包括日志分段对应的索引文件)

Kafka的后台定时任务会定期删除这些“.deleted”为后缀的文件,这个任务的延迟执行时间可以通过file.delete.delay.ms参数来设置,默认值为60000,即1分钟。

设置topic 5秒删除一次
  1. 为了方便观察,设置段文件的大小为1M。

image-20230307194939270

  1. 设置topic的删除策略

key: retention.ms

value: 5000

image-20230307195136561

尝试往topic中添加一些数据,等待一会,观察日志的删除情况。我们发现,日志会定期被标记为删除,然后被删除。

基于日志大小的保留策略

日志删除任务会检查当前日志的大小是否超过设定的阈值来寻找可删除的日志分段的文件集合。可以通过broker端参数 log.retention.bytes 来配置,默认值为-1,表示无穷大。如果超过该大小,会自动将超出部分删除。

注意:

log.retention.bytes 配置的是日志文件的总大小,而不是单个的日志分段的大小,一个日志文件包含多个日志分段。

基于日志起始偏移量保留策略

每个segment日志都有它的起始偏移量,如果起始偏移量小于 logStartOffset,那么这些日志文件将会标记为删除。

日志压缩(Log Compaction)

Log Compaction是默认的日志删除之外的清理过时数据的方式。它会将相同的key对应的数据只保留一个版本。

image-20230307200241952

  • Log Compaction执行后,offset将不再连续,但依然可以查询Segment
  • Log Compaction执行前后,日志分段中的每条消息偏移量保持不变。Log Compaction会生成一个新的Segment文件
  • Log Compaction是针对key的,在使用的时候注意每个消息的key不为空
  • 基于Log Compaction可以保留key的最新更新,可以基于Log Compaction来恢复消费者的最新状态

Kafka配额限速机制(Quotas)

生产者和消费者以极高的速度生产/消费大量数据或产生请求,从而占用broker上的全部资源,造成网络IO饱和。有了配额(Quotas)就可以避免这些问题。Kafka支持配额管理,从而可以对Producer和Consumer的produce&fetch操作进行流量限制,防止个别业务压爆服务器。

限制producer端速率

为所有client id设置默认值,以下为所有producer程序设置其TPS不超过1MB/s,即1048576/s,命令如下:

1
bin/kafka-configs.sh --zookeeper node1.itcast.cn:2181 --alter --add-config 'producer_byte_rate=1048576' --entity-type clients --entity-default

运行基准测试,观察生产消息的速率

1
bin/kafka-producer-perf-test.sh --topic test --num-records 500000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=node1.itcast.cn:9092,node2.itcast.cn:9092,node3.itcast.cn:9092 acks=1

结果:

50000 records sent, 1108.156028 records/sec (1.06 MB/sec)

限制consumer端速率

对consumer限速与producer类似,只不过参数名不一样。

为指定的topic进行限速,以下为所有consumer程序设置topic速率不超过1MB/s,即1048576/s。命令如下:

1
bin/kafka-configs.sh --zookeeper node1.itcast.cn:2181 --alter --add-config 'consumer_byte_rate=1048576' --entity-type clients --entity-default

运行基准测试:

1
bin/kafka-consumer-perf-test.sh --broker-list node1.itcast.cn:9092,node2.itcast.cn:9092,node3.itcast.cn:9092 --topic test --fetch-size 1048576 --messages 500000

结果为:

MB.sec:1.0743

取消Kafka的Quota配置

使用以下命令,删除Kafka的Quota配置

1
2
bin/kafka-configs.sh --zookeeper node1.itcast.cn:2181 --alter --delete-config 'producer_byte_rate' --entity-type clients --entity-default
bin/kafka-configs.sh --zookeeper node1.itcast.cn:2181 --alter --delete-config 'consumer_byte_rate' --entity-type clients --entity-default