Kafka 允许外网访问

摘要

  • 本文介绍 Kafka 的 通信协议,以及如何开启外网访问。

  • Kafka官网

  • 本文使用的 Kafka 版本为 3.9.1。Kafka 团队宣布 3.9 会是 最后一个还带有被弃用的 ZooKeeper 模式 的主要版本。以后版本(如 4.0)将完全弃用 ZooKeeper。

Kafka 的 通信协议

  • Kafka 主要支持四种安全协议

协议名称 加密 认证 说明 推荐场景 理由
PLAINTEXT ❌ 否 ❌ 否 无加密、无认证(默认最简单) 开发 / 测试环境、内网集群通信 简单、易调试;网络可信,性能优先
SSL ✅ 是 ✅ 可选 使用 TLS/SSL 加密通信,可配置客户端证书认证 外网客户端访问 支持数据加密,可选认证,保证安全
SASL_PLAINTEXT ❌ 否 ✅ 是 使用 SASL(用户名密码)认证,但不加密数据 需要用户认证但局域网环境 有认证,但不加密,性能开销低
SASL_SSL ✅ 是 ✅ 是 同时支持 SASL 认证和 SSL 加密(最安全) 外网客户端访问 既有认证又加密,安全性最高
  • config/server.properties 文件中 可以看到如下配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 套接字服务器监听的地址。
# 如果未配置,则主机名默认等于 `java.net.InetAddress.getCanonicalHostName()` 的返回值,
# 使用监听器名称 `PLAINTEXT`,端口号为 9092。
# 格式:
# listeners = listener_name://host_name:port
# 示例:
# listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092

# Broker 向客户端“通告”的监听器名称、主机名和端口。
# 客户端实际会连接这个地址,而不是直接使用 listeners 的地址。
# 如果未设置,则默认使用 `listeners` 的值。
#advertised.listeners=PLAINTEXT://your.host.name:9092

# 将监听器名称映射到安全协议类型。
# 默认情况下,监听器名称与安全协议同名。
# 例如:PLAINTEXT→PLAINTEXT、SSL→SSL、SASL_PLAINTEXT→SASL_PLAINTEXT、SASL_SSL→SASL_SSL。
# 更多细节可参考 Kafka 官方配置文档。
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
配置项 作用 说明值
listeners Kafka 实际监听的地址(Broker 对外开放的端口) PLAINTEXT://:9092这里 PLAINTEXT 是监听器名称,并不是协议名称,实际上可以配置为任何值,具体协议是通过 listener.security.protocol.map 配置的映射关系来确定。
advertised.listeners Kafka 告诉客户端应该用哪个地址连接(客户端最终连的) 默认使用 listeners 的值
listener.security.protocol.map 映射监听器名称到通信安全协议(如明文、SSL、SASL 等) PLAINTEXT:PLAINTEXT,前面是监听器名称,后面是协议名称

仅需内网访问

1
2
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://worker1:9092 # 这里是内网ip

允许外网访问

1
2
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://161.189.227.200:9092 # 这里是外网ip

内外网都要访问(推荐双通道方式)

1
2
3
4
5
6
7
8
# 这里 INTERNAL 和 EXTERNAL 分别是自定义的监听器名称,此时内网端口为 9092,外网端口为 9093
listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
# 告诉客户端应该用哪个地址连接
advertised.listeners=INTERNAL://worker1:9092,EXTERNAL://161.189.227.200:9093
# 映射监听器名称到通信安全协议的映射关系
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
# 集群间通信仍使用内网
inter.broker.listener.name=INTERNAL

开启 SASL_PLAINTEXT

  • 这里设置外网访问时开启 SASL_PLAINTEXT

1
2
3
4
5
6
7
8
9
10
11
12
13
# 监听地址和端口,这里内网和外网分开配置
listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9094
# 客户端建立连接后实际返回给客户端的地址
advertised.listeners=INTERNAL://worker1:9092,EXTERNAL://161.189.227.200:9094
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SASL_PLAINTEXT
# 集群间通信 still use INTERNAL
inter.broker.listener.name=INTERNAL

# 认证机制(常见为 PLAIN,也可以是 SCRAM-SHA-256/512)
# client 连接时
sasl.enabled.mechanisms=PLAIN
# broker 之间连接时,因为 inter.broker.listener.name=INTERNAL,所以 INTERNAL:SASL_PLAINTEXT 才有效
#sasl.mechanism.inter.broker.protocol=PLAIN
  • 创建 kafka_jaas.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
############################
# Kafka Broker (服务端)
############################
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
# Broker 自己的身份(用于 broker 之间通信,本示例中没有使用)
username="admin"
password="admin-secret"

# 客户端可用账号,即 user_xxx,这里 xxx 为用户名,= 右边的为密码
user_admin="admin-secret"
user_alice="alice-secret"
user_bob="bob-secret";
};
  • 启动 kafka

1
2
3
# 在启动 Kafka Broker 前,设置环境变量指向 JAAS 文件
export KAFKA_OPTS="-Djava.security.auth.login.config=/path/to/kafka_jaas.conf"
kafka-server-start.sh config/server.properties

客户端访问

  • 创建 client.conf

1
2
3
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret";
  • 命令行访问

1
2
3
4
5
6
7
8
# 创建topic
kafka-topics.sh --create --topic test-topic --bootstrap-server=161.189.227.200:9094 --command-config=client.conf
# 查看topic
kafka-topics.sh --list --bootstrap-server=161.189.227.200:9094 --command-config=client.conf
# 创建消费者,--group 指定消费者组名称
kafka-console-consumer.sh --bootstrap-server=161.189.227.200:9094 --topic test-topic --consumer.config=client.conf --group=test-group
# 创建生产者
kafka-console-producer.sh --bootstrap-server=161.189.227.200:9094 --topic test-topic --producer.config=client.conf

开启 SASL_SSL

  • 这里设置外网访问时开启 SASL_SSL

创建证书

  • 生成 Broker keystore,用于 存储 broker 的私钥和证书。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
keytool -keystore kafka.server.keystore.jks \
-alias broker -validity 365 \
-genkey -keyalg RSA \
-dname "CN=broker, OU=Kafka, O=YourOrg, L=City, ST=State, C=CN" \
-storepass 123456 \
-keypass 123456

## 参数说明:
# -keystore:生成的 keystore 文件路径
# -alias broker:证书别名
# -validity 365:有效期 365 天
# -dname:证书信息
# -storepass:keystore 密码
# -keypass:密钥密码
  • 导出 Broker 证书(用于客户端 truststore),生成 kafka.server.crt,客户端会用它来验证 broker。

1
2
3
keytool -keystore kafka.server.keystore.jks \
-alias broker -export -file kafka.server.crt \
-storepass 123456
  • 生成 Broker truststore,truststore 用于 存储信任的证书(这里把自己生成的证书导入进去即可),生成 kafka.truststore.jks

1
2
3
keytool -keystore kafka.truststore.jks \
-alias broker -import -file kafka.server.crt \
-storepass 123456 -noprompt

server.properties 配置 SASL_SSL

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 监听地址和端口,这里内网和外网分开配置
listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9095
# 客户端建立连接后实际返回给客户端的地址
advertised.listeners=INTERNAL://worker2:9092,EXTERNAL://161.189.227.200:9095
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SASL_SSL
inter.broker.listener.name=INTERNAL

# SASL
# 认证机制(常见为 PLAIN,也可以是 SCRAM-SHA-256/512)
# client 连接时
sasl.enabled.mechanisms=PLAIN
# broker 之间连接时,需要 inter.broker.listener.name=SASL_PLAINTEXT 才有效
#sasl.mechanism.inter.broker.protocol=PLAIN


# SSL
ssl.keystore.location=/usr/local/kafka/kafka3/config/ssl/kafka.server.keystore.jks
ssl.keystore.password=123456
ssl.key.password=123456
ssl.truststore.location=/usr/local/kafka/kafka3/config/ssl/kafka.truststore.jks
ssl.truststore.password=123456
# 如果不要求客户端证书,可以设置 none ,要求则设置为 required
ssl.client.auth=none
  • 启动 kafka 前同样需要先创建好 kafka_jaas.conf,与 SASL_PLAINTEXT 一样。

客户端访问

  • kafka.truststore.jks 拷贝到客户端

  • 与 SASL_PLAINTEXT 一样,创建 client.conf,并添加如下信息

1
2
3
4
5
6
7
8
9
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret";

# SSL 配置
ssl.truststore.location=/Users/hanqf/develop_soft/kafka/kafka3/config/ssl/kafka.truststore.jks
ssl.truststore.password=123456
# 禁用主机名验证,否则会校验证书的 SAN
ssl.endpoint.identification.algorithm=
  • 命令行访问 与 SASL_PLAINTEXT 一样,这里不再赘述

jks 证书转换为 pem 格式

  • 导出 证书 (.crt)
1
2
3
4
5
keytool -exportcert -alias broker \
-keystore kafka.server.keystore.jks \
-rfc -file server.crt \
-storepass 123456

  • 导出私钥 (.key)
1
2
3
4
5
6
7
8
9
keytool -importkeystore \
-srckeystore kafka.server.keystore.jks \
-srcalias broker \
-destkeystore temp.p12 \
-deststoretype PKCS12 \
-srcstorepass 123456 \
-deststorepass 123456

openssl pkcs12 -in temp.p12 -nocerts -out server.key -nodes -legacy