什么是消费者组
Consumer Group是kafka提供的可扩展且具有容错性的消费者机制。组需要用唯一的id标识,这个id被称为group id。组内可以有多个消费者实例。
消费者组特性
- Consumer Group可以有一个或多个消费者实例。
- Group ID 是一个字符串,在一个 Kafka 集群中,它标识唯一的一个 Consumer Group。
- topic的单个分区只能由同一个消费者组内的一个消费者消费,一个消费者可以消费topic的多个分区。这个分区当然也可以被其他的 Group 消费。
消息引擎模型
-
点对点模型 对Kafka来说,如果所有的Consumer 都属于一个Group ,那么就是点对点模型。
-
发布/订阅模型 如果所有的consumer都属于不同的Group ,那么就是发布/订阅模型。
组下有多少个Consumer 合适?
理想情况下,Consumer 实例的数量应该等于该 Group 订阅主题的分区总数。
实际使用中,可以设置Consumer实例数量小于Group订阅主题的分区数,但不建议设置Consumer实例数量大于Group订阅主题的分区数,这会造成多余的Consumer的资源浪费。
消费者位移
Consumer Offset是消费者对主题分区的消费位置的记录。对于新老版本的消费者的消费者组,他们的管理方式是不一样的。老版本的消费者,Consumer Offset是保存在zookeeper中的,需要知道的是,zookeeper这类框架不适合进行频繁的读写更新。Consumer Offset却是需要频繁更新的操作,所以大吞吐量的写操作会极大地拖慢 ZooKeeper 集群的性能。新版的消费者是把位移保存在Kafka内部的主题中,这个主题也叫位移主题。
位移主题
位移主题在Kafka中成为为__consumer_offsets。新版本 Consumer 的位移管理机制,将 Consumer 的位移数据作为一条条普通的 Kafka 消息,提交到 __consumer_offsets中,也就是__consumer_offsets 的主要作用是保存 Kafka 消费者的位移信息。
位移主题的 Key 中应该保存 3 部分内容:<GroupId, 主题名,分区号>。因为同一个group下,不可能存在一个分区被两个消费者消费,所以这样设计可以满足了。
位移主题什么时候创建的
当 Kafka 集群中的第一个 Consumer 程序启动时,Kafka 会自动创建位移主题,它的分区数由Broker 端参数 offsets.topic.num.partitions决定,默认值为50,在Kafka的日志路径下,会看到有很多__consumer_offsets-xxx这个的日志目录出现。它的副本因子由Broker 端参数 offsets.topic.replication.factor决定。它的默认值是 3。
Consumer位移
Consumer需要向Kafka汇报自己的位移数据,这个汇报是针对分区级别来进行的,要向各个分区汇报自己的位移数据,记录的是要消费的下一条消息的位移。
Consumer提交位移的方式
Consumer 提交位移的方式有两种:自动提交位移和手动提交位移。Consumer端有个enable.auto.commit参数,true为自动提交,false为手动提交。不建议使用自动提交。
如果是自动提交,Consumer端还有个参数为auto.commit.interval.ms,控制的是自动提交的频率,默认是5s。
如果是手动提交,那么又分两种,分别为同步提交(commitSync)和异步提交(commitAync)。
自动提交的优缺点:
Consumer的poll方法的逻辑是先提交上一批消息的位移,再处理下一批消息,所以这种方式下不会有消费丢失的问题,但是会有重复消息的问题。假设提交位移之后的 3 秒发生了 Rebalance 操作。在 Rebalance 之后,所有 Consumer 从上一次提交的位移处继续消费,但该位移已经是 3 秒前的位移数据了,故在 Rebalance 发生前 3 秒消费的所有数据都要重新再消费一次。
缺点是如果是同步提交,消息者会出现阻塞,知道位移提交成功。
手动提交优缺点
更加灵活,完全能够把控位移提交的时机和频率。 如果是异步提交,不会阻塞,缺点是提交失败就不会自动重试了。
最佳实践是把commitSync 和 commitAsync 组合使用。利用commitSync 的自动重试来规避那些瞬时错误,比如网络错误、Broker端GC。利用commitAsync来避免阻塞。
try {
while(true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
process(records); // 处理消息
commitAysnc(); // 使用异步提交规避阻塞
}
} catch(Exception e) {
handle(e); // 处理异常
} finally {
try {
consumer.commitSync(); // 最后一次提交使用同步阻塞式提交
} finally {
consumer.close();
}
}
手动提交还支持批量提交,毕竟如果一下子提交大量的位移,是有风险,一旦提交出错,会有重复消费问题。把大事务化解成小事务是比较好的解决方案,也就是说一部分一部分地提交,把风险降到最小。使用commitSync(Map<TopicPartition,Offset>)和commitAsync(Map<TopicPartition,OffsetAndMetadata>)
位移主题数据怎么删除
如果你选择的是自动提交位移,那么就可能存在一个问题:只要 Consumer 一直启动着,它就会无限期地向位移主题写入消息。如果不删除的话,会把硬盘塞满,Kafka提供了一个Compact 策略来删除位移主题中的过期消息。
那么应该如何定义 Compact 策略中的过期呢?对于同一个 Key 的两条消息 M1 和 M2,如果 M1 的发送时间早于 M2,那么 M1 就是过期消息。Compact 的过程就是扫描日志的所有消息,剔除那些过期的消息,然后把剩下的消息整理在一起。
挺好理解的,说白了就是以前位移到10,现在位移到了20,那么10就是可以被删的消息,在这里贴一张来自官网的图片,来说明 Compact 过程。
Kafka 提供了专门的后台线程定期地巡检待 Compact 的主题,看看是否存在满足条件的可删除数据。这个后台线程叫 Log Cleaner。如果生产环境出现主题位移消息过多,导致磁盘塞满的问题,那就要去看看这个log Cleaner线程是不是挂了。
参考
- 极客时间-《Kafka核心技术与实践》