Kafka 的安装:基于 Zookeeper

摘要

  • 本文介绍 CentOS9 中 Kafka 的安装与使用,基于 Zookeeper。

  • Kafka官网

  • 本文使用的 Kafka 版本为 3.9.1。Kafka 团队宣布 3.9 会是 最后一个还带有被弃用的 ZooKeeper 模式 的主要版本。以后版本(如 4.0)将完全弃用 ZooKeeper。

Kafka 简介

  • Apache Kafka 是一个 分布式的流处理/事件流平台,既可以作为消息系统,也可以作为持久化的 日志/记录存储与流处理平台

  • 它的设计目标是高吞吐、低延迟、可水平扩展、容错,以及可持久化数据。

  • 在 Kafka 中,消息被归类为 主题(Topic),每个主题可以根据配置被拆分为多个 分区(Partition),每个分区内部消息是严格有序的,并以追加方式写入。消费者可以按偏移量(offset)读取消息。

  • Kafka 提供多个 API:Producer、Consumer、Streams(流处理)、Connect(与外部系统整合)等。

  • Kafka 的核心架构要素与工作机制

组件 / 概念 作用 / 描述
Broker(节点 / 服务器) Kafka 集群中的服务器实例,负责接收、存储、分发消息
Topic 消息的“分类”逻辑单元,Producer 写入、Consumer 读取
Partition 一个 Topic 被划分的子单元。分区使得主题可以横向扩展,并支持并行读写
Offset 每条消息在某个分区中的唯一位置标识,消费者根据 offset 来决定下一条读取
Replication(副本) 为了容错性,每个分区可以有多个副本(副本分布在不同 Broker 上)
Leader / Follower 在副本中,一个副本为 Leader,接受读写请求;其他为 Follower,从 Leader 同步数据
Consumer Group 一组消费者共同消费一个 Topic。每个分区在同一个消费者组中通常只被一个消费者 “拥有”
ZooKeeper / KRaft 用于元数据管理、集群协调(在较老版本中是 ZooKeeper;新版本推向 KRaft)

  • 消息写入流程(简化):

    • Producer 将消息发送给某个 Topic 的 Leader 分区节点
    • Leader 接收到消息后,将其追加写入本地日志,并返回确认(ACK)
    • Follower 副本从 Leader 拉取数据进行同步
    • 消费者根据自己的 offset 从对应 Partition 中读取消息
  • 消费控制与容错:

    • 消费者维护自己的 offset(可以自动提交,也可手动控制),这样即使消费者重启,也可以从上次停止的位置继续。
    • 如果某个 Broker 宕机,副本可以切换(Leader 选举),保证服务继续。
    • 分区与副本机制使得 Kafka 能够扩展容量 & 提高可靠性。
  • Kafka 的典型使用场景

场景类别 说明
实时数据管道 / 数据集成 用于将各种数据源(如日志、数据库变更、传感器、用户事件等)实时采集、传输、分发到下游系统(如 OLAP、搜索引擎、监控平台等),构建高效的数据通道。
事件驱动 / 事件溯源 记录系统内部或跨系统的事件(状态变化),实现事件驱动架构(EDA)或事件溯源(Event Sourcing),可用于审计、回放、状态重建等。
日志聚合 / 分析 将分布式系统中的应用日志、监控指标、操作日志等统一收集到 Kafka 中,集中存储与分析,常与 ELK、ClickHouse 等结合。
流处理 与 Kafka Streams、Apache Flink、Spark Streaming 等流处理框架配合,对流经 Kafka 的数据进行实时计算、聚合、过滤、窗口统计等操作。
系统解耦 / 异步通信 作为系统间的消息中间件,实现发布-订阅模式,减少系统间耦合,支持异步通信、流量削峰、缓冲等,提升系统稳定性与扩展性。

Kafka 安装

  • 这里先介绍基于 Zookeeper 的安装方式,下文会介绍基于 KRaft 的安装方式。

  • Kafka 3.9.1 的安装与运行需要 JDK 8+,所有我们需要提前安装 JDK 8+。可以选择OpenJDK,清华大学镜像站

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# root 用户
# 创建安装目录
mkdir /usr/local/jdk
cd /usr/local/jdk
# 下载JDK
wget https://mirrors.tuna.tsinghua.edu.cn/Adoptium/8/jdk/x64/linux/OpenJDK8U-jdk_x64_linux_hotspot_8u462b08.tar.gz
tar -zxvf OpenJDK8U-jdk_x64_linux_hotspot_8u462b08.tar.gz
ln -s jdk8u462-b08 jdk8
# 配置环境变量
echo 'export JAVA_HOME=/usr/local/jdk/jdk8' >> /etc/profile
# 注意这里是 单引号,双引号会解析变量,导致配置失败
echo 'export PATH=$JAVA_HOME/bin:$PATH' >> /etc/profile
source /etc/profile
# 检查JDK安装
java -version

单机安装

  • 部署kafka都会使用集群模式,单机模式只作为学习试用。

  • 下载Kafka,下载页面

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# root 用户
# 创建安装目录
mkdir /usr/local/kafka
cd /usr/local/kafka
# 下载Kafka
wget https://dlcdn.apache.org/kafka/3.9.1/kafka_2.13-3.9.1.
tar -zxvf kafka_2.13-3.9.1.tgz
ln -s kafka_2.13-3.9.1 kafka3
# 配置环境变量
echo 'export KAFKA_HOME=/usr/local/kafka/kafka3' >> /etc/profile
echo 'export PATH=$KAFKA_HOME/bin:$PATH' >> /etc/profile
source /etc/profile
# 查看Kafka版本
kafka-topics.sh --version
  • 启动 Zookeeper,kafka内置了zookeeper,所以不需要单独安装。

1
zookeeper-server-start.sh /usr/local/kafka/kafka3/config/zookeeper.properties
  • 启动 kafka,需打开另一个终端

1
2
3
4
5
# -daemon 后台运行
kafka-server-start.sh -daemon /usr/local/kafka/kafka3/config/server.properties

# 检查kafka是否启动成功
jps -l | grep kafka

小贴士

  • 注意:默认情况下 启动 kafka 需要的内存大小为 1G,这一点可以在 kafka-server-start.sh 脚本中查看到
1
2
3
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi
  • 所以如果内存不够,可以设置环境变量后再启动kafka
1
2
export KAFKA_HEAP_OPTS="-Xmx512M -Xms512M"
kafka-server-start.sh -daemon /usr/local/kafka/kafka3/config/server.properties
  • 测试

1
2
3
4
5
6
7
8
9
# 创建 topic
kafka-topics.sh --create --bootstrap-server localhost:9092 --topic test

# 启动消费者
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

# 启动生产者
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
> hello world # 输入内容,消费者会收到

集群安装

  • 集群安装需要准备多个节点,这里我准备三个节点,分别如下:

1
2
3
10.250.0.7
10.250.0.174
10.250.0.108

搭建 Zookeeper 集群

  • 关于如何搭建 Zookeeper 集群,可以参考我之前的文章 Zookeeper 的安装及使用

  • 如果图省事也可以直接使用 Kafka 自带的 zookeeper,编辑其配置文件 config/zookeeper.properties如下,注意要在 dataDir 目录下创建myid文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
dataDir=/usr/local/kafka/dataDir/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
# admin.serverPort=8080

initLimit=10
syncLimit=5

server.1=10.250.0.7:2888:3888
server.2=10.250.0.174:2888:3888
server.3=10.250.0.108:2888:3888

配置 Kafka 集群

  • 修改主机的主机名

1
2
3
hostnamectl hostname worker1
# hostnamectl hostname worker2
# hostnamectl hostname worker3
  • 为了后续方便维护,将ip地址映射到 hosts 文件中

1
2
3
10.250.0.7 worker1
10.250.0.174 worker2
10.250.0.108 worker3
  • 编辑 config/server.properties 文件,需要修改如下配置项

1
2
3
4
5
6
7
8
9
10
11
12
#broker 的全局唯⼀编号,不能重复,只能是数字。
broker.id=1 # 这里分别设置为1、2、3
#服务监听地址
listeners=PLAINTEXT://worker1:9092
#数据⽂件地址。同样默认是给的/tmp⽬录。
log.dirs=/usr/local/kafka/dataDir/kafka-logs
#默认的每个Topic的分区数,创建Topic时,如果未指定分区数,则默认为1个分区。
num.partitions=1
# 每个⽇志⽂件删除之前保存的时间,默认是168小时,即7天。
log.retention.hours=168
#zookeeper的服务地址,如果是自建的 Zookeeper 集群,则这里需要填写集群的连接地址
zookeeper.connect=worker1:2181,worker2:2181,worker3:2181
  • 分别在三个节点上启动 Kafka

1
kafka-server-start.sh -daemon /usr/local/kafka/kafka3/config/server.properties
  • 测试

1
2
3
4
5
6
7
8
9
10
11
12
# 创建 topic
kafka-topics.sh --bootstrap-server worker1:9092 --create --replication-factor 3 --partitions 3 --topic disTopic
## 参数说明
# --replication-factor 3 表示创建的副本数
# --partitions 3 表示创建的分区数

# 查看 topic 详情
kafka-topics.sh --bootstrap-server worker1:9092 --describe --topic disTopic
Topic: disTopic TopicId: VUK7Mc9oQdS1mjGG7OhQzQ PartitionCount: 3 ReplicationFactor: Configs:
Topic: disTopic Partition: 0 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1 Elr: N/A LastKnownElr: N/A
Topic: disTopic Partition: 1 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2 Elr: N/A LastKnownElr: N/A
Topic: disTopic Partition: 2 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3 Elr: N/A LastKnownElr: N/A
  • 总体信息(Topic 概览)

1
Topic: disTopic	TopicId: VUK7Mc9oQdS1mjGG7OhQzQ	PartitionCount: 3	ReplicationFactor: Configs:
字段 含义
Topic: disTopic Topic 名称,即当前描述的主题。
TopicId: VUK7Mc9oQdS1mjGG7OhQzQ Kafka 内部自动生成的唯一标识符(UUID),Kafka 3.x 之后引入,用于区分同名但不同生命周期的 topic。
PartitionCount: 3 该主题有 3 个分区(partition)。每个分区存储一部分消息。
ReplicationFactor: 副本因子。这里虽然输出中没显示具体值,但可从每行分区配置推断是 3(每个分区有 3 个副本)。
Configs: topic 的配置项(例如清理策略、压缩类型等),如果为空,说明使用默认配置。
  • 分区详情(每个 Partition 一行)

1
Topic: disTopic	Partition: 0	Leader: 2	Replicas: 2,3,1	Isr: 2,3,1	Elr: N/A	LastKnownElr: N/A
字段 含义
Partition: 0 第 0 号分区。
Leader: 2 该分区当前的 Leader Broker 是 broker ID = 2,只有 Leader 才处理读写请求。
Replicas: 2,3,1 该分区的所有副本存放在哪些 Broker 上(即副本分布),分别是 broker 2、3、1。
Isr (In-Sync Replicas): 2,3,1 当前与 Leader 保持同步的副本集合。这里所有副本都在同步中(健康状态 👍)。
Elr / LastKnownElr Kafka 新版本中引入的 “Enhanced Leader Replica” 状态,目前未启用(N/A)。

Kafka 命令

  • 管理 topic

1
2
3
4
5
6
7
8
# 创建 topic
kafka-topics.sh --create --bootstrap-server localhost:9092 --topic test
# 列出 topic
kafka-topics.sh --list --bootstrap-server localhost:9092
# 查看 topic 详情
kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test
# 删除 topic
kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic test