生产者消息分区机制原理解析

生产者消息分区机制原理解析

Scroll Down

善逸2

在Kafka中,一个topic会有多个分区,一个topic下的消息只会在其中的一个分区。那么问题来了,为什么需要分区呢?

答案:提供负载均衡的能力。不同的分区可以分布在不同的机器节点上,每台机器可以独立各种分区的消息的读写请求。

Kafka的分区策略

所谓分区策略是决定生产者将消息发送到哪个分区的算法。Kafka提供了默认的分区策略,同时也支持我们自定义分区策略。

要想自定义分区策略,可以指定producer的partitioner.class参数,在springboot中为

#自定义分区策略
spring.kafka.producer.properties.partitioner.class=com.example.springbootkafka.config.MyPartitioner

其中partitioner.class参数的值为你自定义的分区策略类,这个类需要实现 Partitioner 接口就可以了。重载一下下面这个关键的接口方法就好

@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    return 1;
}

分区策略种类总结有以下几类

  • 轮询策略

轮询策略 轮询策略有非常优秀的负载均衡表现,它总是能保证消息最大限度地被平均分配到所有分区上,故默认情况下它是最合理的分区策略,也是我们最常用的分区策略之一

  • 按键消息保存策略

Kafka允许为每条消息定义一个消息键,称为key。key可以结合业务使用,赋予业务意义,比如部门编码,人员id,设备编码等。在Kafka的默认策略中,会保证同一个key进入相同的分区中,同一个分区下的消息可以保证顺序

保证消息的有序性,可以有两种方法:

  1. 单分区,单分区的消息毫无疑问是有序,但是弊端也明显,失去了分区的特性
  2. 使用上面说的按key分区,这样可以保证同一个key的消息会进入到同一个分区中。

实际的生产使用中,我们也要多观察自己的业务特点,不要为了保证消息有序性,盲目使用单分区。可以多看看,这些消息的有没有一个共同的key。

源码

Kafka2.0中的默认分区策略为DefaultPartitioner类,它的partition方法如下

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
	//获取主题的全部分区
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
	//如果没有指定消息的key
        if (keyBytes == null) {
	    //使用的轮询策略
            int nextValue = this.nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
		//消息随机分布到topic可用的partition中
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return ((PartitionInfo)availablePartitions.get(part)).partition();
            } else {
		//没有可用的分区,随便给个不可用的分区
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
	    //指定了key,那么就是用hash算法计算出hash值,进行分区
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
}