了不起的消息队列(三):致敬匠心,Kafka

背景

在本系列的前一篇博文中,笔者对 RabbitMQ 基本框架、概念、通信过程等基础原理,RabbitMQ 安装教程,RabbitMQ 在项目中实际应用场景等进行了详细的讲解。经过上一篇博客介绍,相信大家对 RabbitMQ 已经有了一个大致了解。Kafka 是由 LinkedIn 公司采用 Scala 语言开发的一个分布式、多分区、多副本且基于 Zookeeper 协调的分布式消息系统,现已捐献给 Apache 基金会。它是一种高吞吐量的分布式发布订阅消息系统,以可水平扩展和高吞吐率而被广泛使用。目前越来越多的开源分布式处理系统如 Cloudera、Apache Storm、Spark、Flink 等都支持与 Kafka 集成。

本文意在介绍 Kafka 的基本原理,包括 Kafka 基本概念、通信过程等,介绍一下 Kafka 安装教程,最后介绍一下 Kafka 在项目中实际应用场景。

Kafka 介绍

Kafka 是一个消息系统,原本开发自 LinkedIn,用作 LinkedIn 的活动流(Activity Stream)和运营数据处理管道(Pipeline)的基础。现在它已被多家不同类型的公司 作为多种类型的数据管道和消息系统使用。

相关概念

如下图所示,一个典型的 kafka 集群中包含若干 producer(可以是 web 前端产生的 page view,或者是服务器日志,系统 CPU、memory 等),若干 broker(Kafka 支持水平扩展,一般 broker 数量越多,集群吞吐率越高),若干 consumer group,以及一个 Zookeeper 集群。Kafka 通过 Zookeeper 管理集群配置,选举 leader,以及在 consumer group 发生变化时进行 rebalance。producer 使用 push 模式将消息发布到 broker,consumer 使用 pull 模式从 broker 订阅并消费消息。  

Kafka 相关概念

  • Producer: 生产者,消息的产生者,负责发布消息到 Kafka Broker。
  • Broker:经纪人,Kafka 集群包含一个或多个服务器,这种服务器被称为 Broker。如果某 topic 有 N 个 partition,集群有 N 个 broker,那么每个 broker 存储该 topic 的一个 partition;如果某 topic 有 N 个 partition,集群有 (N+M) 个 broker,那么其中有 N 个 broker 存储该 topic 的一个 partition,剩下的 M 个 broker 不存储该 topic 的 partition 数据;如果某 topic 有 N 个 partition,集群中 broker 数目少于 N 个,那么一个 broker 存储该 topic 的一个或多个 partition。在实际生产环境中,尽量避免这种情况的发生,这种情况容易导致 Kafka 集群数据不均衡。
  • Topic: 消息的主题,每条发布到 Kafka 集群的消息都有一个类别,这个类别被称为 Topic。(物理上不同 Topic 的消息分开存储,逻辑上一个 Topic 的消息虽然保存于一个或多个 broker 上但用户只需指定消息的 Topic 即可生产或消费数据而不必关心数据存于何处)
  • Partition:topic 的分区,每个 topic 可以有多个分区,分区的作用是做负载,提高 kafka 的吞吐量。parition 是物理上的概念,每个 topic 包含一个或多个 partition,创建 topic 时可指定 parition 数量。每个 partition 对应于一个文件夹,该文件夹下存储该 partition 的数据和索引文件。
  • Replication:复制,每一个分区都有多个副本,副本的作用是做备胎。当主分区(Leader)故障的时候会选择一个备胎(Follower)上位,成为 Leader。在 kafka 中默认副本的最大数量是 10 个,且副本的数量不能大于 Broker 的数量,follower 和 leader 绝对是在不同的机器,同一机器对同一个分区也只可能存放一个副本(包括自己)。
  • Consumer:消费者,即消息的消费方,是消息的出口。每个 consumer 属于一个特定的 consumer group。使用 consumer high level API 时,同一 topic 的一条消息只能被同一个 consumer group 内的一个 consumer 消费,但多个 consumer group 可同时消费这一消息。
  • Consumer Group:消费组,每个消费者都属于一个特定的 Consumer Group,可通过 group.id 配置项指定,若不指定 group name 则默认为 test-consumer-group。我们可以将多个消费组组成一个消费者组,在 kafka 的设计中同一个分区的数据只能被消费者组中的某一个消费者消费,同一个消费者组的消费者可以消费同一个 topic 的不同分区的数据。
  • Zookeeper:kafka 集群依赖 Zookeeper 来保存集群的的元信息,来保证系统的可用性。Kafka 利用 Zookeeper 保存相应的元数据信息,包括:Broker 信息,Kafka 集群信息,旧版消费者信息以及消费偏移量信息,主题信息,分区状态信息,分区副本分片方案信息,动态配置信息,等等。

生产端设计

生产者发送消息流程:

  • 新建 ProducerRecord 对象,包含目标主题和要发送的内容,也可以指定键或分区
  • 如果配置了拦截器,可用对发送的消息进行可定制化的拦截或更改
  • 发送 ProducerRecord 对象时,生产者要把键和值对象序列化成字节数组,这样它们才能在网络上传输
  • 数据被传给分区器,如果 ProducerRecord 对象中指定了分区,那么分区器就不会再做任何事情,直接把指定的分区返回;如果没有指定分区,那么分区器会根据 ProducerRecord 对象的键来选择一个分区;选择好分区后,生产者就知道该往哪个主题和分区发送这条记录了。
  • 这条记录被添加到一个记录批次里,这个批次里的所有消息会被发送到相同的主题和分区上,有一个独立的 Sender 线程负责把这些记录批次发送到相应的 broker 上
  • 服务器在收到这些消息时会返回一个相应,如果消息成功写入 kafka,就返回一个 RecordMetaData 对象,该对象包含了 Topic 信息、Patition 信息、消息在 Partition 中的 Offset 信息;如果写入失败,则会返回一个错误。生产者在收到错误之后会尝试重新发送消息,几次之后如果还是失败,就返回错误信息
  • Kafka 的顺序保证。Kafka 保证同一个 partition 中的消息是有序的,即如果生产者按照一定的顺序发送消息,broker 就会按照这个顺序把他们写入 partition,消费者也会按照相同的顺序读取他们

生产者发送消息流程

同步发送消息到 Kafka

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
Properties properties = new Properties() {{
/* 定义kakfa服务的地址 */
put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
/* 消息确认类型 */
put(ProducerConfig.ACKS_CONFIG, "all");
/* 生产端消息发送失败时的重试次数 */
put(ProducerConfig.RETRIES_CONFIG, 0);
/* batch 批次消息大小 */
put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
/* 用来控制 batch 最大的空闲时间 */
put(ProducerConfig.LINGER_MS_CONFIG, 1);
/* 生产端消息缓冲池或缓冲区的大小 */
put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
/* 最大阻塞时间 */
put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 1000);
/* key的序列化类 */
put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
/* value的序列化类 */
put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
}};
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
// Topic Key Value
ProducerRecord<String, String> record = new ProducerRecord<>("CustomCountry", "Precision Products", "France");
try {
Future future = producer.send(record);
future.get();
} catch (Exception e) {
// 连接错误、No Leader错误都可以通过重试解决;消息太大这类错误kafkaProducer不会进行任何重试,直接抛出异常
e.printStackTrace();
}

//all done close
producer.close();

注意:Producer 有两种错误类型。一种是可以通过再次发送消息解决的错误,比如连接出现问题,需要重新连接;或者是 “no leader” 错误,通过等待一会 Leader 重新选举完就可以继续。Producer 可以配置自动重试。另一种是通过重试无法处理的错误,比如消息过大,这种情况下,Producer 就不会重试,而是直接抛出异常。

异步发送消息到 Kafka

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
Properties properties = new Properties() {{
/* 定义kakfa服务的地址 */
put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
/* 消息确认类型 */
put(ProducerConfig.ACKS_CONFIG, "all");
/* 生产端消息发送失败时的重试次数 */
put(ProducerConfig.RETRIES_CONFIG, 0);
/* batch 批次消息大小 */
put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
/* 用来控制 batch 最大的空闲时间 */
put(ProducerConfig.LINGER_MS_CONFIG, 1);
/* 生产端消息缓冲池或缓冲区的大小 */
put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
/* 最大阻塞时间 */
put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 1000);
/* key的序列化类 */
put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
/* value的序列化类 */
put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
}};
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
// Topic Key Value
ProducerRecord<String, String> record = new ProducerRecord<>("CustomCountry", "Precision Products", "France");
producer.send(record, (metadata, exception) -> {
// 如果Kafka返回一个错误,onCompletion方法抛出一个non null异常
if (exception != null) {
// 发送消息时,传递一个回调对象,该回调对象必须实现org.apahce.kafka.clients.producer.Callback接口
if (metadata != null) {
log.error("kafka write exception.topic - {}[{}]", metadata.topic(), metadata.partition(), exception);
} else {
log.error("kafka write exception.topic", exception);
}
}
});

//all done close
producer.close();

生产端核心参数

(1)acks:acks 参数指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的。这个参数消息丢失的可能性有重要影响。只有当 leader 确认已成功写入消息的副本数后,才会给 Producer 发送响应,此时消息才可以认为 “已提交”。该参数影响着消息的可靠性以及生产端 的 吞吐量,并且两者往往相向而驰,通常消息可靠性越高则生产端的吞吐量越低。

  • acks=0,表示生产端发送消息后立即返回,不等待 broker 端的响应结果。通常此时生产端吞吐量最高,消息发送的可靠性最低。
  • acks=1,表示 leader 副本成功写入就会响应 Producer,而无需等待 ISR[1] (同步副本)集合中的其他副本写入成功。这种方案提供了 适当的持久性,保证了一定的吞吐量。默认值即是 1。
  • acks=all 或 - 1,表示不仅要等 leader 副本成功 写入 ,还要求 ISR 中的其他副本成功写入,才会响应 Producer。这种方案提供了最高的持久性,但也提供了最差的吞吐量。

调优建议:建议根据实际情况设置,如果要严格保证消息不丢失,请设置为 all 或 - 1;如果允许存在丢失,建议设置为 1;一般不建议设为 0,除非无所谓消息丢不丢失。

(2)batch.size:发送到缓冲区中的消息会被分为一个一个的 batch,分批次的发送到 broker 端,这个参数就表示 batch 批次大小,默认值为 16384,即 16KB。因此减小 batch 大小有利于降低消息延时,增加 batch 大小有利于提升吞吐量。

调优建议:通常合理调大该参数值,能够显著提升生产端吞吐量,比如可以调整到 32KB,调大也意味着消息会有相对较大的延时。

(3)buffer.memory:表示生产端消息缓冲池或缓冲区的大小,默认值为 33554432,即 32M。这个参数基本可以认为是 Producer 程序所使用的内存大小。当前版本中,如果生产消息的速度过快导致 buffer 满了的时候,将阻塞 max.block.ms(默认 60000 即 60s)配置的时间,超时将会抛 TimeoutException 异常。在 Kafka 0.9.0 及之前版本,建 议设置另一个参数 block.on.buffer.full 为 true,该参数表示当 buffer 填满时 Producer 处于阻塞状态并停止接收新消息而不是抛异常。

调优建议:通常我们应尽量保证生产端整体吞吐量,建议适当调大该参数,也意味着生产客户端会占用更多的内存。也可以选择不调整 。

(4)compression.type:表示生产端是否对消息进行压缩,默认值为 none,即不压缩消息。压缩可以显著减少网络 IO 传输、磁盘 IO 以及磁盘空间,从而提升整体吞吐量,但也是以 牺牲 CPU 开销为代价的 。当前 Kafka 支持 4 种压缩方式,分别是 gzip、snappy、 lz4 及 zstd(Kafka 2.1.0 开始支持) 。

调优建议:出于提升吞吐量的考虑,建议在生产端对消息进行压缩。对于 Kafka 而已,综合考虑吞吐量与压缩比,建议选择 lz4 压缩。如果追求最高的压缩比则推荐 zstd 压缩 。

(5)max.request.size:这个参数比较重要,表示生产端能够发送的 最大 消息大小,默认值为 1048576,即 1M。

调优建议:一般而言,这个配置有点小,为了避免因消息过大导致发送失败,建议适当调大,比如调到 10485760 即 10M。

(6)retries:表示生产端消息发送失败时的重试次数,默认值为 0,表示不进行重试。这个参数一般是为了解决因瞬时故障导致的消息发送失败, 比如网络抖动、leader 换主,其中瞬时的 leader 重选举是比较常见的 。因此这个参数的设置显得非常重要。另外为了避免频繁重试的影响,两次重试之间都会停顿一段时间,受参数 retry.backoff.ms,默认为 100ms,通常可以不调整。

调优建议:这里要尽量避免消息丢失,建议设置为一个大于 0 的值,比如 3 或者更大值 。

(7)linger.ms:用来控制 batch 最大的空闲时间,超过该时间的 batch 也会被发送到 broker 端。这实际上是一种权衡,即吞吐量与延时之间的权衡。默认值为 0,表示消息需要被立即发送,无需关系 batch 是否被填满。

调优建议:通常为了减少请求次数、提升整体吞吐量,建议设置一个大于 0 的值,比如设置为 100,此时会在负载低的情况下带来 100ms 的延时 。

(8)request.timeout.ms:这个参数表示生产端发送请求后等待 broker 端响应的最长时间,默认值为 30000,即 30s,超时生产端可能会选择重试(如果配置了 retries)。

调优建议:该参数默认值一般够用了。如果生产端负载很大,可以适当调大以避免超时,比如可以调到 60000。

(9)max.in.flight.requests.per.connection:表示生产端与 broker 之间的每个连接最多缓存的请求数,默认值为 5,即每个连接最多可以缓存 5 个未响应的请求,该参数指定了生产者在收到服务器响应之前可以发送多少个消息。这个参数通常用来解决分区乱序的问题。

调优建议:为了避免消息乱序问题,建议将该参数设置为 1,表示生产端在某个 broker 响应之前将无法再向该 broker 发送消息请求,这能够有效避免同一分区下的消息乱序问题。

(10)interceptor.classes:用作拦截器的类的列表。通过实现 ProducerInterceptor 接口,您可以在生产者发布到 Kafka 集群之前拦截(并可能会改变)生产者收到的记录。默认情况下,没有拦截器,可自定义拦截器。

(11)partitioner.class:实现 Partitioner 接口的分区器类。默认使用 DefaultPartitioner 来进行分区。

消费端设计

消费者消费消息流程:

  • 消息由生产者发布到 kafka 集群后,会被消费者消费。消息的消费模型有两种,推送模型(push)和拉取模型(pull)。
  • kafka 采用拉取模型,由消费者自己记录消费状态,每个消费者互相独立地顺序拉取每个分区的消息。消费者拉取的最大上限通过最高水位(watermark)控制,生产者最新写入的消息如果还没有达到备份数量,对消费者是不可见的。
    这种由消费者控制偏移量的优点是:消费者可以按照任意的顺序消费消息。比如,消费者可以重置到旧的偏移量,重新处理之前已经消费过的消息;或者直接跳到最近的位置,从当前的时刻开始消费。在一些消息系统中,消息代理会在消息被消费之后立即删除消息。如果有不同类型的消费者订阅同一个主题,消息代理可能需要冗余地存储同一消息;或者等所有消费者都消费完才删除,这就需要消息代理跟踪每个消费者的消费状态,这种设计很大程度上限制了消息系统的整体吞吐量和处理延迟。Kafka 的做法是生产者发布的所有消息会一致保存在 Kafka 集群中,不管消息有没有被消费。用户可以通过设置保留时间来清理过期的数据,比如,设置保留策略为两天。那么,在消息发布之后,它可以被不同的消费者消费,在两天之后,过期的消息就会自动清理掉。
  • 消费者是以 consumer group 消费者组的方式工作,由一个或者多个消费者组成一个组,共同消费一个 topic。每个分区在同一时间只能由 group 中的一个消费者读取,但是多个 group 可以同时消费这个 partition。某个消费者读取某个分区,也可以叫做某个消费者是某个分区的拥有者。
  • 在这种情况下,消费者可以通过水平扩展的方式同时读取大量的消息。另外,如果一个消费者失败了,那么其他的 group 成员会自动负载均衡读取之前失败的消费者读取的分区。
  • 再均衡期间,消费者无法读取消息,造成整个 consumer group 一小段时间的不可用。另外,当分区被重新分配给另一个消费者时,当前的读取状态会丢失。消费者通过向作为组协调器(Group Coordinator)的 broker(不同的组可以有不同的协调器)发送心跳来维持和群组以及分区的关系。心跳表明消费者在读取分区里的消息。消费者会在轮询消息或提交偏移量(offset)时发送心跳。如果消费者停止发送心跳的时间足够长,会话就会过期,组协调器认为消费者已经死亡,会触发一次再均衡。(在 Kafka 0.10.1 的版本中,对心跳行为进行了修改,由一个独立的线程负责心跳)

自动确认 Offset

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
Properties properties = new Properties() {{
/* 定义kakfa服务的地址,不需要将所有broker指定上 */
put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
/* 制定consumer group */
put(ConsumerConfig.GROUP_ID_CONFIG, "test");
/* 开启自动确认选项 */
put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
/* 自动提交时间间隔 */
put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");
/* 消费者与服务器断开连接的最大时间 */
put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
/* key的序列化类 */
put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
/* value的序列化类 */
put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
}};

/* 定义consumer */
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
/* 消费者订阅的topic, 可同时订阅多个 */
consumer.subscribe(Collections.singletonList("CustomCountry"));
/* subscribe() 也可以接收一个正则表达式,匹配多个主题:支持正则表达式,订阅所有与test相关的Topic */
//consumer.subscribe("test.*");
try {
/* 读取数据,读取超时时间为100ms */
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
log.info("offset = {}, key = {}, value = {}", record.offset(), record.key(), record.value());
}
}
} finally {
// 关闭消费者,网络连接和 socket 也会随之关闭,并立即触发一次再均衡
consumer.close();
}

手工控制 Offset

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
Properties properties = new Properties() {{
/* 定义kakfa服务的地址,不需要将所有broker指定上 */
put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
/* 制定consumer group */
put(ConsumerConfig.GROUP_ID_CONFIG, "test");
/* 关闭自动确认选项 */
put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
/* 消费者与服务器断开连接的最大时间 */
put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
/* key的序列化类 */
put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
/* value的序列化类 */
put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
}};

/* 定义consumer */
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
/* 消费者订阅的topic, 可同时订阅多个 */
consumer.subscribe(Collections.singletonList("CustomCountry"));

try {
/* 读取数据,读取超时时间为100ms */
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
log.info("offset = {}, key = {}, value = {}", record.offset(), record.key(), record.value());
}

try {
// 处理完当前批次的消息,在轮询更多的消息之前,调用commitSync方法提交当前批次最新的消息
consumer.commitSync();
} catch (CommitFailedException e) {
// 只要没有发生不可恢复的错误,commitSync方法会一直尝试直至提交成功。如果提交失败,我们也只能把异常记录到错误日志里
log.error("commit failed", e);
}
}
} finally {
// 关闭消费者,网络连接和 socket 也会随之关闭,并立即触发一次再均衡
consumer.close();
}

注意:消费者为什么要提交偏移量?当消费者崩溃或者有新的消费者加入,那么就会触发再均衡(rebalance),完成再均衡后,每个消费者可能会分配到新的分区,而不是之前处理那个,为了能够继续之前的工作,消费者需要读取每个 partition 最后一次提交的偏移量,然后从偏移量指定的地方继续处理。

消费端核心参数

Kafka 与消费者相关的配置大部分参数都有合理的默认值,一般不需要修改,不过有一些参数与消费者的性能和可用性有很大关系。

(1)fetch.min.bytes:指定消费者从服务器获取记录的最小字节数。服务器在收到消费者的数据请求时,如果可用的数据量小于 fetch.min.bytes,那么会等到有足够的可用数据时才返回给消费者。

调优建议:合理的设置可以降低消费者和 broker 的工作负载,在 Topic 消息生产不活跃时,减少处理消息次数。如果没有很多可用数据,但消费者的 CPU 使用率却很高,需要调高该属性的值。如果消费者的数量比较多,调高该属性的值也可以降低 broker 的工作负载。

(2)fetch.max.wait.ms:指定在 broker 中的等待时间,默认是 500ms。如果没有足够的数据流入 Kafka,消费者获取的数据量的也没有达到 fetch.min.bytes,最终导致 500ms 的延迟。

调优建议:如果要降低潜在的延迟(提高 SLA),可以调低该属性的值。fetch.max.wait.ms 和 fetch.min.bytes 有一个满足条件就会返回数据。

(3)max.parition.fetch.bytes:指定了服务器从每个分区里返回给消费者的最大字节数,默认值是 1MB。也就是说 KafkaConsumer#poll() 方法从每个分区里返回的记录最多不超过 max.parition.fetch.bytes 指定的字节。

调优建议:如果一个主题有 20 个分区和 5 个消费者(同一个组内),那么每个消费者需要至少 4MB 的可用内存(每个消费者读取 4 个分区)来接收记录。如果组内有消费者发生崩溃,剩下的消费者需要处理更多的分区。max.parition.fetch.bytes 必须比 broker 能够接收的最大消息的字节数(max.message.size)大,否则消费者可能无法读取这些消息,导致消费者一直重试。

(4)session.timeout.ms:指定了消费者与服务器断开连接的最大时间,默认是 3s。如果消费者没有在指定的时间内发送心跳给 GroupCoordinator,就被认为已经死亡,会触发再均衡,把它的分区分配给其他消费者。

调优建议:该属性与 heartbeat.interval.ms 紧密相关,heartbeat.interval.ms 指定了 poll() 方法向协调器发送心跳的频率,session.timeout.ms 指定了消费者最长多久不发送心跳。所以,一般需要同时修改这两个属性,heartbeat.interval.ms 必须比 session.timeout.ms 小,一般是 session.timeout.ms 的三分之一,如果 session.timeout.ms 是 3s,那么 heartbeat.interval.ms 应该是 1s。

(5)auto.offset.reset:指定了消费者在读取一个没有偏移量(offset)的分区或者偏移量无效的情况下(因消费者长时间失效,包含偏移量的记录已经过时井被删除)该作何处理,默认值是 latest,表示在 offset 无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)。

调优建议:另一个值是 earliest,消费者将从起始位置读取分区的记录。

(6)enable.auto.commit:指定了消费者是否自动提交偏移量,默认值是 true,自动提交。

调优建议:设为 false 可以程序自己控制何时提交偏移量。如果设为 true,需要通过配置 auto.commit.interval.ms 属性来控制提交的频率。

(7)partition.assignment.strategy:分区分配给组内消费者的策略,根据给定的消费者和 Topic,决定哪些分区应该被分配给哪个消费者。

调优建议:Kafka 有两个默认的分配策略。Range,把 Topic 的若干个连续的分区分配给消费者;RoundRobin,把所有分区逐个分配给消费者。默认值是 org.apache.kafka.clients.consumer.RangeAssignor,这个类实现了 Range 策略,org.apache.kafka.clients.consumer.RoundRobinAssignor 是 RoundRobin 策略的实现类。还可以使用自定义策略,属性值设为自定义类的名字。

(8)client.id:broker 用来标识从客户端发送过来的消息,可以是任意字符串,通常被用在日志、度量指标和配额中。

(9)max.poll.records:用于控制单次调用 call() 方法能够返回的记录数量,帮助控制在轮询里需要处理的数据量。

(10)receive.buffer.bytes:指定了 TCP socket 接收数据包的缓冲区大小。如果设为 - 1 就使用操作系统的默认值。如果生产者或消费者与 broker 处于不同的数据中心,那么可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。

(11)send.buffer.bytes:指定了 TCP socket 发送数据包的缓冲区大小。如果设为 - 1 就使用操作系统的默认值。如果生产者或消费者与 broker 处于不同的数据中心,那么可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。

Kafka 安装以及环境配置

本文统一使用软件包管理器的方式安装 Kafka,减少环境变量的配置,更加方便快捷。

Linux 安装 Kafka

Kafka 官网下载 Kafka 安装包,解压安装,或直接使用命令下载。Kafka 依赖 ZooKeeper,从 ZooKeeper 官网下载 ZooKeeper 安装包,解压安装,或直接使用命令下载。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
## 1. 下载 kafka
# 进入下载目录
$ cd /usr/local/src
# 注意,kafka_2.12-2.5.0.tgz 版本是已经编译好的版本,解压就能使用。
$ wget https://mirror.bit.edu.cn/apache/kafka/2.5.0/kafka_2.12-2.5.0.tgz
# 解压
$ tar -xzvf kafka_2.12-2.5.0.tgz
# 移动到安装目录
$ mv kafka_2.12-2.5.0 /usr/local/kafka

## 2. 配置 kafka
# 进入配置目录
$ cd /usr/local/kafka/config
# 编辑修改相应的参数 [包括 broker.id、port、host.name、log.dirs、zookeeper.connect 等,小编这里统一设置默认]
$ vi server.properties
# 保存退出
$ wq!

## 3. 启动 kafka
# 进入 kafka 目录
$ cd /usr/local/kafka
# 启动 zookeeper, 启动 kafka 自带的 zookeeper(若不用自带 zk 可不执行此句)
$ bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
# 启动 kafka
$ bin/kafka-server-start.sh -daemon config/server.properties

Kafka 是使用 Zookeeper 来保存集群元数据信息和消费者信息。虽然 Kafka 发行版已经自带了 Zookeeper,可以通过脚本直接启动,但仍然建议安装一个完整版的 Zookeeper。

Mac 安装 Kafka

Mac 中使用 brew 安装 Kafka 的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
## 1. 使用 Kafka 安装,由于 Kafka 依赖了 Zookeeper,所以在下载的时候会自动下载。
$ brew install kafka

## 2. 配置 kafka
# 进入配置目录
$ cd /usr/local/etc/kafka/
# 编辑修改相应的参数 [包括 broker.id、port、host.name、log.dirs、zookeeper.connect 等,小编这里统一设置默认]
$ vi server.properties
# 保存退出
$ wq!

## 3. 启动 kafka
# 启动 zookeeper
$ brew services start zookeeper
# 启动 kafka
$ brew services start kafka

参数说明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# 当前机器在集群中的唯一标识,和 zookeeper 的 myid 性质一样
broker.id=0
# 当前 kafka 对外提供服务的端口默认是 9092
port=9092
# 这个参数默认是关闭的
host.name=192.168.1.170
# 这个是 borker 进行网络处理的线程数
num.network.threads=3
# 这个是 borker 进行 I/O 处理的线程数
num.io.threads=8
# 消息存放的目录,这个目录可以配置为 “,” 逗号分割的表达式,上面的 num.io.threads 要大于这个目录的个数这个目录,如果配置多个目录,新创建的 topic 他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
log.dirs=/opt/kafka/kafkalogs/
# 发送缓冲区 buffer 大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
socket.send.buffer.bytes=102400
# kafka 接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
socket.receive.buffer.bytes=102400
# 这个参数是向 kafka 请求消息或者向 kafka 发送消息的请请求的最大数,这个值不能超过 java 的堆栈大小
socket.request.max.bytes=104857600
# 默认的分区数,一个 topic 默认 1 个分区数
num.partitions=1
# 默认消息的最大持久化时间,168 小时,7 天
log.retention.hours=168
# 消息保存的最大值 5M
message.max.byte=5242880
# kafka 保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务
default.replication.factor=2
# 取消息的最大直接数
replica.fetch.max.bytes=5242880
# 这个参数是:因为 kafka 的消息是以追加的形式落地到文件,当超过这个值的时候,kafka 会新起一个文件
log.segment.bytes=1073741824
# 每隔 300000 毫秒去检查上面配置的 log 失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除
log.retention.check.interval.ms=300000
# 是否启用 log 压缩,一般不用启用,启用的话可以提高性能
log.cleaner.enable=false
# 设置 zookeeper 的连接端口、如果非集群配置一个地址即可
zookeeper.connect=192.168.1.180:12181,192.168.1.181:12181,192.168.1.182:1218

功能验证

1
2
3
4
5
6
7
8
9
10
11
# 创建 topic[创建一个名为 test 的 topic,只有一个副本,一个分区]
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

# 通过 list 命令查看刚刚创建的 topic
$ bin/kafka-topics.sh --list --zookeeper localhost:2181

# 使用 kafka-console-producer.sh 发送消息
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

# 使用 kafka-console-consumer.sh 接收消息并在终端打印
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

友情提示:对于 Kafka 数据我们可以直接通过 IDEA 提供的 Kafka 可视化管理插件 - Kafkalytic 来查看。

Spring Boot 集成 Kafka

Spring 创建了一个项目 Spring-kafka,封装了 Apache 的 Kafka-client,用于在 Spring 项目里快速集成 kafka。除了简单的收发消息外,Spring-kafka 还提供了很多高级功能,下面我们就来一一探秘这些用法。

简单使用

1、配置 Pom 包,主要是添加 spring-kafka 的支持

1
2
3
4
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

2、配置 kafka 的安装地址、端口以及账户信息

1
spring.kafka.bootstrap-servers=localhost:9092

3、主题配置

1
2
3
4
5
6
7
8
9
10
11
12
13
@Configuration
public class KafkaConfig {

/**
* 通过注入一个 NewTopic 类型的 Bean 来创建 topic,如果 topic 已存在,则会忽略。
* P.S.
* 1. 如果要修改分区数,只需修改配置值重启项目即可;修改分区数并不会导致数据的丢失,但是分区数只能增大不能减小
*/
@Bean
public NewTopic helloTopic() {
return new NewTopic("topic_hello", 2, (short) 1);
}
}

4、发送者

1
2
3
4
5
6
7
8
9
10
@Component
public class HelloProducer {

@Autowired
private KafkaTemplate<Object, Object> kafkaTemplate;

public void send() {
kafkaTemplate.send("topic_hello", "hello, world!");
}
}

5、接收者

1
2
3
4
5
6
7
8
@Component
public class HelloConsumer {

@KafkaListener(id = "hello", topics = "topic_hello")
public void process(ConsumerRecord<?, ?> record) {
System.out.println("Receiver :" + record.value());
}
}

6、 测试

1
2
3
4
5
6
7
8
9
10
11
@SpringBootTest
public class KafkaHelloTest {

@Autowired
private HelloProducer helloProducer;

@Test
public void hello() {
helloProducer.send();
}
}

高级使用

kafka 事务消息

默认情况下,Spring-kafka 自动生成的 KafkaTemplate 实例,是不具有事务消息发送能力的。如果需要开启事务机制,使用默认配置需要在 application.properties 添加 spring.kafka.producer.transaction-id-prefix 配置或者通过 Java Config 方式自己初始化 Bean。事务激活后,所有的消息发送只能在发生事务的方法内执行了,不然就会抛一个没有事务交易的异常。

1
2
// 通过 application.properties 添加 spring.kafka.producer.transaction-id-prefix 配置激活事务
spring.kafka.producer.transaction-id-prefix=kafka_tx.

Spring-Kafka 的事务消息是基于 Kafka 提供的事务消息功能的,Kafka 使用事务的两种方式:1、配置 Kafka 事务管理器并使用 @Transactional 注解;2、用 KafkaTemplate 的 executeInTransaction 方法

(一)配置 Kafka 事务管理器并使用 @Transactional 注解

使用注解方式开启事务还是比较方便的,不过首先需要我们配置 KafkaTransactionManager,这个类就是 Kafka 提供给我们的事务管理类,我们需要使用生产者工厂来创建这个事务管理类。通过 application.properties 添加 spring.kafka.producer.transaction-id-prefix 配置,KafkaAutoConfiguration 类会自动帮我们配置好相应的 Bean,感兴趣的同学可以阅读 KafkaAutoConfiguration 类的方法。

1
2
3
4
5
6
@Test
@Transactional(rollbackFor = RuntimeException.class)
public void send() {
kafkaTemplate.send("topic_input", "kl");
throw new RuntimeException("failed");
}

运行测试方法后我们可以看到控制台中输出了如下日志:

1
2
3
4
5
org.apache.kafka.common.KafkaException: Failing batch since transaction was aborted
at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:422) [kafka-clients-2.5.0.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:312) [kafka-clients-2.5.0.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239) [kafka-clients-2.5.0.jar:na]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_211]

(二)使用 KafkaTemplate 的 executeInTransaction 方法

1
2
3
4
5
6
7
8
@Test
public void testExecuteInTransaction() {
// 本地事务,不需要事务管理器
kafkaTemplate.executeInTransaction(operations -> {
operations.send("topic_input", "kl");
throw new RuntimeException("fail");
});
}

生产者获取发送结果

通过 KafkaTemplate 发送消息时,我们可以通过异步或者同步的方式获取发送结果:

(1)异步获取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Test
public void send() {
kafkaTemplate.send("topic_input", "kl").addCallback(new ListenableFutureCallback<SendResult<Object, Object>>() {
@Override
public void onFailure(Throwable throwable) {
log.error("kafka write topic failure.", throwable);
}

@Override
public void onSuccess(SendResult<Object, Object> objectObjectSendResult) {
log.info("kafka write topic successful. - {}[{}]", objectObjectSendResult.getRecordMetadata().topic(), objectObjectSendResult.getRecordMetadata().partition());
}
});
}

(2)同步获取

1
2
3
4
5
6
7
8
9
@Test
public void send() {
ListenableFuture<SendResult<Object, Object>> future = kafkaTemplate.send("topic_input", "kl");
try {
SendResult<Object, Object> result = future.get();
} catch (Throwable e) {
e.printStackTrace();
}
}

消费者 @KafkaListener 的使用

前面在简单集成中已经演示过了 @KafkaListener 接收消息的能力,但是 @KafkaListener 的功能不止如此,其他的比较常见的,使用场景比较多的功能点如下:

(1)显示的指定消费哪些Topic和分区的消息
(2)设置每个Topic以及分区初始化的偏移量
(3)设置消费线程并发度
(4)设置消息异常处理器

1
2
3
4
5
6
7
8
@KafkaListener(id = "webGroup", topicPartitions = {
@TopicPartition(topic = "topic1", partitions = {"0", "1"}),
@TopicPartition(topic = "topic2", partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
}, concurrency = "6", errorHandler = "myErrorHandler")
public String listen(String input) {
log.info("input value: {}", input);
return "successful";
}

errorHandler 需要设置这个参数需要实现一个接口 KafkaListenerErrorHandler。而且注解里的配置,是你自定义实现实例在 spring 上下文中的 Name。比如,上面配置为 errorHandler = “myErrorHandler”。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Service("myErrorHandler")
public class MyKafkaListenerErrorHandler implements KafkaListenerErrorHandler {
Logger logger =LoggerFactory.getLogger(getClass());
@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
logger.info(message.getPayload().toString());
return null;
}
@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
logger.info(message.getPayload().toString());
return null;
}
}

消费者 @KafkaListener 注解监听器生命周期

@KafkaListener 注解的监听器的生命周期是可以控制的,默认情况下,@KafkaListener 的参数 autoStartup = “true”。也就是自动启动消费,但是也可以同过 KafkaListenerEndpointRegistry 来干预他的生命周期。KafkaListenerEndpointRegistry 有三个动作方法分别如:启动 start()、停止 pause()、继续 resume();接下来我们通过一个场景来描述一下这个功能的用途:比如现在单机环境下,我们需要利用 Kafka 做数据持久化的功能,由于用户活跃的时间为早上 10 点至晚上 12 点,那在这个时间段做一个大数据量的持久化可能会影响数据库性能导致用户体验降低,我们可以选择在用户活跃度低的时间段去做持久化的操作,也就是晚上 12 点后到第二条的早上 10 点前。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
@Slf4j
@Component
@EnableScheduling
public class TaskListener {

private final KafkaProperties properties;

private final KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

public TaskListener(KafkaProperties properties, KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry) {
this.properties = properties;
this.kafkaListenerEndpointRegistry = kafkaListenerEndpointRegistry;
}

@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> delayContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory
.getIfAvailable(() -> new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties())));
// 禁止自动启动
factory.setAutoStartup(false);
return factory;
}


@KafkaListener(id = "durable", topics = "topic.quick.durable", containerFactory = "delayContainerFactory")
public void durableListener(String data) {
// 这里做数据持久化的操作
log.info("topic.quick.durable receive : " + data);
}

/**
* 定时器,每天凌晨0点开启监听
*/
@Scheduled(cron = "0 0 0 * * ?")
public void startListener() {
// 判断监听容器是否启动,未启动则将其启动
if (!kafkaListenerEndpointRegistry.getListenerContainer("durable").isRunning()) {
kafkaListenerEndpointRegistry.getListenerContainer("durable").start();
}
kafkaListenerEndpointRegistry.getListenerContainer("durable").resume();
}

/**
* 定时器,每天早上10点关闭监听
*/
@Scheduled(cron = "0 0 10 * * ?")
public void shutDownListener() {
kafkaListenerEndpointRegistry.getListenerContainer("durable").pause();
}
}

消费者手动 Ack 模式

手动 ACK 模式,由业务逻辑控制提交偏移量。开启手动首先需要关闭自动提交,然后设置下 consumer 的消费模式:

1
2
3
// enable.auto.commit 参数设置成 false。那么就是 Spring 来替为我们做人工提交,从而简化了人工提交的方式
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=manual

上面的设置好后,在消费时,只需要在 @KafkaListener 监听方法的入参加入 Acknowledgment 即可,执行到 ack.acknowledge() 代表提交了偏移量:

1
2
3
4
5
@KafkaListener(id = "hello", topics = "topic_input")
public void listen(String input, Acknowledgment ack) {
log.info("input value: {}", input);
ack.acknowledge();
}

消费者 SendTo 消息转发

@SendTo 注解可以带一个参数,指定转发的 Topic 队列。常见的场景如,一个消息需要做多重加工,不同的加工耗费的 cup 等资源不一致,那么就可以通过跨不同 Topic 和部署在不同主机上的 consumer 来解决了。

1
2
3
4
5
6
7
8
9
10
11
12
@KafkaListener(id = "webGroup", topics = "topic-kl")
@SendTo("topic-ckl")
public String listen(String input) {
log.info("input value: {}", input);
// 将 input + "hello!" 消息转发至 topic-ckl 队列
return input + "hello!";
}

@KafkaListener(id = "webGroup1", topics = "topic-ckl")
public void listen2(String input) {
log.info("input value: {}", input);
}

kafka 知识问答

Kafka 的分区数是不是越多越好?

kafka 使用分区将 topic 的消息打散到多个分区分布保存在不同的 broker 上,实现了 producer 和 consumer 消息处理的高吞吐量。Kafka 的 producer 和 consumer 都可以多线程地并行操作,而每个线程处理的是一个分区的数据。因此分区实际上是调优 Kafka 并行度的最小单元。对于 producer 而言,它实际上是用多个线程并发地向不同分区所在的 broker 发起 Socket 连接同时给这些分区发送消息;而 consumer,同一个消费组内的所有 consumer 线程都被指定 topic 的某一个分区进行消费。所以说,如果一个 topic 分区越多,理论上整个集群所能达到的吞吐量就越大。

分区多的缺点:

一、客户端 / 服务器端内存开销
Kafka0.8.2 之后,在客户端 producer 有个参数 batch.size,默认是 16KB。它会为每个分区缓存消息,一旦满了就打包将消息批量发出。不过很显然,因为这个参数是分区级别的,如果分区数越多,这部分缓存所需的内存占用也会更多。对于服务器端 consumer 的开销也不小,如果阅读 Kafka 源码的话可以发现,服务器端的很多组件都在内存中维护了分区级别的缓存,比如 controller,FetcherManager 等,因此分区数越多,这种缓存的成本就越大。

二、文件句柄的开销
每个分区在底层文件系统都有属于自己的一个目录。该目录下通常会有两个文件: base_offset.log 和 base_offset.index。Kafak 的 Controller 和 ReplicaManager 会为每个 broker 都保存这两个文件句柄 (file handler)。很明显,如果分区数越多,所需要保持打开状态的文件句柄数也就越多,最终可能会突破你的 ulimit -n 的限制。

三、降低高可用性
Kafka 通过副本 (replica) 机制来保证高可用。具体做法就是为每个分区保存若干个副本 (replica_factor 指定副本数)。每个副本保存在不同的 broker 上。其中的一个副本充当 leader 副本,负责处理 producer 和 consumer 请求。其他副本充当 follower 角色,由 Kafka controller 负责保证与 leader 的同步。如果 leader 所在的 broker 挂掉了,contorller 会检测到然后在 zookeeper 的帮助下重选出新的 leader —— 这中间会有短暂的不可用时间窗口,虽然大部分情况下可能只是几毫秒级别。但如果你有 10000 个分区,10 个 broker,也就是说平均每个 broker 上有 1000 个分区。此时这个 broker 挂掉了,那么 zookeeper 和 controller 需要立即对这 1000 个分区进行 leader 选举。比起很少的分区 leader 选举而言,这必然要花更长的时间,并且通常不是线性累加的。

一条消息如何知道要被发送到哪个分区?

一、按照 key 值分配:默认情况下,Kafka 根据传递消息的 key 来进行分区的分配,即 hash(key) % numPartitions,这保证了相同 key 的消息一定会被路由到相同的分区。

二、key 为 null 时,从缓存中取分区 id 或者随机取一个:不指定 key 时,Kafka 几乎就是随机找一个分区发送无 key 的消息,然后把这个分区号加入到缓存中以备后面直接使用——当然了,Kafka 本身也会清空该缓存(默认每 10 分钟或每次请求 topic 元数据时)。

Kafka 是如何保证数据可靠性?

一、Topic 分区副本

在 Kafka 0.8.0 之前,Kafka 是没有副本的概念的,那时候人们只会用 Kafka 存储一些不重要的数据,因为没有副本,数据很可能会丢失。但是随着业务的发展,支持副本的功能越来越强烈,所以为了保证数据的可靠性,Kafka 从 0.8.0 版本开始引入了分区副本。也就是说每个分区可以人为的配置几个副本(比如创建主题的时候指定 replication-factor,也可以在 Broker 级别进行配置 default.replication.factor),一般会设置为 3。通过分区副本,引入了数据冗余,同时也提供了 Kafka 的数据可靠性。Kafka 的分区多副本架构是 Kafka 可靠性保证的核心,把消息写入多个副本可以使 Kafka 在发生崩溃时仍能保证消息的持久性。

二、Producer 往 Broker 发送消息

Kafka 在 Producer 里面提供了消息确认机制,也就是说我们可以通过配置来决定消息发送到对应分区的几个副本才算消息发送成功。根据实际的应用场景,我们设置不同的 acks,以此保证数据的可靠性。另外,Producer 发送消息还可以选择同步(默认,通过 producer.type=sync 配置) 或者异步(producer.type=async)模式。如果设置成异步,虽然会极大的提高消息发送的性能,但是这样会增加丢失数据的风险。如果需要确保消息的可靠性,必须将 producer.type 设置为 sync。

三、Leader 选举

每个分区的 leader 会维护一个 ISR 列表,ISR 列表里面就是 follower 副本的 Borker 编号,只有跟得上 Leader 的 follower 副本才能加入到 ISR 里面,只有 ISR 里的成员才有被选为 leader 的可能。所以当 Leader 挂掉了,而且 unclean.leader.election.enable=false 的情况下,Kafka 会从 ISR 列表中选择第一个 follower 作为新的 Leader,因为这个分区拥有最新的已经 committed 的消息。通过这个可以保证已经 committed 的消息的数据可靠性。

综上所述,为了保证数据的可靠性,我们最少需要配置一下几个参数:

  • producer 级别:acks=all(或者 request.required.acks=-1),同时发生模式为同步 producer.type=sync
  • topic 级别:设置 replication.factor>=3,并且 min.insync.replicas>=2;
  • broker 级别:关闭不完全的 Leader 选举,即 unclean.leader.election.enable=false;

Kafka 是如何保证数据一致性?

这里介绍的数据一致性主要是说不论是老的 Leader 还是新选举的 Leader,Consumer 都能读到一样的数据。那么 Kafka 是如何实现的呢?

假设分区的副本为 3,其中副本 0 是 Leader,副本 1 和副本 2 是 follower,并且在 ISR 列表里面。虽然副本 0 已经写入了 Message4,但是 Consumer 只能读取到 Message2。因为所有的 ISR 都同步了 Message2,只有 High Water Mark 以上的消息才支持 Consumer 读取,而 High Water Mark 取决于 ISR 列表里面偏移量最小的分区,对应于上图的副本 2,这个很类似于木桶原理。这样做的原因是还没有被足够多副本复制的消息被认为是 “不安全” 的,如果 Leader 发生崩溃,另一个副本成为新 Leader,那么这些消息很可能丢失了。如果我们允许消费者读取这些消息,可能就会破坏一致性。试想,一个消费者从当前 Leader(副本 0) 读取并处理了 Message4,这个时候 Leader 挂掉了,选举了副本 1 为新的 Leader,这时候另一个消费者再去从新的 Leader 读取消息,发现这个消息其实并不存在,这就导致了数据不一致性问题。当然,引入了 High Water Mark 机制,会导致 Broker 间的消息复制因为某些原因变慢,那么消息到达消费者的时间也会随之变长(因为我们会先等待消息复制完毕)。延迟时间可以通过参数 replica.lag.time.max.ms 参数配置,它指定了副本在复制消息时可被允许的最大延迟时间。

Kafka High Water Mark

Kafka 如何通过经典的内存缓冲池设计来优化 JVM GC 问题?

Kafka 是一个高吞吐的消息队列,是大数据场景首选的消息队列,这种场景就意味着发送单位时间消息的量会特别的大,那么 Kafka 如何做到能支持能同时发送大量消息的呢?

Kafka 通过批量压缩和发送做到能支持能同时发送大量消息。Kafka 的 kafkaProducer 对象是线程安全的,每个发送线程在发送消息时候共用一个 kafkaProducer 对象来调用发送方法,最后发送的数据根据 Topic 和分区的不同被组装进某一个 RecordBatch 中。发送的数据放入 RecordBatch 后会被发送线程批量取出组装成 ProduceRequest 对象发送给 Kafka 服务端。

Kafka 通过使用内存缓冲池的设计,让整个发送过程中的存储空间循环利用,有效减少 JVM GC 造成的影响,从而提高发送性能,提升吞吐量。也就是说,Kafka 首先判断需要存储的数据的大小是否 free(已申请未使用空间)里有合适的 recordBatch 装得下,如果装得下则用 recordBatch 来存储数据;如果 free(已申请未使用空间)里没有空间但是 availableMemory(未申请未使用)+free(已申请未使用空间)的大小比需要存储的数据大(也就是说可使用空间比实际需要申请的空间大),说明可使用空间大小足够,则会用让 free 一直释放 byteBuffer 空间直到有空间装得下要存储的数据位置;如果需要申请的空间比实际可使用空间大,则内存申请会阻塞直到申请到足够的内存为止。


参考博文

[1]. kafka生产者Producer参数设置及参数调优建议-kafka商业环境实战系列
[2]. spring-kafka生产者消费者配置详解
[3]. Kafka 安装及快速入门
[4]. spring boot集成kafka之spring-kafka深入探秘
[5]. Spring Boot Kafka概览、配置及优雅地实现发布订阅
[6]. Kafka 是如何保证数据可靠性和一致性


注脚

[1]. ISR:ISR((In Sync Replicas)所有与 leader 副本保持一定程度同步的副本(包括 leader 副本在内)组成 ISR (In Sync Replicas)。分区中的所有副本统称为 AR (Assigned Replicas)。ISR 集合是 AR 集合的一个子集。消息会先发送到 leader 副本,然后 follower 副本才能从 leader 中拉取消息进行同步。同步期间,follow 副本相对于 leader 副本而言会有一定程度的滞后。前面所说的 ”一定程度同步 “ 是指可忍受的滞后范围,这个范围可以通过参数进行配置。于 leader 副本同步滞后过多的副本(不包括 leader 副本)将组成 OSR (Out-of-Sync Replied)由此可见,AR = ISR + OSR。正常情况下,所有的 follower 副本都应该与 leader 副本保持 一定程度的同步,即 AR=ISR,OSR 集合为空。leader 副本负责维护和跟踪 ISR 集合中所有 follower 副本的滞后状态,当 follower 副本落后太多或失效时,leader 副本会把它从 ISR 集合中剔除。如果 OSR 集合中所有 follower 副本“追上” 了 leader 副本,那么 leader 副本会把它从 OSR 集合转移至 ISR 集合。默认情况下,当 leader 副本发生故障时,只有在 ISR 集合中的 follower 副本才有资格被选举为新的 leader,而在 OSR 集合中的副本则没有任何机会。


了不起的消息队列系列


谢谢你长得那么好看,还打赏我!😘
0%