1. Overview
Let's start with the example from the Kafka Producer doc (based on Kafka 2.5.0). A Producer sends messages to a Kafka Broker; the code is as follows:
public static void main(String[] args) {
    //① Configuration
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("acks", "all");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    //② Create the KafkaProducer instance
    Producer<String, String> producer = new KafkaProducer<>(props);
    for (int i = 0; i < 100; i++)
        //③ Call send to deliver the message to the Broker
        producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
    producer.close();
}
① Kafka configuration
- bootstrap.servers: the IP + port of the Kafka servers
- acks: the strategy the server uses before acknowledging a message. There are three settings:
  - -1: wait until every replica in the ISR has confirmed the message before returning a successful response, otherwise a timeout exception is raised ("all" means the same as -1)
  - 0: no server-side acknowledgement, return immediately
  - 1: return as soon as the leader has received the message
- key.serializer: how the key is serialized
- value.serializer: how the value is serialized
② Create the KafkaProducer instance
③ Call the send method to deliver messages to the Broker (see the usage sketch below)
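As a side note, send() is asynchronous and returns a Future&lt;RecordMetadata&gt;, and the acks setting above determines when that future completes. A minimal sketch of the three common calling styles, assuming the same producer and topic as above (the checked exceptions thrown by get() are left unhandled here):

// Fire-and-forget: ignore the returned future
producer.send(new ProducerRecord<>("my-topic", "key", "value"));
// Synchronous: block until the broker acknowledges according to acks
RecordMetadata metadata = producer.send(new ProducerRecord<>("my-topic", "key", "value")).get();
// Asynchronous: register a callback that runs when the acknowledgement (or error) arrives
producer.send(new ProducerRecord<>("my-topic", "key", "value"), (md, exception) -> {
    if (exception != null) exception.printStackTrace();
});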
1.1 The overall Kafka Producer flow
The producer client is driven by two cooperating threads: the main thread and the Sender thread (the sending thread).
- Main thread:
  - ① ProducerInterceptors: intercept messages; you can customize this by implementing the ProducerInterceptor interface, and multiple interceptors form an interceptor chain.
  - ② Serializer: serialize the message key and value; customize by implementing the Serializer interface.
  - ③ Partitioner: assign a partition to the message; customize by implementing the Partitioner interface.
  - ④ RecordAccumulator: collect messages so they can be sent in batches, reducing network overhead and improving performance (see the batching sketch after this list).
- Sender thread:
  - ⑤ The Sender thread fetches messages from the RecordAccumulator
  - ⑥ Builds ClientRequest objects
  - ⑦ Hands the ClientRequest to the NetworkClient, ready to be sent
  - ⑧ Puts the request into a KafkaChannel
  - ⑨ Performs the I/O and sends the request
  - ⑩ Receives the response and invokes the ClientRequest callback
  - ⑪ Invokes the RecordBatch callbacks and performs cleanup
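The hand-off between the two threads is tunable. As a hedged illustration (these are standard producer configs; the values are only examples): batch.size caps how large a per-partition batch in the RecordAccumulator may grow, and linger.ms lets the main thread wait a little so the Sender picks up fuller batches.

Properties props = new Properties();
// let batches grow up to 32 KB per partition (default 16 KB)
props.put("batch.size", 32768);
// wait up to 5 ms for more records instead of sending immediately (default 0)
props.put("linger.ms", 5);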
Now let's follow the flow described above and look at the details of the source code:
2. Source Code Analysis
2.1 KafkaProducer
public class KafkaProducer<K, V> implements Producer<K, V> {
//① producer client id
private final String clientId;
//② Visible for testing
final Metrics metrics;
//③ partitioner: assigns messages to partitions according to the configured partitioning strategy
private final Partitioner partitioner;
//④ maximum size of a produce request, default 1 MB, configurable via max.request.size
private final int maxRequestSize;
//⑤ size of the buffer the producer uses to cache messages, default 32 MB, configurable via buffer.memory
private final long totalMemorySize;
//⑥ metadata for this Producer instance
private final ProducerMetadata metadata;
//⑦ record accumulator, collects and buffers messages
private final RecordAccumulator accumulator;
//⑧ sender, sends the messages; implements Runnable and runs in the ioThread
private final Sender sender;
//⑨ the Sender thread, which executes the Sender task to send messages
private final Thread ioThread;
//⑩ message compression type, default none; can be gzip, snappy, lz4 or zstd, configurable via compression.type
private final CompressionType compressionType;
private final Sensor errors;
private final Time time;
//⑪ key serializer
private final Serializer<K> keySerializer;
//value serializer
private final Serializer<V> valueSerializer;
//⑫ producer configuration
private final ProducerConfig producerConfig;
//⑬ maximum time send() or partitionsFor() may block, configurable via max.block.ms
private final long maxBlockTimeMs;
//⑭ interceptors
private final ProducerInterceptors<K, V> interceptors;
//⑮ API versions carried on requests, so the server can tell e.g. whether a request uses an outdated version
private final ApiVersions apiVersions;
//⑯ transaction manager
private final TransactionManager transactionManager;
}
- ① clientId: the producer id
- ② metrics: metrics
- ③ partitioner: assigns messages to partitions according to the configured partitioning strategy
- ④ maxRequestSize: maximum size of a produce request, default 1 MB, configurable via max.request.size
- ⑤ totalMemorySize: size of the buffer used to cache messages, default 32 MB, configurable via buffer.memory (several of these configs are gathered in the sketch after this list)
- ⑥ metadata: the producer's metadata
- ⑦ accumulator: collects and buffers messages; each batch defaults to 16 KB, configurable via batch.size
- ⑧ sender: sends the messages; implements Runnable and runs in the ioThread
- ⑨ ioThread: the Sender thread, which executes the Sender task to send messages
- ⑩ compressionType: message compression type, default none; can be gzip, snappy, lz4 or zstd. Compressing messages greatly reduces the amount of data on the wire and lowers network I/O, improving overall performance, at the cost of some CPU time spent compressing, so it trades CPU for bandwidth
- ⑪ keySerializer & valueSerializer: serializers for the key and the value
- ⑫ producerConfig: the producer configuration
- ⑬ maxBlockTimeMs: maximum time send() or partitionsFor() may block, configurable via max.block.ms
- ⑭ interceptors: interceptors; several can be configured to form a filter chain
- ⑮ apiVersions: API versions carried on requests, so the server can tell e.g. whether a request uses an outdated version
- ⑯ transactionManager: transactions, used to support exactly-once semantics
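A minimal sketch tying several of these fields back to their producer configs (the values below simply restate the defaults mentioned above, not tuning advice):

Properties props = new Properties();
props.put("max.request.size", 1048576);   // ④ maxRequestSize, 1 MB
props.put("buffer.memory", 33554432L);    // ⑤ totalMemorySize, 32 MB
props.put("batch.size", 16384);           // ⑦ per-partition batch size in the accumulator, 16 KB
props.put("compression.type", "none");    // ⑩ compressionType
props.put("max.block.ms", 60000L);        // ⑬ maxBlockTimeMs, 60 s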
2.2 The send() method
In the Kafka Producer doc example, once the KafkaProducer instance has been created, send is called to deliver messages. The code is as follows:
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
//① the ProducerInterceptor chain
ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
return doSend(interceptedRecord, callback);
}
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
TopicPartition tp = null;
try {
throwIfProducerClosed();
// first make sure the metadata for the topic is available
long nowMs = time.milliseconds();
ClusterAndWaitTime clusterAndWaitTime;
try {
//② fetch the broker cluster metadata
clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);
} catch (KafkaException e) {
if (metadata.isClosed())
throw new KafkaException("Producer closed while send in progress", e);
throw e;
}
nowMs += clusterAndWaitTime.waitedOnMetadataMs;
long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
Cluster cluster = clusterAndWaitTime.cluster;
byte[] serializedKey;
try {
//③ serialize the key
serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
" specified in key.serializer", cce);
}
byte[] serializedValue;
try {
//③ serialize the value
serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
" specified in value.serializer", cce);
}
//④ choose the partition
int partition = partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
setReadOnly(record.headers());
Header[] headers = record.headers().toArray();
int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
compressionType, serializedKey, serializedValue, headers);
ensureValidRecordSize(serializedSize);
long timestamp = record.timestamp() == null ? nowMs : record.timestamp();
if (log.isTraceEnabled()) {
log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
}
// producer callback will make sure to call both 'callback' and interceptor callback
Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
if (transactionManager != null && transactionManager.isTransactional()) {
transactionManager.failIfNotReadyForSend();
}
//⑤ append to the RecordAccumulator
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);
if (result.abortForNewBatch) {
int prevPartition = partition;
partitioner.onNewBatch(record.topic(), cluster, prevPartition);
partition = partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
if (log.isTraceEnabled()) {
log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
}
interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
}
if (transactionManager != null && transactionManager.isTransactional())
transactionManager.maybeAddPartitionToTransaction(tp);
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
//⑥ wake up the Sender thread to pull the data
this.sender.wakeup();
}
return result.future;
//... some error processing omitted
}
- ① this.interceptors.onSend(record): invoke onSend on the interceptor chain, which effectively acts as a filter
- ② waitOnMetadata(): send a request to fetch the broker cluster metadata
- ③ serialize(): serialize the key and value
- ④ partition(): choose a partition according to the configured strategy
- ⑤ accumulator.append(): append the message to the accumulator
- ⑥ this.sender.wakeup(): wake up the Sender thread so it can fetch the messages from the RecordAccumulator and send them
Next, let's analyze them one by one:
2.3 ProducerInterceptors
ProducerInterceptors is a collection of ProducerInterceptor instances forming a filter chain. It implements Closeable so it can be closed gracefully (e.g. via try-with-resources).
public class ProducerInterceptors<K, V> implements Closeable {
//the list of ProducerInterceptor instances
private final List<ProducerInterceptor<K, V>> interceptors;
public ProducerInterceptors(List<ProducerInterceptor<K, V>> interceptors) {
this.interceptors = interceptors;
}
//①
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
ProducerRecord<K, V> interceptRecord = record;
//iterate over the ProducerInterceptor instances in ProducerInterceptors
for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
try {
//invoke onSend on each ProducerInterceptor
interceptRecord = interceptor.onSend(interceptRecord);
} catch (Exception e) {
// do not propagate interceptor exception, log and continue calling other interceptors
// be careful not to throw exception from here
if (record != null)
log.warn("Error executing interceptor onSend callback for topic: {}, partition: {}", record.topic(), record.partition(), e);
else
log.warn("Error executing interceptor onSend callback", e);
}
}
return interceptRecord;
}
//② callback invoked after the send is acknowledged
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
try {
interceptor.onAcknowledgement(metadata, exception);
} catch (Exception e) {
// do not propagate interceptor exceptions, just log
log.warn("Error executing interceptor onAcknowledgement callback", e);
}
}
}
//③ callback invoked when the send fails
public void onSendError(ProducerRecord<K, V> record, TopicPartition interceptTopicPartition, Exception exception) {
for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
try {
if (record == null && interceptTopicPartition == null) {
interceptor.onAcknowledgement(null, exception);
} else {
if (interceptTopicPartition == null) {
interceptTopicPartition = new TopicPartition(record.topic(),
record.partition() == null ? RecordMetadata.UNKNOWN_PARTITION : record.partition());
}
interceptor.onAcknowledgement(new RecordMetadata(interceptTopicPartition, -1, -1,
RecordBatch.NO_TIMESTAMP, Long.valueOf(-1L), -1, -1), exception);
}
} catch (Exception e) {
// do not propagate interceptor exceptions, just log
log.warn("Error executing interceptor onAcknowledgement callback", e);
}
}
}
- ① Call onSend on every interceptor in ProducerInterceptors to do some pre-processing, for example adding a marker to the message key.
- ② Callback invoked after the send is acknowledged.
- ③ Callback invoked when the send fails.
For example, here are two custom interceptors of my own that form a filter chain:
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
 * @author siran
 * @date 2019/9/25 1:44 PM
 */
public class KafkaIntercept implements ProducerInterceptor {
    @Override
    public ProducerRecord onSend(ProducerRecord producerRecord) {
        return new ProducerRecord(producerRecord.topic(), producerRecord.key(), "hello " + producerRecord.value());
    }
    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
        System.out.println("intercept1 ack");
    }
    @Override
    public void close() { }
    @Override
    public void configure(Map<String, ?> configs) { }
}

/**
 * @author siran
 * @date 2019/9/25 1:51 PM
 */
public class KafkaInterceptTwo implements ProducerInterceptor {
    @Override
    public ProducerRecord onSend(ProducerRecord producerRecord) {
        return new ProducerRecord(producerRecord.topic(), producerRecord.key(), "1232 " + producerRecord.value());
    }
    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
        System.out.println("intercept2 ack");
    }
    @Override
    public void close() { }
    @Override
    public void configure(Map<String, ?> configs) { }
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
    Properties properties = new Properties();
    properties.put("bootstrap.servers", "127.0.0.1:9092");
    properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    // fully qualified class names; here the interceptors live in the default package
    properties.setProperty("interceptor.classes", "KafkaIntercept,KafkaInterceptTwo");
    KafkaProducer kafkaProducer = new KafkaProducer(properties);
    String topic = "my-topic";
    for (int i = 0; i < 10; i++) {
        String key = String.valueOf(i);
        String value = "kafka message " + key;
        kafkaProducer.send(new ProducerRecord(topic, key, value)).get();
        System.out.println("message key " + key);
    }
    kafkaProducer.close();
}
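With the interceptors configured in this order, each record's value passes through KafkaIntercept first and KafkaInterceptTwo second, so (assuming the default-package class names above resolve on the classpath) a value of "kafka message 0" should reach the broker as "1232 hello kafka message 0", and both "intercept1 ack" and "intercept2 ack" are printed when the acknowledgement comes back.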
2.4 The waitOnMetadata() method
Fetches the cluster metadata for the given topic and partition.
private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long nowMs, long maxWaitMs) throws InterruptedException {
//① first check the locally cached cluster metadata
Cluster cluster = metadata.fetch();
//② check whether the topic is marked invalid in the cluster metadata
if (cluster.invalidTopics().contains(topic))
throw new InvalidTopicException(topic);
//add the current topic to the metadata's topic set
metadata.add(topic, nowMs);
//③ get the topic's partition count, used later to work out which partition the message goes to
Integer partitionsCount = cluster.partitionCountForTopic(topic);
//validate
if (partitionsCount != null && (partition == null || partition < partitionsCount))
return new ClusterAndWaitTime(cluster, 0);
//maximum wait time, i.e. the maxBlockTimeMs parameter, 60 s by default
long remainingWaitMs = maxWaitMs;
long elapsed = 0;
//④ loop
//condition 1: the partition count for the topic is not yet in the cached cluster metadata
//condition 2: the user targeted a specific partition of the topic, but it is >= the partition count known for that topic. This happens in roughly two cases: 1. the topic was created with, say, 3 partitions and the user asked for partition 4; 2. the locally known partition count is stale after the topic's partitions were changed with the kafka tools.
do {
//the log messages show that entering this loop means the cached cluster metadata is stale and an update request must be sent
if (partition != null) {
log.trace("Requesting metadata update for partition {} of topic {}.", partition, topic);
} else {
log.trace("Requesting metadata update for topic {}.", topic);
}
metadata.add(topic, nowMs + elapsed);
//⑤ request a metadata update and get back the version number
int version = metadata.requestUpdateForTopic(topic);
sender.wakeup();
try {
//⑥ block until the metadata has been updated; the version number guards against reading stale, inconsistent data
metadata.awaitUpdate(version, remainingWaitMs);
} catch (TimeoutException ex) {
// Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs
throw new TimeoutException(
String.format("Topic %s not present in metadata after %d ms.",
topic, maxWaitMs));
}
//⑦ fetch the updated cluster metadata
cluster = metadata.fetch();
elapsed = time.milliseconds() - nowMs;
//check for timeout
if (elapsed >= maxWaitMs) {
throw new TimeoutException(partitionsCount == null ?
String.format("Topic %s not present in metadata after %d ms.",
topic, maxWaitMs) :
String.format("Partition %d of topic %s with partition count %d is not present in metadata after %d ms.",
partition, topic, partitionsCount, maxWaitMs));
}
//⑧ clear the previous iteration's error and, if this iteration recorded an error for the topic, throw it
metadata.maybeThrowExceptionForTopic(topic);
remainingWaitMs = maxWaitMs - elapsed;
//⑨ get the topic's new partition count and decide whether to keep looping
partitionsCount = cluster.partitionCountForTopic(topic);
} while (partitionsCount == null || (partition != null && partition >= partitionsCount));
//done
return new ClusterAndWaitTime(cluster, elapsed);
}
- ① First check the locally cached cluster metadata.
- ② Check whether the topic is marked invalid in the cluster metadata.
- ③ Get the topic's partition count, used later to work out which partition the message goes to.
- ④ Loop while either of two conditions holds (a concrete example follows this list). Condition 1: the partition count for the topic is not yet in the cached cluster metadata. Condition 2: the user targeted a specific partition, but it is >= the partition count known for the topic; this happens when, say, the topic was created with 3 partitions and the user asked for partition 4, or when the locally known partition count is stale after the topic's partitions were changed with the kafka tools.
- ⑤ Send a request to update the metadata and get back a version number.
- ⑥ Block until the metadata has been updated; the version number guards against reading stale, inconsistent data.
- ⑦ Fetch the updated cluster metadata.
- ⑧ Clear the previous iteration's error and, if this iteration recorded an error for the topic, throw it.
- ⑨ Get the topic's new partition count and decide whether to keep looping.
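To make condition 2 concrete, here is a hypothetical snippet (topic name and partition layout are made up): if "my-topic" only has partitions 0–2 and we explicitly target partition 5, waitOnMetadata keeps requesting metadata updates until max.block.ms (60 s by default) elapses, and the send fails with the TimeoutException built in the loop above (surfaced either directly or through the returned future, depending on the error handling omitted from doSend).

// "my-topic" is assumed to have only 3 partitions, so partition 5 does not exist
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", 5, "key", "value");
// blocks inside waitOnMetadata for up to max.block.ms, then fails with a TimeoutException
producer.send(record);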
2.5 Serializer
Serializes the key and the value.
public interface Serializer<T> extends Closeable {
default void configure(Map<String, ?> configs, boolean isKey) {
// intentionally left blank
}
byte[] serialize(String topic, T data);
default byte[] serialize(String topic, Headers headers, T data) {
return serialize(topic, data);
}
@Override
default void close() {
// intentionally left blank
}
}
This interface is fairly simple. Serializers for String, Byte, Long and so on are built in and worth a quick look; you can also implement the interface yourself to plug in a custom serialization format.
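As a hedged illustration (User and UserSerializer are made-up names, not part of the Kafka codebase), a custom value serializer only needs to implement serialize(); configure() and close() already have the default implementations shown above:

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.serialization.Serializer;

// hypothetical value type used only for this example
class User {
    final long id;
    final String name;
    User(long id, String name) { this.id = id; this.name = name; }
}

public class UserSerializer implements Serializer<User> {
    @Override
    public byte[] serialize(String topic, User user) {
        if (user == null) {
            return null;
        }
        // naive encoding: "id,name" as UTF-8; real code would more likely use JSON, Avro, etc.
        return (user.id + "," + user.name).getBytes(StandardCharsets.UTF_8);
    }
}

It would then be wired in via value.serializer, just like StringSerializer in the opening example.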
2.6 Partitioner
This step chooses the partition to send to. If the user specified a partition, it takes precedence; otherwise the default implementation, DefaultPartitioner#partition(), is used.
//org.apache.kafka.clients.producer.KafkaProducer
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
//① get the partition from the record
Integer partition = record.partition();
//② if the user specified one, return it directly; otherwise let the Partitioner implementation choose
return partition != null ?
partition :
partitioner.partition(
record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}
//org.apache.kafka.clients.producer.Partitioner
public interface Partitioner extends Configurable, Closeable {
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
public void close();
default public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
}
}
- ① Get the partition from the record.
- ② If the user specified one, return it directly; otherwise let the Partitioner implementation choose the partition.
2.6.1 DefaultPartitioner, the default implementation
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
//① if the record has no key, take this branch
if (keyBytes == null) {
return stickyPartitionCache.partition(topic, cluster);
}
//② the record has a key, take this branch
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
//the topic's partition count
int numPartitions = partitions.size();
//hash the key with murmur2, then take it modulo the partition count
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
- ① If the user did not set a key, stickyPartitionCache#partition computes the target partition; this is covered in 2.6.3 below.
- ② If a key was set: get the topic's partition count, hash the key with murmur2, and take the hash modulo the partition count. Very simple (a small sketch follows this list).
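A quick, hedged sketch of that last step (Utils is org.apache.kafka.common.utils.Utils from the clients jar; the key and partition count here are invented): as long as the partition count does not change, the same key always lands on the same partition.

byte[] keyBytes = "order-42".getBytes(StandardCharsets.UTF_8);
int numPartitions = 6;
// the same formula as DefaultPartitioner: murmur2 hash, forced positive, modulo the partition count
int partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
System.out.println("key order-42 -> partition " + partition);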
2.6.2 RoundRobinPartitioner
Round-robin mode. Before Kafka 2.4, the default partitioner spread records without a key round-robin across partitions; 2.4 replaced that keyless behaviour with the sticky strategy described below, so RoundRobinPartitioner now has to be enabled explicitly (a config sketch follows the code walkthrough).
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
//① get the topic's partitions
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
//② compute the next counter value
int nextValue = nextValue(topic);
//③ get the topic's available partitions
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
//if there are available partitions, take the counter modulo their number
if (!availablePartitions.isEmpty()) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
//no partitions available, take the counter modulo the total partition count
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
}
private int nextValue(String topic) {
AtomicInteger counter = topicCounterMap.computeIfAbsent(topic, k -> {
return new AtomicInteger(0);
});
return counter.getAndIncrement();
}
- ① Get the topic's partitions from the cluster metadata.
- ② Call nextValue() to get the next counter value; the method simply keeps incrementing, using an AtomicInteger for thread safety.
- ③ Get the topic's available partitions from the cluster metadata:
  - if there are available partitions, take the counter modulo their number;
  - if there are none, take the counter modulo the total partition count.
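RoundRobinPartitioner is not applied unless you opt in. A minimal sketch of selecting it (both classes live in org.apache.kafka.clients.producer):

Properties props = new Properties();
// switch from the default (hash/sticky) partitioner to pure round-robin
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class.getName());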
2.6.3 UniformStickyPartitioner, the sticky partitioning strategy, the same idea as the sticky assignment on the Consumer side.
private final StickyPartitionCache stickyPartitionCache = new StickyPartitionCache();
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
//simply delegates to StickyPartitionCache#partition
return stickyPartitionCache.partition(topic, cluster);
}
- StickyPartitionCache
public class StickyPartitionCache {
    //① cache: the key is the topic, the value is the partition currently being sent to
    private final ConcurrentMap<String, Integer> indexCache;
    public StickyPartitionCache() {
        this.indexCache = new ConcurrentHashMap<>();
    }
    //② partition: if the topic already has a cached partition, return it; otherwise call nextPartition
    public int partition(String topic, Cluster cluster) {
        Integer part = indexCache.get(topic);
        if (part == null) {
            return nextPartition(topic, cluster, -1);
        }
        return part;
    }
    public int nextPartition(String topic, Cluster cluster, int prevPartition) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        Integer oldPart = indexCache.get(topic);
        Integer newPart = oldPart;
        if (oldPart == null || oldPart == prevPartition) {
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() < 1) {
                Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                newPart = random % partitions.size();
            } else if (availablePartitions.size() == 1) {
                newPart = availablePartitions.get(0).partition();
            } else {
                while (newPart == null || newPart.equals(oldPart)) {
                    Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                    newPart = availablePartitions.get(random % availablePartitions.size()).partition();
                }
            }
            if (oldPart == null) {
                indexCache.putIfAbsent(topic, newPart);
            } else {
                indexCache.replace(topic, prevPartition, newPart);
            }
            return indexCache.get(topic);
        }
        return indexCache.get(topic);
    }
}
The logic here is also fairly simple. Roughly:
- indexCache caches the mapping from topic to the partition currently being used.
- If the mapping is already in indexCache, it is returned directly, with no recomputation.
- nextPartition essentially takes a pseudo-random value modulo the number of partitions.
- "Sticky" means that records for the same topic keep going to the same partition (more on why that helps right after this list).
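The point of the stickiness is batching: keyless records pile up in a single partition's batch instead of being spread thinly across every partition, so batches fill up and get sent sooner. When accumulator.append reports abortForNewBatch in doSend above, the producer calls partitioner.onNewBatch, which for the sticky partitioners delegates to StickyPartitionCache#nextPartition to move on to a different partition for the next batch.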
2.6.4 A custom Partitioner
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

/**
 * @author siran
 * @date 2019-08-19 14:35
 */
public class SiranKafkaRoutePartition implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        // default to partition 0 so we never return an unset value
        int targetPartition = 0;
        if (!availablePartitions.isEmpty() && Integer.parseInt(key.toString()) != 0) {
            targetPartition = 1;
        }
        return targetPartition;
    }
    @Override
    public void close() { }
    @Override
    public void configure(Map<String, ?> configs) { }
}
The implementation is straightforward: records with key = 0 go to partition 0, and records with key = 1 (or any other non-zero key) go to partition 1. As with the custom interceptors, you just pass the custom partitioner in when building the KafkaProducer:
properties.setProperty("partitioner.class","SiranKafkaRoutePartition");
For reasons of length, RecordAccumulator and Sender will be analyzed in separate chapters.
References:
- Kafka 2.5.0 source code
- 《Apache Kafka源码剖析》
- 《深入理解Kafka:核心设计与实践》