Kafka Producer (Part 1): KafkaProducer

April 20, 2020 | By Siran | 5,800 words | About 12 minutes to read
Filed under Message Queues | Tags #kafka

1. Overview

Let's open with the example from the Kafka Producer doc (based on Kafka 2.5.0). A Producer sends messages to a Kafka Broker:

public static void main(String[] args) {
  //① Configuration
  Properties props = new Properties();
  props.put("bootstrap.servers", "localhost:9092"); 
  props.put("acks", "all"); 
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  //② Create the KafkaProducer instance
  Producer<String, String> producer = new KafkaProducer<>(props);
  for (int i = 0; i < 100; i++)
    //③ Call send() to publish the message to the Broker
    producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
  producer.close();
}
  • Kafka configuration
    • bootstrap.servers: the IP + port of the Kafka servers
    • acks: the acknowledgement strategy the server applies before responding to a send. Three settings:
      • -1: the response is returned only after every replica in the ISR has acknowledged the message, otherwise a timeout exception is raised ("all" is an alias for -1)
      • 0: no server-side acknowledgement; the send returns immediately
      • 1: the response is returned as soon as the leader has received the message
    • key.serializer: how the key is serialized
    • value.serializer: how the value is serialized
  • Create the KafkaProducer instance
  • Call send() to publish messages to the Broker

1.1 Overall Kafka Producer flow diagram:

The producer client is coordinated by two threads: the main thread and the Sender thread (the sending thread). A runnable sketch of this split follows the list below.

  • Main thread
    • ProducerInterceptors: intercept the message; custom interceptors just implement the ProducerInterceptor interface, and several of them form an interceptor chain.
    • Serializer: serializes the message key and value; custom serializers implement the Serializer interface.
    • Partitioner: assigns the message to a partition; custom partitioners implement the Partitioner interface.
    • RecordAccumulator: collects messages so they can be sent in batches, reducing network overhead and improving performance.
  • Sender thread
    • The Sender thread fetches messages from the RecordAccumulator
    • ⑥ Builds the ClientRequest
    • ⑦ Hands the ClientRequest to the NetworkClient, ready to be sent
    • ⑧ Puts the request into the KafkaChannel
    • ⑨ Performs the I/O and sends the request
    • ⑩ Receives the response and invokes the ClientRequest callback
    • ⑪ Invokes the RecordBatch callbacks and cleans up
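
To see the two-thread split in action, here is a minimal sketch (assuming a broker at localhost:9092 and an existing topic my-topic): send() returns as soon as the record lands in the RecordAccumulator on the calling thread, while the callback fires later on the Sender thread, whose name starts with kafka-producer-network-thread.

public static void main(String[] args) {
  Properties props = new Properties();
  props.put("bootstrap.servers", "localhost:9092");
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  try (Producer<String, String> producer = new KafkaProducer<>(props)) {
    System.out.println("send() runs on: " + Thread.currentThread().getName()); // main
    producer.send(new ProducerRecord<>("my-topic", "k", "v"), (metadata, exception) ->
        // invoked on the Sender (I/O) thread once the broker responds
        System.out.println("callback runs on: " + Thread.currentThread().getName()));
    producer.flush(); // block until the Sender thread has drained the accumulator
  }
}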

Now let's follow the flow in the diagram through the source code:


2. Source Code Analysis

2.1 KafkaProducer

public class KafkaProducer<K, V> implements Producer<K, V> {
    //① Producer id
    private final String clientId;
    //② Visible for testing
    final Metrics metrics;
    //③ Partitioner: assigns messages to partitions according to the configured strategy
    private final Partitioner partitioner;
    //④ Maximum size of a request the producer may send, default 1 MB, configurable via max.request.size
    private final int maxRequestSize;
    //⑤ Size of the buffer the producer uses to cache messages, default 32 MB, configurable via buffer.memory
    private final long totalMemorySize;
    //⑥ Metadata held by this producer instance
    private final ProducerMetadata metadata;
    //⑦ Accumulator used to collect and buffer messages
    private final RecordAccumulator accumulator;
    //⑧ Sender used to send the messages; implements Runnable and runs on the ioThread
    private final Sender sender;
    //⑨ The thread that runs the Sender task, i.e. the Sender thread
    private final Thread ioThread;
    //⑩ Message compression type, none by default; gzip, snappy, lz4 and zstd can be set via compression.type
    private final CompressionType compressionType;
    private final Sensor errors;
    private final Time time;
    //⑪ Serializer for the key
    private final Serializer<K> keySerializer;
    //Serializer for the value
    private final Serializer<V> valueSerializer;
    //⑫ Producer configuration
    private final ProducerConfig producerConfig;
    //⑬ Maximum time send() or partitionsFor() may block, configurable via max.block.ms
    private final long maxBlockTimeMs;
    //⑭ Interceptors
    private final ProducerInterceptors<K, V> interceptors;
    //⑮ API versions attached to requests so the server can detect stale or unsupported requests
    private final ApiVersions apiVersions;
    //⑯ Transactions
    private final TransactionManager transactionManager;
}
  • clientId: producer id
  • metrics: metrics
  • partitioner: assigns messages to partitions according to the configured strategy
  • maxRequestSize: maximum size of a request the producer may send, default 1 MB, configurable via max.request.size
  • totalMemorySize: size of the buffer the producer uses to cache messages, default 32 MB, configurable via buffer.memory
  • metadata: the producer's metadata
  • accumulator: collects and buffers messages; each per-partition batch defaults to 16 KB, configurable via batch.size
  • sender: sends the messages; implements Runnable and runs on the ioThread
  • ioThread: the thread that runs the Sender task, i.e. the Sender thread
  • compressionType: message compression, none by default; gzip, snappy, lz4 and zstd are available. Compression greatly reduces the amount of data on the wire and lowers network I/O, improving overall performance, but it costs some CPU time, so it is a trade of time for space.
  • keySerializer & valueSerializer: serializers for the key and value
  • producerConfig: producer configuration
  • maxBlockTimeMs: maximum time send() or partitionsFor() may block, configurable via max.block.ms
  • interceptors: interceptors; several can be configured to form a chain
  • apiVersions: API versions attached to requests so the server can detect stale or unsupported requests
  • transactionManager: transactions, used to support exactly-once semantics
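
Most of these fields are populated straight from the producer configuration. A quick sketch of the corresponding settings (the values below are purely illustrative, not recommendations; MyInterceptor is a hypothetical class):

Properties props = new Properties();
props.put("max.request.size", String.valueOf(2 * 1024 * 1024));  // maxRequestSize, default 1 MB
props.put("buffer.memory", String.valueOf(64 * 1024 * 1024));    // totalMemorySize, default 32 MB
props.put("batch.size", String.valueOf(32 * 1024));              // per-partition batch in the accumulator, default 16 KB
props.put("compression.type", "lz4");                            // compressionType, default none
props.put("max.block.ms", "30000");                              // maxBlockTimeMs, default 60000
props.put("interceptor.classes", "com.example.MyInterceptor");   // interceptors (hypothetical class)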

2.2 The send() method

In the Kafka Producer doc example, once the KafkaProducer instance has been created, send() is called to publish messages:

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        //① Run the record through the ProducerInterceptor chain
        ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
        return doSend(interceptedRecord, callback);
    }


private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
        TopicPartition tp = null;
        try {
            throwIfProducerClosed();
            // first make sure the metadata for the topic is available
            long nowMs = time.milliseconds();
            ClusterAndWaitTime clusterAndWaitTime;
            try {
                //② Fetch the cluster metadata from the brokers
                clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);
            } catch (KafkaException e) {
                if (metadata.isClosed())
                    throw new KafkaException("Producer closed while send in progress", e);
                throw e;
            }
            nowMs += clusterAndWaitTime.waitedOnMetadataMs;
            long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
            Cluster cluster = clusterAndWaitTime.cluster;
            byte[] serializedKey;
            try {
                //③ Serialize the key
                serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in key.serializer", cce);
            }
            byte[] serializedValue;
            try {
                //③ Serialize the value
                serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in value.serializer", cce);
            }
            //④ Partitioner
            int partition = partition(record, serializedKey, serializedValue, cluster);
            tp = new TopicPartition(record.topic(), partition);

            setReadOnly(record.headers());
            Header[] headers = record.headers().toArray();

            int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
                    compressionType, serializedKey, serializedValue, headers);
            ensureValidRecordSize(serializedSize);
            long timestamp = record.timestamp() == null ? nowMs : record.timestamp();
            if (log.isTraceEnabled()) {
                log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
            }
            // producer callback will make sure to call both 'callback' and interceptor callback
            Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

            if (transactionManager != null && transactionManager.isTransactional()) {
                transactionManager.failIfNotReadyForSend();
            }
            //⑤ Append to the RecordAccumulator
            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                    serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);

            if (result.abortForNewBatch) {
                int prevPartition = partition;
                partitioner.onNewBatch(record.topic(), cluster, prevPartition);
                partition = partition(record, serializedKey, serializedValue, cluster);
                tp = new TopicPartition(record.topic(), partition);
                if (log.isTraceEnabled()) {
                    log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
                }
                interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

                result = accumulator.append(tp, timestamp, serializedKey,
                    serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
            }

            if (transactionManager != null && transactionManager.isTransactional())
                transactionManager.maybeAddPartitionToTransaction(tp);

            if (result.batchIsFull || result.newBatchCreated) {
                log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                //⑥ Wake up the Sender thread to drain the accumulator
                this.sender.wakeup();
            }
            return result.future;
         //... error handling omitted
    }
  • this.interceptors.onSend(record): runs the record through the interceptor chain, calling each interceptor's onSend(), essentially a filter pass
  • waitOnMetadata(): sends a request to fetch the broker cluster metadata
  • serialize(): serializes the key and value
  • partition(): picks a partition according to the configured strategy
  • accumulator.append(): appends the message to the accumulator
  • this.sender.wakeup(): wakes the Sender thread so it pulls messages from the RecordAccumulator and sends them
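
doSend() ultimately returns result.future, so there are three common ways to call send(); a brief sketch (using the producer and topic from section 1):

// 1. Fire and forget: ignore the Future; failures surface only via logs and metrics
producer.send(new ProducerRecord<>("my-topic", "k", "v"));

// 2. Synchronous: block on the Future (get() throws InterruptedException/ExecutionException)
RecordMetadata meta = producer.send(new ProducerRecord<>("my-topic", "k", "v")).get();
System.out.println("partition=" + meta.partition() + ", offset=" + meta.offset());

// 3. Asynchronous: the Callback is invoked on the Sender thread when the send completes
producer.send(new ProducerRecord<>("my-topic", "k", "v"), (metadata, exception) -> {
    if (exception != null)
        exception.printStackTrace();
});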

Now let's analyze these steps one by one:

2.3 ProducerInterceptors

ProducerInterceptors wraps a collection of ProducerInterceptor instances into a filter chain, and implements Closeable (try-with-resources) for graceful shutdown.

public class ProducerInterceptors<K, V> implements Closeable {
    //The list of ProducerInterceptors
    private final List<ProducerInterceptor<K, V>> interceptors;

    public ProducerInterceptors(List<ProducerInterceptor<K, V>> interceptors) {
        this.interceptors = interceptors;
    }
    //①
    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
        ProducerRecord<K, V> interceptRecord = record;
        //Iterate over every ProducerInterceptor in the chain
        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                //Invoke each ProducerInterceptor's onSend() method
                interceptRecord = interceptor.onSend(interceptRecord);
            } catch (Exception e) {
                // do not propagate interceptor exception, log and continue calling other interceptors
                // be careful not to throw exception from here
                if (record != null)
                    log.warn("Error executing interceptor onSend callback for topic: {}, partition: {}", record.topic(), record.partition(), e);
                else
                    log.warn("Error executing interceptor onSend callback", e);
            }
        }
        return interceptRecord;
    }

    //② Callback invoked after a send is acknowledged
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                interceptor.onAcknowledgement(metadata, exception);
            } catch (Exception e) {
                // do not propagate interceptor exceptions, just log
                log.warn("Error executing interceptor onAcknowledgement callback", e);
            }
        }
    }
    
    //③ Callback invoked when a send fails
    public void onSendError(ProducerRecord<K, V> record, TopicPartition interceptTopicPartition, Exception exception) {
        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                if (record == null && interceptTopicPartition == null) {
                    interceptor.onAcknowledgement(null, exception);
                } else {
                    if (interceptTopicPartition == null) {
                        interceptTopicPartition = new TopicPartition(record.topic(),
                                record.partition() == null ? RecordMetadata.UNKNOWN_PARTITION : record.partition());
                    }
                    interceptor.onAcknowledgement(new RecordMetadata(interceptTopicPartition, -1, -1,
                                    RecordBatch.NO_TIMESTAMP, Long.valueOf(-1L), -1, -1), exception);
                }
            } catch (Exception e) {
                // do not propagate interceptor exceptions, just log
                log.warn("Error executing interceptor onAcknowledgement callback", e);
            }
        }
    }
}

  • ① Calls onSend() on every Interceptor instance in ProducerInterceptors, for pre-processing such as adding a tag to the message key.
  • ② Callback invoked after a send is acknowledged.
  • ③ Callback invoked when a send fails.

For example, here are two custom interceptors of mine chained into a filter chain:

import java.util.Map;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
 * @author siran
 * @date 2019/9/25 1:44 PM
 */
public class KafkaIntercept implements ProducerInterceptor {

    @Override
    public ProducerRecord onSend(ProducerRecord producerRecord) {
        //Prefix every value with "hello "
        return new ProducerRecord(producerRecord.topic(), producerRecord.key(), "hello " + producerRecord.value());
    }

    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
        System.out.println("intercept1 ack");
    }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}

/**
 * @author siran
 * @date 2019/9/25 1:51 PM
 */
public class KafkaInterceptTwo implements ProducerInterceptor {
    @Override
    public ProducerRecord onSend(ProducerRecord producerRecord) {
        //Prefix every value with "1232  "
        return new ProducerRecord(producerRecord.topic(), producerRecord.key(), "1232  " + producerRecord.value());
    }

    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
        System.out.println("intercept2 ack");
    }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}


public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "127.0.0.1:9092");
        properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //Use fully-qualified class names here if the interceptors live in a package
        properties.setProperty("interceptor.classes", "KafkaIntercept,KafkaInterceptTwo");
        String topic = "my-topic";
        KafkaProducer kafkaProducer = new KafkaProducer(properties);
        for (int i = 0; i < 10; i++) {
            String key = String.valueOf(i);
            String value = "kafka message " + key;
            kafkaProducer.send(new ProducerRecord(topic, key, value)).get();
            System.out.println("message key " + key);
        }
    }

2.4 The waitOnMetadata() method

Fetches the cluster metadata for the given topic and partition.

private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long nowMs, long maxWaitMs) throws InterruptedException {
        //① First fetch whatever cluster metadata is already cached locally
        Cluster cluster = metadata.fetch();
        //② Reject the topic if it is in the invalid-topics set
        if (cluster.invalidTopics().contains(topic))
            throw new InvalidTopicException(topic);

        //Add the current topic to the metadata cache
        metadata.add(topic, nowMs);
        //③ Get the topic's partition count, used later to compute which partition the message goes to
        Integer partitionsCount = cluster.partitionCountForTopic(topic);
    
        //Validation: if the partition count is known and the requested partition (if any) is in range, return immediately
        if (partitionsCount != null && (partition == null || partition < partitionsCount))
            return new ClusterAndWaitTime(cluster, 0);

        //The maximum wait time, i.e. the maxBlockTimeMs parameter, 60 seconds by default
        long remainingWaitMs = maxWaitMs;
        long elapsed = 0;

        //④ Loop while either:
        //condition 1: the partition count for the topic is still unknown, or
        //condition 2: the caller targeted a specific partition that is >= the partition count we know about. Roughly two ways to get here: (1) the topic was created with, say, 3 partitions but the caller targets partition 4, which does not exist; (2) the topic's partition count was recently increased and the locally cached metadata is still stale.
        do {
            //As the log messages show, entering this loop means the cached cluster info is stale and an update must be requested
            if (partition != null) {
                log.trace("Requesting metadata update for partition {} of topic {}.", partition, topic);
            } else {
                log.trace("Requesting metadata update for topic {}.", topic);
            }
            metadata.add(topic, nowMs + elapsed);
            //⑤ Request a metadata update and get back the version number
            int version = metadata.requestUpdateForTopic(topic);
            sender.wakeup();
            try {
                //⑥ Block until the metadata is updated past this version; the version number guards against acting on stale, inconsistent metadata
                metadata.awaitUpdate(version, remainingWaitMs);
            } catch (TimeoutException ex) {
                // Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs
                throw new TimeoutException(
                        String.format("Topic %s not present in metadata after %d ms.",
                                topic, maxWaitMs));
            }
            //⑦ Fetch the refreshed cluster info
            cluster = metadata.fetch();
            elapsed = time.milliseconds() - nowMs;
            //Check for timeout
            if (elapsed >= maxWaitMs) {
                throw new TimeoutException(partitionsCount == null ?
                        String.format("Topic %s not present in metadata after %d ms.",
                                topic, maxWaitMs) :
                        String.format("Partition %d of topic %s with partition count %d is not present in metadata after %d ms.",
                                partition, topic, partitionsCount, maxWaitMs));
            }
            //⑧ Clear errors from the previous iteration, and throw if this iteration recorded one
            metadata.maybeThrowExceptionForTopic(topic);
            remainingWaitMs = maxWaitMs - elapsed;
            //⑨ Get the new partition count from the cluster and re-evaluate the loop condition
            partitionsCount = cluster.partitionCountForTopic(topic);
        } while (partitionsCount == null || (partition != null && partition >= partitionsCount));

        //Done
        return new ClusterAndWaitTime(cluster, elapsed);
    }
  • ① First fetch whatever cluster metadata is already cached locally
  • ② Reject the topic if it is in the invalid-topics set
  • ③ Get the topic's partition count, used later to compute which partition the message goes to
  • ④ Loop while either:
    • condition 1: the partition count for the topic is still unknown
    • condition 2: the caller targeted a specific partition that is >= the partition count we know about, either because the partition simply does not exist or because the partition count was recently increased and the cached metadata is stale (a sketch of the timeout case follows this list)
  • ⑤ Request a metadata update and get back the version number
  • ⑥ Block until the metadata is updated past this version; the version number guards against acting on stale metadata
  • ⑦ Fetch the refreshed cluster info
  • ⑧ Clear errors from the previous iteration, and throw if this iteration recorded one
  • ⑨ Get the new partition count and re-evaluate the loop condition
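
Condition 2 is easy to trigger yourself: target a partition number the topic does not have, and waitOnMetadata() keeps looping until max.block.ms expires. doSend() catches the resulting TimeoutException (an ApiException) and hands it back through the Future. A sketch, assuming my-topic has fewer than 100 partitions:

try {
    //partition 99 does not exist, so the metadata can never satisfy the loop condition
    producer.send(new ProducerRecord<>("my-topic", 99, "k", "v")).get();
} catch (ExecutionException e) {
    //the cause is an org.apache.kafka.common.errors.TimeoutException from waitOnMetadata()
    System.out.println("metadata wait failed: " + e.getCause());
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
}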

2.5 Serializer

Serializes the key and value.

public interface Serializer<T> extends Closeable {

    default void configure(Map<String, ?> configs, boolean isKey) {
        // intentionally left blank
    }

    byte[] serialize(String topic, T data);


    default byte[] serialize(String topic, Headers headers, T data) {
        return serialize(topic, data);
    }

    @Override
    default void close() {
        // intentionally left blank
    }
}

This one is straightforward. Kafka ships built-in serializers for String, Byte, Long and so on, which are worth a look on your own; implementing this interface also lets you define a custom serialization format.
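
As a quick illustration of the interface (Kafka already ships an IntegerSerializer, so this sketch is purely didactic), here is a custom serializer that writes an Integer as 4 big-endian bytes; configure() and close() keep their default no-op implementations:

public class MyIntSerializer implements Serializer<Integer> {
    @Override
    public byte[] serialize(String topic, Integer data) {
        if (data == null)
            return null; //a null payload stands for a null value
        //4 big-endian bytes
        return java.nio.ByteBuffer.allocate(4).putInt(data).array();
    }
}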

2.6 Partitioner

This step selects the partition to send to. If the user set a partition on the record, it takes precedence; otherwise the configured Partitioner, by default DefaultPartitioner#partition(), decides.

//org.apache.kafka.clients.producer.KafkaProducer
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
        //① Read the partition set on the record, if any
        Integer partition = record.partition();
        //② If the user specified one, return it directly; otherwise delegate to the Partitioner implementation
        return partition != null ?
                partition :
                partitioner.partition(
                        record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
    }

//org.apache.kafka.clients.producer.Partitioner
public interface Partitioner extends Configurable, Closeable {

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

    public void close();

    default public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
    }
}
  • ① Read the partition set on the record
  • ② If the user specified one, return it directly; otherwise a Partitioner implementation picks the partition

2.6.1 DefaultPartitioner, the default implementation

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        //① If the user did not give the record a key, take this path
        if (keyBytes == null) {
            return stickyPartitionCache.partition(topic, cluster);
        } 
        //② Keyed records take this path
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        //Number of partitions for the topic
        int numPartitions = partitions.size();
        //Hash the key with murmur2, then mod by the partition count
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
  • ① No key: stickyPartitionCache#partition() computes the target partition; covered in 2.6.3 below

  • ② With a key:

    • get the topic's partition count
    • hash the key with murmur2, then mod by the partition count; very simple (see the sketch below)
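
You can reproduce the keyed routing outside the producer with the same utility methods (a sketch; Utils lives in org.apache.kafka.common.utils, and StringSerializer produces UTF-8 bytes):

byte[] keyBytes = "user-42".getBytes(StandardCharsets.UTF_8); //what StringSerializer would produce
int numPartitions = 3; //assumed partition count for the topic
//the same computation DefaultPartitioner applies to a non-null key
int partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
System.out.println("key user-42 -> partition " + partition);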

2.6.2 RoundRobinPartitioner, the round-robin strategy. If I remember correctly, back in version 2.1.1 Kafka spread key-less records round-robin by default.

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        //① Get the topic's partition count
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        //② Compute the next counter value
        int nextValue = nextValue(topic);
        //③ Get the available partitions for the topic
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        //If there are available partitions, mod directly by the number of available partitions
        if (!availablePartitions.isEmpty()) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            //No partitions are available: mod by the total count, yielding a currently unavailable partition
            return Utils.toPositive(nextValue) % numPartitions;
        }
    }

    private int nextValue(String topic) {
        AtomicInteger counter = topicCounterMap.computeIfAbsent(topic, k -> {
            return new AtomicInteger(0);
        });
        return counter.getAndIncrement();
    }
  • ① Get the topic's partition count from the cluster metadata
  • ② Call nextValue() to get the next counter value; the method simply keeps incrementing, with an AtomicInteger guaranteeing atomicity
  • ③ Get the available partitions for the topic from the cluster metadata
    • if there are available partitions, mod by the number of available partitions
    • if there are none, mod by the total partition count (a configuration sketch follows this list)
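
To make a producer use this strategy, point partitioner.class at it (a sketch):

properties.setProperty("partitioner.class",
        "org.apache.kafka.clients.producer.RoundRobinPartitioner");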

2.6.3 UniformStickyPartitioner, the sticky partitioning strategy, in the same spirit as the consumer side's sticky assignment.

private final StickyPartitionCache stickyPartitionCache = new StickyPartitionCache();

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        //Simply delegates to StickyPartitionCache#partition()
        return stickyPartitionCache.partition(topic, cluster);
    }
  • StickyPartitionCache
    public class StickyPartitionCache {
          //① Cache: the key is the topic, the value is the partition currently being sent to
          private final ConcurrentMap<String, Integer> indexCache;
          public StickyPartitionCache() {
              this.indexCache = new ConcurrentHashMap<>();
          }
          //② partition(): if the topic already has a cached partition, return it; otherwise call nextPartition()
          public int partition(String topic, Cluster cluster) {
              Integer part = indexCache.get(topic);
              if (part == null) {
                  return nextPartition(topic, cluster, -1);
              }
              return part;
          }
        
          public int nextPartition(String topic, Cluster cluster, int prevPartition) {
              List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
              Integer oldPart = indexCache.get(topic);
              Integer newPart = oldPart;
    
              if (oldPart == null || oldPart == prevPartition) {
                  List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
                  if (availablePartitions.size() < 1) {
                      Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                      newPart = random % partitions.size();
                  } else if (availablePartitions.size() == 1) {
                      newPart = availablePartitions.get(0).partition();
                  } else {
                      while (newPart == null || newPart.equals(oldPart)) {
                          Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                          newPart = availablePartitions.get(random % availablePartitions.size()).partition();
                      }
                  }
                  if (oldPart == null) {
                      indexCache.putIfAbsent(topic, newPart);
                  } else {
                      indexCache.replace(topic, prevPartition, newPart);
                  }
                  return indexCache.get(topic);
              }
              return indexCache.get(topic);
          }
        
      }
    

The logic here is also fairly simple, roughly:

  • indexCache caches the topic-to-partition mapping
  • if the mapping is already in indexCache, it is returned directly, with no computation
  • nextPartition() essentially takes a pseudo-random value modulo the number of (available) partitions
  • "sticky" means records for the same topic keep going to the same partition, until onNewBatch() switches to a new one (see the sketch below)
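
UniformStickyPartitioner is configured like any other partitioner (a sketch). Note that the switch to a new sticky partition happens through onNewBatch(), which is exactly the hook doSend() invokes when accumulator.append() returns abortForNewBatch (step ⑤ in section 2.2):

properties.setProperty("partitioner.class",
        "org.apache.kafka.clients.producer.UniformStickyPartitioner");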

2.6.4 Custom partitioner

import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

/**
 * @author siran
 * @date 2019-08-19 14:35
 */
public class SiranKafkaRoutePartition implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        //Route key "0" to partition 0 and everything else to partition 1
        if (Integer.parseInt(key.toString()) == 0) {
            return 0;
        }
        return 1;
    }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}

The implementation is simple: records with key 0 go to partition 0 and the rest go to partition 1. Just like the custom interceptors, pass the custom partitioner in when building the KafkaProducer (use the fully-qualified class name if the class lives in a package):

properties.setProperty("partitioner.class","SiranKafkaRoutePartition");

For reasons of length, RecordAccumulator and Sender are analyzed in separate posts.


References: Kafka 2.5.0 source code

Kafka official website

《Apache Kafka源码剖析》 (Apache Kafka Source Code Analysis)

《深入理解Kafka:核心设计与实践》 (Understanding Kafka: Core Design and Practice)