-
创建 topic –replication-factor 副本因子数量 –partitions 分区数量
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
-
查看 topic 的信息 topics-with-overrides 查看与集群配置不一样的 topic under-replicated-partitions 查看所有包含失效副本的分区 topics-with-overrides 没有 leader 副本的分区
bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic test
-
开启消费者订阅某 topic 进行消费消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
-
开启生产者向某 topic 发送消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
-
查看所有可用的主题
bin/kafka-topics.sh --zookeeper localhost:2181/kafka --list
-
创建 topic 指定分区副本 4个分区 2个副本 注意:同一个分区内的副本不能重复
bin/kafka-topics.sh --zookeeper localhost:2181/kafka --create --topic test --replica-assignment 2:0,0:1,1:2,2:1
-
通过–config 参数来设置创建主题的相关参数
bin/kakfa-topics.sh --zookeeper localhost:2181/kafka --create --topic test --replication-factor 1 --partitions 1 --config cleanup.policy=compact --config max.message.bytes=10000
-
创建主题加上 if-not-exists 参数避免创建相同 topic 报错, 重复创建不进行处理
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test --if-not-exists
-
看lag,就是未消费的
kafka-consumer-groups.sh --bootstrap-server 192.168.3.30:9092,192.168.3.31:9092,192.168.3.32:9092 --describe --group dapTest
-
可以跑脚本消费掉消息
kafka-console-consumer.sh -bootstrap-server 192.168.3.30:9092,192.168.3.31:9092,192.168.3.32:9092 --group TESTZZZ --topic FlinkDemoResult2
增,删,改使用 topics.sh 已经过时 建议使用 kakfa-configs.sh 脚本
-
修改partition数(只能增加)
kafka-topics.sh --alter --zookeeper 192.168.3.30:2181,192.168.3.31:2181,192.168.3.32:2181/kafka --partitions 50 --topic TOPIC_DCP_CARDDATA_BKUP_Q
-
修改配置
bin/kafka-topics.sh --alter --zookeeper localhost:2181/kafka --topic test --config max.message.bytes=20000
-
删除配置
bin/kafka-topics.sh --alter --zookeeper localhost:2181/kafka --topic test --delete-config max.message.bytes
-
使用 config.sh 查看某主题的配置
bin/kafka-configs.sh --zookeeper localhost:2181/kafka --describe --entity-type topics --entity-name test
-
使用 config.sh 修改某主题的配置
bin/kafka-configs.sh --zookeeper localhost:2181/kafka --alter --entity-type topics --entity-name test --add-config cleanup.policy=compact,max.message.bytes=1000
-
使用 config.sh 删除某主题的配置
bin/kafka-configs.sh --zookeeper localhost:2181/kafka --alter --entity-type topics --entity-name test --delete-config cleanup.policy
-
kafka-perferred-replica-election.sh 脚本提供了对分区进行重新平衡的功能
bin/kafka-perferred-replica-election.sh --zookeeper localhost:2181/kafka
-
通过 path-to-json-file 小批量的对部分分区进行优先副本的选举操作
bin/kafka-perferred-replica-election.sh --zookeeper localhost:2181/kafka --path-to-json-file election.json election.json { "partitions":[ {"partition":"0", "topic":"test-1" }, {"partition":"1", "topic":"test-1" }, {"partition":"2", "topic":"test-1" } ] }
-
通过 kafak-reassign-partitions.sh 脚本执行分区重分配的工作,可以对集群扩容,brokers,节点失效的场景下对分区进行迁移
要下线 brokerId 为1 的 broker 节点
-
(1)创建json文件 reassgin.json
{ "topics":[ {"topic":"topic-reassign"} ], "version":1 }
-
(2)通过 json 文件和指定要分配的 broker 节点列表来生成一份候选的重分配方案
bin/kafka-reassign-partition.sh --zookeeper localhost:2181/kafka --generate --topic-to-move-json-file reassign.json --broker-list 0,2
-
(3)把通过第二步骤生成的 json 内容保存在一个 json 文件 假定为 project.json
bin/kafka-reassign-partition.sh --zookeeper localhost:2181/kafka -execute --reassginment-json-file project.json
-
查看分区重分配的进度
bin/kafka-reassign-partition.sh --zookeeper localhost:2181/kafka -verify --reassginment-json-file project.json
-
复制限流 follower.replication.throttled.rate 用于设置 follower 副本复制的速度, leader.replication.throttled.rate 用于设置 leader 副本传输的速度 单位都是 b/s
//通过 kafka-config.sh bin/kafka-configs.sh --zookeeper localhost:2181/kafka --alter --entity-type topics --entity-name test --add-config follower.replication.throttled.rate=1024,leader.replication.throttled.rate=1024
通过 kafka-reassign-partitions.sh 性能测试
-
producer 测试 records sent 表示发送的消息总数 records/sec 表示以每秒发送的消息数来统计吞吐 MB/sec 表示以每秒发送的消息大小来统计吞吐量
bin/kafka-producer.perf-test.sh --topic test --num-records 1000000 --record-size 1024 --throughput -1 --producer-props bootstrap.servers=localhost:9092 acks=1 --print-metrics
-
consumer 测试
bin/kafka-consumer-perf-test.sh --topic test --message 1000000 --broker-list localhost:9092
-
查看日志
bin/kafka-dump-log.sh --files