Kafka 的 常用命令

摘要

  • 本文介绍 Kafka 的 常用命令

  • Kafka官网

  • 本文使用的 Kafka 版本为 3.9.1。Kafka 团队宣布 3.9 会是 最后一个还带有被弃用的 ZooKeeper 模式 的主要版本。以后版本(如 4.0)将完全弃用 ZooKeeper。

topic

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# 创建 topic
kafka-topics.sh --create --bootstrap-server localhost:9092 --topic test --partitions 3 --replication-factor 2
# --bootstrap-server 指定 kafka 集群地址
# --topic 创建的 topic 名称
# --partitions 指定分区数,不设置则默认使用 server.properties 中设置的默认值
# --replication-factor 指定副本数,不设置则默认使用 server.properties 中设置的默认值


# 列出 topic
kafka-topics.sh --list --bootstrap-server localhost:9092
# 查看 topic 详情
kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test
## 输出
Topic: test TopicId: Ru0tWQJ4RMWcjjGsKAdWQg PartitionCount: 3 ReplicationFactor: 3 Configs:
Topic: test Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,2,1 Elr: N/A LastKnownElr: N/A
Topic: test Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 3,2,1 Elr: N/A LastKnownElr: N/A
Topic: test Partition: 2 Leader: 2 Replicas: 2,3,1 Isr: 3,2,1 Elr: N/A LastKnownElr: N/A
## 输出说明
# 总体信息(Topic 概览)Topic: test TopicId: Ru0tWQJ4RMWcjjGsKAdWQg PartitionCount: 3 ReplicationFactor: 3 Configs:
| 字段 | 含义 |
| ----------------------------------- | ------------------------------------------------------------- |
| **Topic: disTopic** | Topic 名称,即当前描述的主题。 |
| **TopicId: VUK7Mc9oQdS1mjGG7OhQzQ** | Kafka 内部自动生成的唯一标识符(UUID),Kafka 3.x 之后引入,用于区分同名但不同生命周期的 topic。 |
| **PartitionCount: 3** | 该主题有 3 个分区(partition)。每个分区存储一部分消息。 |
| **ReplicationFactor:** | 副本因子。这里虽然输出中没显示具体值,但可从每行分区配置推断是 **3**(每个分区有 3 个副本)。 |
| **Configs:** | topic 的配置项(例如清理策略、压缩类型等),如果为空,说明使用默认配置。 |

# 分区详情(每个 Partition 一行)

| 字段 | 含义 |
| --------------------------------- | ----------------------------------------------------------- |
| **Partition: 0** | 第 0 号分区。 |
| **Leader: 2** | 该分区当前的 **Leader Broker 是 broker ID = 2**,只有 Leader 才处理读写请求。 |
| **Replicas: 2,3,1** | 该分区的所有副本存放在哪些 Broker 上(即副本分布),分别是 broker 2、3、1。 |
| **Isr (In-Sync Replicas): 2,3,1** | 当前与 Leader 保持同步的副本集合。这里所有副本都在同步中(健康状态 👍)。 |
| **Elr / LastKnownElr** | Kafka 新版本中引入的 "Enhanced Leader Replica" 状态,目前未启用(N/A)。 |


# 删除 topic
kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic test

consumer

1
2
3
4
5
6
7
# 创建 consumer
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group test
# --topic 指定 topic
# --group 指定 consumer 组

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
# --from-beginning 从 topic 的最开始消费

consumer-group

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 列出 consumer 组
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
# --bootstrap-server 集群地址

# 查看 consumer 组详情
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test
## 输出
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test test 0 2 2 0 console-consumer-2102b86e-895c-4ee3-8304-6df83523d1c1 /10.250.0.7 console-consumer
test test 1 2 2 0 console-consumer-2102b86e-895c-4ee3-8304-6df83523d1c1 /10.250.0.7 console-consumer
test test 2 1 1 0 console-consumer-9ac45b29-d8f3-4649-ab09-7b567aa2ba53 /10.250.0.108 console-consumer

## 输出说明
# GROUP 消费组名称
# TOPIC topic 名称
# PARTITION 分区编号
# CURRENT-OFFSET 当前消费的 offset
# LOG-END-OFFSET topic 中最大的 offset
# LAG 当前消费的 offset 与 topic 中最大的 offset 的差值,即剩余未消费的 消息数量
# CONSUMER-ID 当前消费的 consumer 的 id
# HOST 当前消费的 consumer 的主机名
# CLIENT-ID 当前消费的 consumer 的客户端名称

# 删除 consumer 组
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group test

producer

1
2
3
# 创建 producer
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
# --topic 指定 topic

用于手动触发 Kafka 分区 Leader 选举(自平衡)

  • kafka的自平衡默认开启,每隔 300秒扫描一次,如果需要平衡的比例高于 10%,则会触发一次

1
2
3
4
5
6
# 开启自动平衡
auto.leader.rebalance.enable=true
# 间隔扫描时间 默认 300 秒
eader.imbalance.check.interval.seconds=300
# 触发比例,即需要平衡的 partition 占全部 partition 的比例,默认 10%
leader.imbalance.per.broker.percentage=10
  • 建议关闭,改为业务低峰时手动触发

1
2
# 列出消费者自动平衡
kafka-leader-election.sh --bootstrap-server localhost:9092 --election-type preferred --topic test --partition 0
  • 🧩 参数说明:–election-type

参数值 含义 触发条件 典型使用场景
preferred 首选 Leader 选举(Preferred Leader Election)
Kafka 会尝试将分区的 leader 重新切换为「首选副本」(通常是第一个副本)。
只有当前 leader 不是 首选副本时才执行。 某些副本被自动选举成 leader 后,希望恢复原有「首选 leader」结构,以实现负载均衡。
unclean 非干净 Leader 选举(Unclean Leader Election)
允许从不同步的副本中选举新的 leader。
仅在分区 没有可用 leader 时执行。 在紧急恢复场景下(比如所有 ISR 副本都下线),为了恢复服务可用性,即使会导致数据丢失。