Kafka消费者组

Kafka消费者组

Scroll Down

fate1.jpeg

什么是消费者组

Consumer Group是kafka提供的可扩展且具有容错性的消费者机制。组需要用唯一的id标识,这个id被称为group id。组内可以有多个消费者实例。

消费者组特性

  1. Consumer Group可以有一个或多个消费者实例。
  2. Group ID 是一个字符串,在一个 Kafka 集群中,它标识唯一的一个 Consumer Group。
  3. topic的单个分区只能由同一个消费者组内的一个消费者消费,一个消费者可以消费topic的多个分区。这个分区当然也可以被其他的 Group 消费。

消息引擎模型

  • 点对点模型 对Kafka来说,如果所有的Consumer 都属于一个Group ,那么就是点对点模型。

  • 发布/订阅模型 如果所有的consumer都属于不同的Group ,那么就是发布/订阅模型。

组下有多少个Consumer 合适?

理想情况下,Consumer 实例的数量应该等于该 Group 订阅主题的分区总数

实际使用中,可以设置Consumer实例数量小于Group订阅主题的分区数,但不建议设置Consumer实例数量大于Group订阅主题的分区数,这会造成多余的Consumer的资源浪费。

消费者位移

Consumer Offset是消费者对主题分区的消费位置的记录。对于新老版本的消费者的消费者组,他们的管理方式是不一样的。老版本的消费者,Consumer Offset是保存在zookeeper中的,需要知道的是,zookeeper这类框架不适合进行频繁的读写更新。Consumer Offset却是需要频繁更新的操作,所以大吞吐量的写操作会极大地拖慢 ZooKeeper 集群的性能。新版的消费者是把位移保存在Kafka内部的主题中,这个主题也叫位移主题。

位移主题

位移主题在Kafka中成为为__consumer_offsets。新版本 Consumer 的位移管理机制,将 Consumer 的位移数据作为一条条普通的 Kafka 消息,提交到 __consumer_offsets中,也就是__consumer_offsets 的主要作用是保存 Kafka 消费者的位移信息

位移主题的 Key 中应该保存 3 部分内容:<GroupId, 主题名,分区号>。因为同一个group下,不可能存在一个分区被两个消费者消费,所以这样设计可以满足了。

位移主题什么时候创建的

当 Kafka 集群中的第一个 Consumer 程序启动时,Kafka 会自动创建位移主题,它的分区数由Broker 端参数 offsets.topic.num.partitions决定,默认值为50,在Kafka的日志路径下,会看到有很多__consumer_offsets-xxx这个的日志目录出现。它的副本因子由Broker 端参数 offsets.topic.replication.factor决定。它的默认值是 3。

Consumer位移

Consumer需要向Kafka汇报自己的位移数据,这个汇报是针对分区级别来进行的,要向各个分区汇报自己的位移数据,记录的是要消费的下一条消息的位移。

Consumer提交位移的方式

Consumer 提交位移的方式有两种:自动提交位移手动提交位移。Consumer端有个enable.auto.commit参数,true为自动提交,false为手动提交。不建议使用自动提交。

如果是自动提交,Consumer端还有个参数为auto.commit.interval.ms,控制的是自动提交的频率,默认是5s

如果是手动提交,那么又分两种,分别为同步提交(commitSync)和异步提交(commitAync)。

自动提交的优缺点:

Consumer的poll方法的逻辑是先提交上一批消息的位移,再处理下一批消息,所以这种方式下不会有消费丢失的问题,但是会有重复消息的问题。假设提交位移之后的 3 秒发生了 Rebalance 操作。在 Rebalance 之后,所有 Consumer 从上一次提交的位移处继续消费,但该位移已经是 3 秒前的位移数据了,故在 Rebalance 发生前 3 秒消费的所有数据都要重新再消费一次。

缺点是如果是同步提交,消息者会出现阻塞,知道位移提交成功。

手动提交优缺点

更加灵活,完全能够把控位移提交的时机和频率。 如果是异步提交,不会阻塞,缺点是提交失败就不会自动重试了。

最佳实践是把commitSync 和 commitAsync 组合使用。利用commitSync 的自动重试来规避那些瞬时错误,比如网络错误、Broker端GC。利用commitAsync来避免阻塞。


try {
     while(true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        process(records); // 处理消息
        commitAysnc(); // 使用异步提交规避阻塞
     }
} catch(Exception e) {
     handle(e); // 处理异常
} finally {
     try {
         consumer.commitSync(); // 最后一次提交使用同步阻塞式提交
  } finally {
       consumer.close();
  }
}

手动提交还支持批量提交,毕竟如果一下子提交大量的位移,是有风险,一旦提交出错,会有重复消费问题。把大事务化解成小事务是比较好的解决方案,也就是说一部分一部分地提交,把风险降到最小。使用commitSync(Map<TopicPartition,Offset>)和commitAsync(Map<TopicPartition,OffsetAndMetadata>)

位移主题数据怎么删除

如果你选择的是自动提交位移,那么就可能存在一个问题:只要 Consumer 一直启动着,它就会无限期地向位移主题写入消息。如果不删除的话,会把硬盘塞满,Kafka提供了一个Compact 策略来删除位移主题中的过期消息。

那么应该如何定义 Compact 策略中的过期呢?对于同一个 Key 的两条消息 M1 和 M2,如果 M1 的发送时间早于 M2,那么 M1 就是过期消息。Compact 的过程就是扫描日志的所有消息,剔除那些过期的消息,然后把剩下的消息整理在一起。

挺好理解的,说白了就是以前位移到10,现在位移到了20,那么10就是可以被删的消息,在这里贴一张来自官网的图片,来说明 Compact 过程。

image.png

Kafka 提供了专门的后台线程定期地巡检待 Compact 的主题,看看是否存在满足条件的可删除数据。这个后台线程叫 Log Cleaner。如果生产环境出现主题位移消息过多,导致磁盘塞满的问题,那就要去看看这个log Cleaner线程是不是挂了。

参考

  • 极客时间-《Kafka核心技术与实践》