Kafka拦截器

Kafka拦截器

Scroll Down

米豆子a

今天我们来看看Kafka拦截器的使用,和在spring中拦截器被广泛使用不同,Kafka的拦截器自 0.10.0.0 版本被引入后并未得到太多的实际应用,但是我感觉这功能应该是个潜力股,所以今天我们来看看这个功能的使用吧。

什么是拦截器?

拦截器的基本思想是在不改变业务逻辑的情况下,动态地实现一组可拔插的事件逻辑处理链。

Kafka中也是借鉴这种思想,方便我们在消息处理的前后多个时间点植入不同的处理逻辑,比如消息在发送前,或者消息被消费后。

Kafka拦截器分类

  • 生产者拦截器

生产者拦截器允许你在发送消息前以及消息提交成功后植入你的拦截器逻辑。

要想使用生产者拦截器,需要指定生产者端的参数interceptor.classes。多个值用逗号分隔即可。比如springboot中,可以这样配置

spring.kafka.producer.properties.interceptor.classes=com.example.springbootkafka.interceptor.AvgLatencyProducerInterceptor

我们自定义的生产者拦截器需要实现ProducerInterceptor接口,接口有两个方法我们需要关注一下

    @Override
    public ProducerRecord onSend(ProducerRecord producerRecord) {
        //写入我们的处理逻辑 
        return producerRecord;
    }
    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {

    }
  1. onSend:该方法会在消息发送之前被调用。
  2. onAcknowledgement:该方法会在消息成功提交或发送失败之后被调用。

onAcknowledgement这个方法会先于callback调用,onSend和onAcknowLedement是分别运行在不同线程,两者如果共享变量的话,要进行线程同步。

  • 消费者拦截器

消费者拦截器需要实现ConsumerInterceptor接口,其中也有两个核心方法:

  1. onConsume:该方法在消息返回给 Consumer 程序之前调用。
  2. onCommit:Consumer 在提交位移之后调用该方法。

以上的拦截器都不要插入的过重的业务逻辑,不然会影响消息的消息发送的TPS和消息消费的TPS。

使用场景

Kafka 拦截器可以应用于包括端到端系统性能检测、消息审计等多种功能在内的场景。

下面以端到端系统性能监测举例,比如我想看消息从产生到消费的延迟时间是多少。

既然先看延迟时间,那么有两个关键点需要统计,产生消息的总数量totalMessageCount和到消费需要花费的时间totalLatency,我们用时间除以总数就可以得到每条消息的平均延迟时间。

生产者拦截器代码


public class AvgLatencyProducerInterceptor implements ProducerInterceptor<String, String> {


    private Jedis jedis; // 省略Jedis初始化


    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        jedis.incr("totalMessageCount");
        return record;
    }


    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    }


    @Override
    public void close() {
    }


    @Override
    public void configure(Map<java.lang.String, ?> configs) {
    }
}

消费者拦截器代码


public class AvgLatencyConsumerInterceptor implements ConsumerInterceptor<String, String> {


    private Jedis jedis; //省略Jedis初始化


    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        long lantency = 0L;
        for (ConsumerRecord<String, String> record : records) {
            lantency += (System.currentTimeMillis() - record.timestamp());
        }
        jedis.incrBy("totalLatency", lantency);
        long totalLatency = Long.parseLong(jedis.get("totalLatency"));
        long totalMessageCount= Long.parseLong(jedis.get("totalMessageCount"));
        jedis.set("avgLatency", String.valueOf(totalLatency / totalMessageCount));
        return records;
    }


    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
    }


    @Override
    public void close() {
    }


    @Override
    public void configure(Map<String, ?> configs) {
    }
}