Kafka幂等和事务

Kafka幂等和事务

Scroll Down

义勇a

在理解Kafka的幂等性前,我们需要了解Kafka提供哪几种消息交付可靠性保障。 有以下三种,分别为:

  1. 最多一次(at most once):消息可能会丢失,但绝不会被重复发送。
  2. 至少一次(at least once):消息不会丢失,但有可能被重复发送。
  3. 精确一次(exactly once):消息不会丢失,也不会被重复发送。

目前Kafka默认是提供at least once可靠性保障,如果消息没有发送成功,producer端会进行重试,从而保证了至少一次,但是这种情况下,可能会有重复消费的问题。

如果我们把producer的发送重试禁用掉,那么就是at most once可靠性保障。

以上三种可靠性保障,最诱人的是哪种呢?很明显是exactly once。精确一致性保障就是要依赖Kafka提供的幂等性功能。

幂等

什么是幂等性?

是指某些函数,或者操作不管进行几次,最终结果都不会变。(纯属个人大白话理解)。

如何启用幂等?

幂等是0.11.0.0 版本引入的新功能。要想使用该功能,需要在producer端的配置中把参数enable.idempotence设置为true。 enable.idempotence 被设置成 true 后,Producer 自动升级成幂等性 Producer。

幂等实现的原理

为了实现幂等性,Kafka引入了producer Idsequence Number

  1. producer id:每一个Producer在初始化时,会请求Broker获取一个唯一的producer id。
  2. sequence Number:对于每个PID,该Producer发送数据的每个<Topic, Partition>都对应一个从0开始单调递增的Sequence Number。

Producer在创建实例的时候,会在后台同时启动一个Sender线程并且运行。 在启用幂等性的情况下,我们可以进入找到Sender的run方法

void run(long now) {
        if (this.transactionManager != null) {
            try {
		//squenceNumber不符合规矩,会重置pid
                if (this.transactionManager.shouldResetProducerStateAfterResolvingSequences()) {
                    this.transactionManager.resetProducerId();
                }
                if (!this.transactionManager.isTransactional()) {
                    this.maybeWaitForProducerId();
                }
                .....
            } catch (AuthenticationException var5) {
                this.log.trace("Authentication exception while processing transactional request: {}", var5);
                this.transactionManager.authenticationFailed(var5);
            }
        }

        long pollTimeout = this.sendProducerData(now);
        this.client.poll(pollTimeout, now);
}

其中maybeWaitForProducerId方法就是阻塞获取PID。

对于每个PID,该Producer发送数据的每个<Topic, Partition>都对应一个从0开始单调递增Sequence Number

Producer发送的消息,Broker端为每个<PID, Topic, Partition维护一个序号,并且每次Commit一条消息时将其对应序号递增

  • 如果消息序号比Broker维护的序号大一以上,说明中间有数据尚未写入,也即乱序,此时Broker拒绝该消息,Producer抛出OUT_OF_ORDER_SEQUENCE_NUMBER 错误。
  • 如果消息序号小于等于Broker维护的序号,说明该消息已被保存,即为重复消息,Broker直接丢弃该消息,Producer抛出DUPLICATE_SEQUENCE_NUMBER。

幂等缺点

它只能保证单分区上的幂等性,即一个幂等性 Producer 能够保证某个主题的一个分区上不出现重复消息,它无法实现多个分区的幂等性。重启了 Producer 进程之后,这种幂等性保证就丧失。这里也好理解,重启后,pid变了,自然不然保证了。

事务

Kafka 自 0.11 版本开始也提供了对事务的支持,目前主要是在 read committed 隔离级别上做事情。它能保证多条消息原子性地写入到目标分区,同时也能保证 Consumer 只能看到事务成功提交的消息.

事务型producer

事务型producer能够保证消息原子性地写入到多个分区中,这批消息要么全部写入成功,要么全部写入失败。事务型 Producer 也不惧进程的重启。Producer 重启回来后,Kafka 依然保证它们发送消息的精确一次处理。

如何开启事务

只需要在producer端指定两个参数

  1. enable.idempotence = true
  2. transactional.id=你的事务id,这个指建议设置成一个有意义的值。

consumer要想配合事务型producer来使用,需要修改isolation.level参数:

  1. read_uncommitted:这是默认值,表明 Consumer 能够读取到 Kafka 写入的任何消息,不论事务型 Producer 提交事务还是终止事务,其写入的消息都可以读取。很显然,如果你用了事务型 Producer,那么对应的 Consumer 就不要使用这个值
  2. read_committed:表明 Consumer 只会读取事务型 Producer 成功提交事务写入的消息。当然了,它也能看到非事务型 Producer 写入的所有消息。

为了保证新的Producer启动后,旧的具有相同Transaction ID的Producer即失效,每次Producer通过Transaction ID拿到PID的同时,还会获取一个单调递增的epoch。由于旧的Producer的epoch比新Producer的epoch小,Kafka可以很容易识别出该Producer是老的Producer并拒绝其请求。

有了Transaction ID后,Kafka可保证:

  • 跨Session的数据幂等发送。当具有相同Transaction ID的新的Producer实例被创建且工作时,旧的且拥有相同Transaction ID的Producer将不再工作。
  • 跨Session的事务恢复。如果某个应用实例宕机,新的实例可以保证任何未完成的旧的事务要么Commit要么Abort,使得新实例从一个正常状态开始工作。

事务原理

  1. Finding a transaction coordinator

因为transaction coordinator是生产者获取pid和开启事务的核心,所以生产者第一步是要发送一个请求给任一的broker,去到自己的transaction coordinator

  1. Getting a producer Id

接下来producer要通过发生InitPidRequest请求给transaction coordinator来获取自己的pid

如果producer端设置了transaction.id,那么这个transaction.id的值会随着 InitPidRequest传给transaction coordinator,transaction coordinator会以<transactionId, pid>形式以日志方式记录下来。

除了返回pid,InitPidRequest还执行了以下动作:

  • 增加pid对应的epoch。这个目的是防止旧的相同pid的producer,执行他的事务。
  • 恢复或者放弃旧的pid没有完成的事务
  1. Start transaction producer调用beginTransaction()方法,代表在本地开启事务,记录了事务的启动状态,transaction coordinator还不知道这个事务,直到发送了第一条消息。

  2. Consume-transform-produce loop 这个阶段,有整个事务的数据处理过程,并且包含了多种请求。

4.1 AddPartitionsToTxnRequest

producer第一次在transaction中给主题的分区发送消息时,会给transaction coordinator 发送这个AddPartitionsToTxnRequest请求,同时这个TopicPartition 会被记录在transaction log的日志中。这是为了以后可以给每个TopicPartition 标记提交或者放弃状态。如果这个事务是第一次往分区写数据,会开启一个定时器,过期事务会失效。

4.2 ProduceRequest

message会以ProduceRequest形式发送,其中包括PID, epoch, and sequence number

4.3 AddOffsetCommitsToTxnRequest producer通过sendOffsetsToTransaction来实现消息的批量消费生产同一个事务内。

4.4 TxnOffsetCommitRequest

作为sendOffsetsToTransaction方法的一部分,在处理完AddOffsetsToTxnRequest后,Producer也会发送TxnOffsetCommit请求给Consumer Coordinator从而将本事务包含的与读操作相关的各<Topic, Partition>的Offset持久化到内部的__consumer_offsets,真正提交之前,该偏移量对外部不可见

  1. Committing or Aborting a Transaction

在对数据进行操作后,producer必须调用commit或者abort来提交或者放弃事务。

5.1 EndTxnRequest

不管producer是执行KafkaProducer.endTransaction还是KafkaProducer.abortTransaction,producer都会向transaction coordinator执行EndTxnRequest请求。

在接收到这个请求后,transaction coordinator会执行以下操作:

  1. 写一个PREPARE_COMMIT或者PREPARE_ABORT 消息到transaction log中
  2. 通过WriteTxnMarkerRequest请求以Transaction Marker的形式将COMMITABORT信息写入用户数据日志以及Offset Log中。
  3. 最后将COMPLETE_COMMITCOMPLETE_ABORT消息写入Transaction Log中。

5.2 WriteTxnMarkerRequest 这个请求由transaction coordinator,给事务内的全部topicPartion的leader所在的broker服务器,发起请求。 每一个broker服务器都会写入commit或者abort的controll message进入日志中。这个controller message向消费者表明了有pid的消息必须被消费或者丢弃,消息者会缓存这些具有pid的消息,直到读取到相应的commit或者abort消息,读到之后,要么放弃消息,要么提交掉。

5.3 Writing the final Commit or Abort Message 在所有的commit或者abort标记写入到数据日志后,transaction coordinator 会写一个最终的commit或者abort消息到transaction log中,表明事务已经完成。这时候,可以删除事务日志中与该事务有关的大多数消息,只需要保留完成事务的pid和时间戳,transaction->producer的映射可以删掉了。

事务的整体流程示意图如下:

Kafka Transactions Data Flow.png

参考文章