KAFKA

1.根目录下的结构

1
2
[zk: localhost:2181(CONNECTED) 72] ls /
[isr_change_notification, zookeeper, admin, consumers, cluster, config, latest_producer_id_block, controller, brokers, controller_epoch]

2. admin 的结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
[zk: localhost:2181(CONNECTED) 73] ls /admin
[delete_topics]
[zk: localhost:2181(CONNECTED) 74] ls /admin/delete_topics
[]
[zk: localhost:2181(CONNECTED) 75] get /admin/delete_topics
null
cZxid = 0xe
ctime = Sun Sep 02 20:04:12 PDT 2018
mZxid = 0xe
mtime = Sun Sep 02 20:04:12 PDT 2018
pZxid = 0xe
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 0
[zk: localhost:2181(CONNECTED) 76]

3.consumers 的结构

本环境上没有设置消费组

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
[zk: localhost:2181(CONNECTED) 78] ls /consumers
[]
[zk: localhost:2181(CONNECTED) 79] get /consumers
null
cZxid = 0x2
ctime = Sun Sep 02 20:04:12 PDT 2018
mZxid = 0x2
mtime = Sun Sep 02 20:04:12 PDT 2018
pZxid = 0x2
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 0
[zk: localhost:2181(CONNECTED) 80]
## 4.config的结构
[zk: localhost:2181(CONNECTED) 82] ls /config
[topics, clients, changes]
[zk: localhost:2181(CONNECTED) 83] ls /config/topics
[__consumer_offsets, test, test2]
[zk: localhost:2181(CONNECTED) 84] ls /config/topics/test2
[]
[zk: localhost:2181(CONNECTED) 85] get /config/topics/test2
{"version":1,"config":{}}
cZxid = 0xcc
ctime = Sun Sep 02 20:18:06 PDT 2018
mZxid = 0xcc
mtime = Sun Sep 02 20:18:06 PDT 2018
pZxid = 0xcc
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 25
numChildren = 0
[zk: localhost:2181(CONNECTED) 86]

5.controllers 的结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
[zk: localhost:2181(CONNECTED) 92] ls /controller
[]
[zk: localhost:2181(CONNECTED) 93] get /controller
{"version":1,"brokerid":0,"timestamp":"1535943857541"} //表示broker0为kafka的控制节点
cZxid = 0x15
ctime = Sun Sep 02 20:04:17 PDT 2018
mZxid = 0x15
mtime = Sun Sep 02 20:04:17 PDT 2018
pZxid = 0x15
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x1659d6268f60000
dataLength = 54
numChildren = 0
[zk: localhost:2181(CONNECTED) 94]
## 6.brokers的结构
[zk: localhost:2181(CONNECTED) 95] ls /brokers
[seqid, topics, ids]
[zk: localhost:2181(CONNECTED) 96] ls /brokers/ids
[0]
[zk: localhost:2181(CONNECTED) 97] get /brokers/ids/0
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://localhost:9092"],"jmx_port":-1,"host":"localhost","timestamp":"1535943859255","port":9092,"version":4}
cZxid = 0x1c
ctime = Sun Sep 02 20:04:19 PDT 2018
mZxid = 0x1c
mtime = Sun Sep 02 20:04:19 PDT 2018
pZxid = 0x1c
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x1659d6268f60000
dataLength = 188
numChildren = 0
[zk: localhost:2181(CONNECTED) 98]

------------------------------brokers/topics----------------------
[zk: localhost:2181(CONNECTED) 100] ls /brokers/topics
[__consumer_offsets, test, test2]
[zk: localhost:2181(CONNECTED) 101] ls /brokers/topics/test

test test2
[zk: localhost:2181(CONNECTED) 101] ls /brokers/topics/test2
[partitions]
[zk: localhost:2181(CONNECTED) 102] ls /brokers/topics/test2/partitions
[2, 1, 0]
[zk: localhost:2181(CONNECTED) 103] ls /brokers/topics/test2/partitions/2
[state]
[zk: localhost:2181(CONNECTED) 104] ls /brokers/topics/test2/partitions/2/state
[]
[zk: localhost:2181(CONNECTED) 105] get /brokers/topics/test2/partitions/2/state
{"controller_epoch":1,"leader":0,"version":1,"leader_epoch":0,"isr":[0]}
//表示partition 0 的leader是在0 broker上
cZxid = 0xd2
ctime = Sun Sep 02 20:18:06 PDT 2018
mZxid = 0xd2
mtime = Sun Sep 02 20:18:06 PDT 2018
pZxid = 0xd2
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 72
numChildren = 0
[zk: localhost:2181(CONNECTED) 106]

6.深入理解生产者和消费者

https://www.cnblogs.com/mcbye/p/kafka-producer-in-detail.html
https://www.cnblogs.com/mcbye/p/kafka-consumer-in-detail.html

7.重平衡

说完消费者组,再来说说与消费者组息息相关的重平衡机制。重平衡可以说是 kafka 为人诟病最多的一个点了。

重平衡其实就是一个协议,它规定了如何让消费者组下的所有消费者来分配 topic 中的每一个分区。比如一个 topic 有 100 个分区,一个消费者组内有 20 个消费者,在协调者的控制下让组内每一个消费者分配到 5 个分区,这个分配的过程就是重平衡。

重平衡的触发条件主要有三个:

  • 消费者组内成员发生变更,这个变更包括了增加和减少消费者。注意这里的减少有很大的可能是被动的,就是某个消费者崩溃退出了
  • 主题的分区数发生变更,kafka 目前只支持增加分区,当增加的时候就会触发重平衡
  • 订阅的主题发生变化,当消费者组使用正则表达式订阅主题,而恰好又新建了对应的主题,就会触发重平衡

为什么说重平衡为人诟病呢?因为重平衡过程中,消费者无法从 kafka 消费消息,这对 kafka 的 TPS 影响极大,而如果 kafka 集内节点较多,比如数百个,那重平衡可能会耗时极多。数分钟到数小时都有可能,而这段时间 kafka 基本处于不可用状态。所以在实际环境中,应该尽量避免重平衡发生。

了解了什么是重平衡,重平衡的缺点和触发条件后,我们先来看看重平衡的三种不同策略,然后说说应该如何避免重平衡发生。

三种重平衡策略

kafka 提供了三种重平衡分配策略,这里顺便介绍一下:

Range

具体实现位于,package org.apache.kafka.clients.consumer.RangeAssignor。

这种分配是基于每个主题的分区分配,如果主题的分区分区不能平均分配给组内每个消费者,那么对该主题,某些消费者会被分配到额外的分区。我们来看看具体的例子。

举例:目前有两个消费者 C0 和 C1,两个主题 t0 和 t1,每个主题三个分区,分别是 t0p0,t0p1,t0p2,和 t1p0,t1p1,t1p2。

那么分配情况会是:

  • C0:t0p0, t0p1, t1p0, t1p1
  • C1:t0p2, t1p2

我来大概解释一下,range 这种模式,消费者被分配的单位是基于主题的,拿上面的例子来说,是主题 t0 的三个分区分配给 2 个消费者,t1 三个分区分配给消费者。于是便会出现消费者 c0 分配到主题 t0 两个分区,以及 t1 两个分区的情况(一个主题有三个分区,三个分区无法匹配两个消费者,势必有一个消费者分到两个分区),而非每个消费者分配两个主题各三个分区。

RoundRobin

具体实现位于,package org.apache.kafka.clients.consumer.RoundRobinAssignor。

RoundRobin 是基于全部主题的分区来进行分配的,同时这种分配也是 kafka 默认的 rebalance 分区策略。还是用刚刚的例子来看,

举例:两个消费者 C0 和 C1,两个主题 t0 和 t1,每个主题三个分区,分别是 t0p0,t0p1,t0p2,和 t1p0,t1p1,t1p2。

由于是基于全部主题的分区,那么分配情况会是:

  • C0:t0p0, t0p1, t1p1
  • C1:t1p0, t0p2, t1p2
    因为是基于全部主题的分区来平均分配给消费者,所以这种分配策略能更加均衡得分配分区给每一个消费者。

上面说的都是同一消费者组内消费组都订阅相同主题的情况。更复杂的情况是,同一组内的消费者订阅不同的主题,那么任然可能会导致分区不均衡的情况。

还是举例说明,有三个消费者 C0,C1,C2 。三个主题 t0,t1,t2,分别有 1,2,3 个分区 t0p0,t1p0,t1p1,t2p0,t2p1,t2p2。

其中,C0 订阅 t0,C1 订阅 t0,t1。C2 订阅 t0,t1,t2。最终订阅情况如下:

  • C0:t0p0
  • C1:t1p0
  • C2:t1p1,t2p0,t2p1,t2p2
    这个结果乍一看有点迷,其实可以这样理解,按照序号顺序进行循环分配,t0 只有一个分区,先碰到 C0 就分配给它了。t1 有两个分区,被 C1 和 C2 订阅,那么会循环将两个分区分配出去,最后到 t2,有三个分区,却只有 C2 订阅,那么就将三个分区分配给 C2。

Sticky

Sticky 分配策略是最新的也是最复杂的策略,其具体实现位于 package org.apache.kafka.clients.consumer.StickyAssignor。

这种分配策略是在 0.11.0 才被提出来的,主要是为了一定程度解决上面提到的重平衡非要重新分配全部分区的问题。称为粘性分配策略。

听名字就知道,主要是为了让目前的分配尽可能保持不变,只挪动尽可能少的分区来实现重平衡。

还是举例说明,有三个消费者 C0,C1,C2 。三个主题 t0,t1,t2,t3。每个主题各有两个分区, t0p0,t0p1,t1p0,t1p1,t2p0,t2p1,t3p0,t3p1。

现在订阅情况如下:

  • C0:t0p0,t1p1,t3p0
  • C1:t0p1,t2p0,t3p1
  • C2:t1p0,t2p1

假设现在 C1 挂掉了,如果是 RoundRobin 分配策略,那么会变成下面这样:

  • C0:t0p0,t1p0,t2p0,t3p0
  • C2:t0p1,t1p1,t2p1,t3p1

就是说它会全部重新打乱,再分配,而如何使用 Sticky 分配策略,会变成这样:

  • C0:t0p0,t1p1,t3p0,t2p0
  • C2:t1p0,t2p1,t0p1,t3p1

也就是说,尽可能保留了原来的分区情况,不去改变它,在这个基础上进行均衡分配,不过这个策略目前似乎还有些 bug,所以实际使用也不多。

避免重平衡

要说完全避免重平衡,那是不可能滴,因为你无法完全保证消费者不会故障。而消费者故障其实也是最常见的引发重平衡的地方,所以这里主要介绍如何尽力避免消费者故障。

而其他几种触发重平衡的方式,增加分区,或是增加订阅的主题,抑或是增加消费者,更多的是主动控制,这里也不多讨论。

首先要知道,如果消费者真正挂掉了,那我们是没有什么办法的,但实际中,会有一些情况,会让 kafka 错误地认为一个正常的消费者已经挂掉了,我们要的就是避免这样的情况出现。

当然要避免,那首先要知道哪些情况会出现错误判断挂掉的情况。在分布式系统中,通常是通过心跳来维持分布式系统的,kafka 也不例外。对这部分内容有兴趣可以看看我之前的这篇分布式系统一致性问题与 Raft 算法(上)。这里要说的是,在分布式系统中,由于网络问题你不清楚没接收到心跳,是因为对方真正挂了还是只是因为负载过重没来得及发生心跳或是网络堵塞。所以一般会约定一个时间,超时即判定对方挂了。而在 kafka 消费者场景中,session.timout.ms 参数就是规定这个超时时间是多少。

还有一个参数,heartbeat.interval.ms,这个参数控制发送心跳的频率,频率越高越不容易被误判,但也会消耗更多资源。

此外,还有最后一个参数,max.poll.interval.ms,我们都知道消费者 poll 数据后,需要一些处理,再进行拉取。如果两次拉取时间间隔超过这个参数设置的值,那么消费者就会被踢出消费者组。也就是说,拉取,然后处理,这个处理的时间不能超过 max.poll.interval.ms 这个参数的值。这个参数的默认值是 5 分钟,而如果消费者接收到数据后会执行耗时的操作,则应该将其设置得大一些。

小结一下,其实主要就是三个参数,session.timout.ms 控制心跳超时时间,heartbeat.interval.ms 控制心跳发送频率,以及 max.poll.interval.ms 控制 poll 的间隔。这里给出一个相对较为合理的配置,如下:

session.timout.ms:设置为 6s
heartbeat.interval.ms:设置 2s
max.poll.interval.ms:推荐为消费者处理消息最长耗时再加 1 分钟

8.无法消费消息

kafka 消费不到数据的原因,首先检查配置之类的,如是否设置了 group.id,对应的 topic 是否正确等等,这些不多说。

下面是我遇到的几种 kafka 消费不到数据的情况:

8.1 重复消费

参考https://www.jianshu.com/p/d63c1576e6cc
下面进行详细分析:

“消费确认”是所有消息中间件都要解决的一个问题,在 kafka 中涉及到两个消费位置:

(1)当前取消息所在的 consume offset;

(2)程序处理完毕发送 ack(确认字符)后所确定的 committed offset。

很显然,在异步模式下,committed offset 要落后于 consume offset。假如 consumer 挂了重启,那么它将从 commited offset 位置处开始重新消费,而不是 consume offset 位置,这也就意味着很可能重复消费,所以会导致一条数据也抓不到。

那么怎么解决这个问题呢?

答案就是自己保存 commited offset,而不是依赖 kafka 的集群保存 commited offset,把消息的处理和保存 offset 做成一个原子操作。

如何将消息的处理和保存 offset 做成一个原子操作呢,Kafka 的官方文档列举了自己保存 offset 的两种使用场景:

要自己保存 committed offset,就要做到以下几个操作:

1
2
3
Configure enable.auto.commit=false   //禁用自动ack
Use the offset provided with each ConsumerRecord to save your position. //每次取到消息,把对应的offset存下来
On restart restore the position of the consumer using seek(TopicPartition, long).//下次重启,通过consumer.seek函数,定位到自己保存的offset,从那开始消费

Kafka 本身的机制只能保证消息不漏,即”at least once”,而通过自己来保存 committed offset,我们可以实现消费端的消息不重,即”exactly once”,达到消息不重不丢的目的。

8.2 消息被清理掉

https://www.cnblogs.com/sylvialucy/p/7827044.html

1.长时间不消费导致 log.retention.hours 或者 log.retention.minutes 超时,清除 log,Offset.Stored 失效

1
consumer.Assign(new List<TopicPartitionOffset>(){ new TopicPartitionOffset(new TopicPartition("topic", 1), new Offset(index)) });

2.我一次加数据太多导致磁盘耗尽,kafka 管理员帮我改到 20G 内存,但是仍然有一部分数据超出,分区 offset 靠前的数据被清除,导致再次消费不到。清除掉的数据无法再次被消费,但是还保存的数据可以消费到.

解决办法

1
consumer.Assign(new List<TopicPartitionOffset>(){ new TopicPartitionOffset(new TopicPartition("topic", 1), Offset.Beginning) });

或者在配置中加

1
2
auto.offset.reset=smallest //.NET 默认是largest
auto.offset.reset=earliest//Java 默认是latest

8.3 kafka 手动清除 topic

当手动删除 Kafka 某一分片上的消息日志时,如上图蓝线所示,此是只是将 Kafka Log 中的信息清 0 了,但是 Zookeeper 中的 Partition 和 Offset 数据依然会记录。当重新启动 Kafka 后,我们会发现如下二种情况:

 A、客户端无法正常用消费;

 B、在使用Kafka Consumer Offset  Monitor工具进行Kafka监控时会发现Lag(还有多少消息数未读取(Lag=logSize-Offset))为负数;其中此种情况的删除操作需要我们重点关注,后面我们也会详细介绍其对应的操作步骤。

一般正常情况,如果想让 Kafka 客户端正常消费,那么需要 Zookeeper 和 Kafka Log 中的记录保持如上图黄色所示。
二、Kafka 消息日志清除

操作步骤主要包括:

 1、停止Kafka运行;

 2、删除Kafka消息日志;

 3、修改ZK的偏移量;

 4、重启Kafka;

上述步骤重点介绍其中的关键步骤。

第 2 步:删除 Kafka 消息日志时,进入 Kafka 消息日志路径(可通过查看$KAFKA_HOME/config/server.properties 中的“log.dirs”知晓),删除相应 topic 文件夹下所有文件(如:“rm -rf ./topicA”);

第 3 步:修改 ZK 的偏移量时,进入 ZK 的安装目录下,运行./bin/zkCli.sh -server (中间以,分割),如果不带 server 默认修改的为本机。

示例如下:

 A.运行$ZOOKEEPER_HOME/bin/zkCli.sh -server Master:2181,Slave1:2181,Slave2:2181

 B.在ZK上运行ls /consumers/对应的分组/offset/对应的topic,就可以看到此topic下的所有分区了;

  通过get /consumers/对应的分组/offset/对应的topic/对应的分区号,可以查询到该分区上记录的offset;

  通过set /consumers/对应的分组/offset/对应的topic/对应的分区号 修改后的值(一般为0,重置),即可完成对offset的修改;

(注意:B 步骤中的“/consumers”由实际配置情况决定)

三、重建 Topic

操作步骤主要包括如下:

  1、删除Topic;

  2、删除log日志;

  3、删除ZK中的Topic记录

第一步:删除 Topic

运行$KAFKA_HOME/bin/kafka-topics.sh -delete -zookeeper [zookeeper server] -topic [topic name];如果 kafka 启动时加载的配置文件 server.properties 没有配置 delete.topic.enable = true,那么此时的删除并不是真正的删除。而只是把 topic 标记为:marked for deletion,此时就需要执行第 3 步的操作;

第三步:删除 ZK 中的 Topic 记录

示例如下:

 A.运行$ZOOKEEPER_HOME/bin/zkCli.sh -server Master:2181,Slave1:2181,Slave2:2181

 B.进入/admin/delete_topics目录下,找到删除的topic,删除对应的信息。

四、重新启动 Kafka 集群

控制器

Kafka 使用 Zookeeper 的临时节点来选举控制器, 并在节点加入集群或退出集群时通知控制器。控制器负责在节点加入或离开集群时进行分区首领选举。控制器使用 epoch 来避免“脑裂” 。“脑裂 ”是指两个节点同时认为自己是 当前的控制器。

脑裂问题

  • 什么是脑裂?
    kafka 中只有一个控制器 controller 负责分区的 leader 选举,同步 broker 的新增或删除消息,但有时由于网络问题,可能同时有两个 broker 认为自己是 controller,这时候其他的 broker 就会发生脑裂,不知道该听从谁的。

  • 如何解决?controller epoch
    每当新的 controller 产生的时候就会在 zk 中生成一个全新的、数值更大的 controller epoch 的标识,并同步给其他的 broker 进行保存,这样当第二个 controller 发送指令时,其他的 broker 就会自动忽略。

  • 选举问题?
    每个消息有自己的 topic 每个 topic 有多个分区 多个分区位于不同的 broker 每个分区有一个主分区和多个从分区。

每个分区都有一个主分区(leader)和多个从分区(fowller) 当一个 broker 宕机时,存在与该 broker 的主分区也会停止服务,因此要重新选举新的 leader 分区。

  • 如何选举?
    控制器会从 zk 中读取 ISR 列表 选取下一个有效的分区副本成为新的 leader

Kafka 会在 Zookeeper 上针对每个 Topic 维护一个称为 ISR(in-sync replica,已同步的副本)的集合,该集合中是一些分区的副本。如果这个集合有增减,kafka 会更新 zookeeper 上的记录。

通信模型

在 kafka 架构中,通信涉及到以下几点:

  1. producer 向 broker 集群生产数据数据 push 形式;
  2. consumer 从 broker 集群消费数据属于 pull 形式;
  3. broker 之间在 replication.factor>1 时,会存在副本之间数据同步,表现为:follower partition 从 leader partition pull 数据,来保证最大限度的拉近 partition 数据不一致。

Kafka Stream

支持的功能展示:

  • 字数统计这个例子用于演示 map 与 filter 模式以及简单的聚合
  • 另一个股票交易试产的各种统计信息,用于演示基于时间窗口的聚合
  • 最后使用填充点击事件流的例子来演示流的连接

KTable

KTable 可以存储状态,分为两种,内存和磁盘。磁盘依赖于 rockdb,内存依赖于 map。
KTable 具有容错功能,使用 kafka 存储事件流,用于恢复。
当使用基于 rockdb 的状态存储时,KTable 会优先从本地恢复,如果本地文件丢失则从 kafka 的 change log 事件流 topic 中,回放消息来恢复。
当使用基于内存的状态存储时,KTable 会直接从 kafka 的 change log 事件流 topic 中,回放消息来恢复。

Kafka Connect 有待了解???

Kafka 分区选择源码实现

消息 key 为空时,如果有缓存分区,使用缓存分区,没有缓存则随机选择
消息 key 不为空时,对 key 进行 HASH,然后对分区数取模
https://blog.csdn.net/liangwenmail/article/details/108321143#:~:text=StickyPartitionCache%20%E6%98%AF%20Kafka%20Client%20%E5%86%85%E9%83%A8%E7%9A%84%E4%B8%80%E4%B8%AA%E7%B1%BB%EF%BC%8C%E7%94%A8%E4%BA%8E%E7%AE%A1%E7%90%86%20Topic%20%E7%9A%84%E5%88%86%E5%8C%BA%E9%80%89%E6%8B%A9%E7%9A%84%E9%80%BB%E8%BE%91%E5%92%8C%E7%BC%93%E5%AD%98%E3%80%82,1%202%203%204%205%206%207%208
https://segmentfault.com/a/1190000020515457
https://www.confluent.io/blog/apache-kafka-producer-improvements-sticky-partitioner/

Kafka 消费者蛇者偏移量

https://blog.csdn.net/zzti_erlie/article/details/93637932