Kafka无消息丢失配置

Kafka无消息丢失配置

Scroll Down

saber.png

Kafka中如何防止消息丢失是很重要的内容,Kafka在什么情况下才能保证消息不丢失呢?

概况起来就是,Kafka只对已提交的消息做有限度的持久化保证。

如果消息都没能发送到broker端,Kafka也没办法。有限度是指集群中至少要有一台broker可用,如果机器全都挂了,那就没办法保证了。

“消息丢失”案例

案例一:生产者端丢失数据

比如发生了网络抖动,或者消息本身不合格,Broker拒收了等。这时候,都是需要producer来处理这些问题了。这里有个好建议是

Producer端永远要使用带有回调通知的API,不要使用producer的send(msg),而要使用producer.send(msg, callback);在springboot中的写法为

ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("alarm", gson.toJson(message));
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
   @Override
   public void onFailure(Throwable throwable) {
                
   }

   @Override
   public void onSuccess(SendResult<String, String> stringStringSendResult) {

   }
});

这里的回调可以获取失败的异常信息,根据异常,调整数据。

案例 2:消费者程序丢失数据

Consumer 端丢失数据主要体现在 Consumer 端要消费的消息不见了。要理解这段话,首先要明白,消费者有个“消费者位移”的概念。消费者移位表示的是这个 Consumer 当前消费到的 Topic 分区的位置。

image.png

比如上面这个图,就是消费者A消费到了位移9这个位置,消费者B消费到了11这个位置。

消费者出现消息消丢失往往是这种情况,先提交了位移,然后才去消费,这是一种非常错误的做法。

还有一种情况是,如果在消费者开启多线程来异步处理消息,这也得注意一个问题就是,不要用自动提交位移,因为假如某个线程执行失败,那其实这条消息没有被消费成功,对消费者来说,也就是丢失了。

最佳实践

  1. 不要使用producer.send(msg),而要使用producer.send(msg, callback)。记住,一定要使用带有回调通知的send方法
  2. 设置acks=all。acks是Producer的一个参数,代表了你对“已提交”消息的定义。如果设置成all,则表明所有副本Broker都要接收到消息,该消息才算给是“已提交”。这是最高等级的“已提交”定义。
  3. 设置retries为一个较大的值。这里的retries同样是Producer的参数,对应签名提到的Producer自动重试。当出现网络的瞬时抖动时候,消息发送可能会失败,此时配置了retires > 0的Producer能够自动重试消息发送,避免消息丢失。
  4. 设置 unclean.leader.election.enable = false。这是 Broker 端的参数,它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。故一般都要将该参数设置成 false,即不允许这种情况的发生。
  5. 设置replication.factor >= 3。这也是Broker端的参数。这里是为了冗余,保存多几份。
  6. 设置min.insync.replicas > 1。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。
  7. 确保 replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1
  8. 确保消息消费完成再提交。Consumer 端有个参数 enable.auto.commit,最好把它设置成 false,并采用手动提交位移的方式。就像前面说的,这对于单 Consumer 多线程处理的场景而言是至关重要的。