KafkaTools

2020年4月22日 | 作者 Siran | 800字 | 阅读大约需要2分钟
归档于 消息队列 | 标签 #kafka
  • 创建 topic –replication-factor 副本因子数量 –partitions 分区数量

    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
    

  • 查看 topic 的信息 topics-with-overrides 查看与集群配置不一样的 topic under-replicated-partitions 查看所有包含失效副本的分区 topics-with-overrides 没有 leader 副本的分区

    bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic test
    

  • 开启消费者订阅某 topic 进行消费消息

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
    

  • 开启生产者向某 topic 发送消息

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    

  • 查看所有可用的主题

    bin/kafka-topics.sh --zookeeper localhost:2181/kafka --list
    

  • 创建 topic 指定分区副本 4个分区 2个副本 注意:同一个分区内的副本不能重复

    bin/kafka-topics.sh --zookeeper localhost:2181/kafka --create --topic test --replica-assignment 2:0,0:1,1:2,2:1
    

  • 通过–config 参数来设置创建主题的相关参数

    bin/kakfa-topics.sh --zookeeper localhost:2181/kafka --create --topic test --replication-factor 1 --partitions 1 --config cleanup.policy=compact --config max.message.bytes=10000
    

  • 创建主题加上 if-not-exists 参数避免创建相同 topic 报错, 重复创建不进行处理

    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test --if-not-exists
    

  • 看lag,就是未消费的

    kafka-consumer-groups.sh --bootstrap-server 192.168.3.30:9092,192.168.3.31:9092,192.168.3.32:9092 --describe --group dapTest
    

  • 可以跑脚本消费掉消息

    kafka-console-consumer.sh -bootstrap-server 192.168.3.30:9092,192.168.3.31:9092,192.168.3.32:9092 --group TESTZZZ --topic FlinkDemoResult2
    

增,删,改使用 topics.sh 已经过时 建议使用 kakfa-configs.sh 脚本

  • 修改partition数(只能增加)

    kafka-topics.sh --alter --zookeeper 192.168.3.30:2181,192.168.3.31:2181,192.168.3.32:2181/kafka --partitions 50 --topic TOPIC_DCP_CARDDATA_BKUP_Q
    

  • 修改配置

    bin/kafka-topics.sh --alter --zookeeper localhost:2181/kafka --topic test --config max.message.bytes=20000
    

  • 删除配置

    bin/kafka-topics.sh --alter --zookeeper localhost:2181/kafka --topic test --delete-config max.message.bytes
    

  • 使用 config.sh 查看某主题的配置

    bin/kafka-configs.sh --zookeeper localhost:2181/kafka --describe --entity-type topics --entity-name test
    

  • 使用 config.sh 修改某主题的配置

    bin/kafka-configs.sh --zookeeper localhost:2181/kafka --alter --entity-type topics --entity-name test --add-config cleanup.policy=compact,max.message.bytes=1000
    

  • 使用 config.sh 删除某主题的配置

    bin/kafka-configs.sh --zookeeper localhost:2181/kafka --alter --entity-type topics --entity-name test --delete-config cleanup.policy
    

  • kafka-perferred-replica-election.sh 脚本提供了对分区进行重新平衡的功能

    bin/kafka-perferred-replica-election.sh --zookeeper localhost:2181/kafka
    

  • 通过 path-to-json-file 小批量的对部分分区进行优先副本的选举操作

    bin/kafka-perferred-replica-election.sh --zookeeper localhost:2181/kafka --path-to-json-file election.json
    election.json
    {
      "partitions":[
      {"partition":"0",
       "topic":"test-1"
      },
      {"partition":"1",
       "topic":"test-1"
       },
      {"partition":"2",
       "topic":"test-1"
       }
      ]
    }
    

  • 通过 kafak-reassign-partitions.sh 脚本执行分区重分配的工作,可以对集群扩容,brokers,节点失效的场景下对分区进行迁移

    要下线 brokerId 为1 的 broker 节点

  • (1)创建json文件 reassgin.json

    {
      "topics":[
      {"topic":"topic-reassign"}
      ],
      "version":1
    }
    

  • (2)通过 json 文件和指定要分配的 broker 节点列表来生成一份候选的重分配方案

    bin/kafka-reassign-partition.sh --zookeeper localhost:2181/kafka --generate --topic-to-move-json-file reassign.json --broker-list 0,2
    

  • (3)把通过第二步骤生成的 json 内容保存在一个 json 文件 假定为 project.json

    bin/kafka-reassign-partition.sh --zookeeper localhost:2181/kafka -execute --reassginment-json-file project.json
    

  • 查看分区重分配的进度

    bin/kafka-reassign-partition.sh --zookeeper localhost:2181/kafka -verify --reassginment-json-file project.json
    

  • 复制限流 follower.replication.throttled.rate 用于设置 follower 副本复制的速度, leader.replication.throttled.rate 用于设置 leader 副本传输的速度 单位都是 b/s

    //通过 kafka-config.sh
    bin/kafka-configs.sh --zookeeper localhost:2181/kafka --alter --entity-type topics --entity-name test --add-config follower.replication.throttled.rate=1024,leader.replication.throttled.rate=1024
    

通过 kafka-reassign-partitions.sh 性能测试

  • producer 测试 records sent 表示发送的消息总数 records/sec 表示以每秒发送的消息数来统计吞吐 MB/sec 表示以每秒发送的消息大小来统计吞吐量

    bin/kafka-producer.perf-test.sh --topic test --num-records 1000000 --record-size 1024 --throughput -1 --producer-props bootstrap.servers=localhost:9092 acks=1 --print-metrics
    

  • consumer 测试

    bin/kafka-consumer-perf-test.sh --topic test --message 1000000 --broker-list localhost:9092
    

  • 查看日志

    bin/kafka-dump-log.sh --files