一、概述

本文以 kafka_2.5.1 版本为例,描述的常用命令,在 3.x 版本上命令有所不同。

二、主要命令

kafka 启动命令:

nohup bin/kafka-server-start.sh config/server.properties &

kafka 停止命令。

bin/kafka-server-stop.sh

2.1 Topic 相关

  1. 创建名为 testTopic

    bin/kafka-topics.sh -zookeeper localhost:2181 --create --partitions 5 --replication-factor 1 --topic test
    

    --partitions:分区数

    --replication-factor:副本数

  2. 查询 Topic 列表

    bin/kafka-topics.sh --list --zookeeper localhost:2181
    
  3. 删除名为 testTopic

    bin/kafka-topics.sh --delete --zookeeper localhost:2181  --topic test
    

    配置 delete.topic.enabletrue,这样才能删除 topic

  4. 查询 Topic 的信息

    bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
    

    如果未指定 topic 则输出所有 topic 的信息

2.2 消息相关

  1. 生产者发送消息

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    
  2. 消费者查询消息

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --group t1
    

    --from-beginning:表示从头开始接收数据

    --group:指定消费者组

  3. 查询名为 testTopic 的消息。

    bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test --time -1
    
    bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test --time -2
    

    --time-1 表示要获取指定 topic 所有分区当前的最大位移(历史总消息数),--time-2 表示获取当前最早位移(被消费的消息数),两个命令的输出结果相减便可得到所有分区当前的消息总数。

    输出示例:test:0:3

    第一个数字0表示分区,第二个数字3表示偏移量。