Kafka 3
Kafka with KRaft introduces a new built-in consensus protocol, KRaft, to replace the traditional ZooKeeper dependency. KRaft simplifies the architecture by storing Kafka's metadata in a quorum of controller nodes inside the Kafka cluster itself, rather than relying on an external ZooKeeper cluster.
This document uses the following three servers; see the Description column for the processes assigned to each node.
| IP Address | Hostname | Description |
|---|---|---|
| 192.168.1.131 | bigdata01 | Controller & Broker |
| 192.168.1.132 | bigdata02 | Controller & Broker |
| 192.168.1.133 | bigdata03 | Controller & Broker |
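Each node must be able to reach the others by hostname. If DNS does not already resolve these names, the mappings from the table above can be appended to /etc/hosts on every node; a minimal sketch that prints the required lines (apply them with root privileges, e.g. by piping into `sudo tee -a /etc/hosts`):

```shell
# /etc/hosts entries implied by the table above (example addresses;
# adjust to your network). Append the printed lines on every node.
printf '%s\n' \
  '192.168.1.131 bigdata01' \
  '192.168.1.132 bigdata02' \
  '192.168.1.133 bigdata03'
```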
Basic Setup
Install the Service
Download the package
wget https://dlcdn.apache.org/kafka/3.8.1/kafka_2.13-3.8.1.tgz
Extract the package
tar -zxvf kafka_2.13-3.8.1.tgz -C /usr/local/software/
ln -s /usr/local/software/kafka_2.13-3.8.1 /usr/local/software/kafka
Configure environment variables
cat >> ~/.bash_profile <<"EOF"
## KAFKA_HOME
export KAFKA_HOME=/usr/local/software/kafka
export PATH=$PATH:$KAFKA_HOME/bin
EOF
source ~/.bash_profile
Cluster Configuration
Configure server.properties
Edit the configuration file on the bigdata01 node, then distribute it to the other nodes.
Pay attention to the following settings:
- controller.quorum.voters: the members of the controller quorum, as `id@host:port` entries
- listeners: the addresses and ports the Kafka service listens on
- advertised.listeners: the listener addresses advertised to clients and other brokers for connecting to this node
- Adjust the other settings to your actual requirements
cp $KAFKA_HOME/config/kraft/server.properties{,_bak}
cat > $KAFKA_HOME/config/kraft/server.properties <<"EOF"
process.roles=controller,broker
node.id=1
controller.quorum.voters=1@bigdata01:9093,2@bigdata02:9093,3@bigdata03:9093
inter.broker.listener.name=INTERNAL
controller.listener.names=CONTROLLER
listeners=CLIENT://bigdata01:9092,CONTROLLER://bigdata01:9093,INTERNAL://bigdata01:9094,EXTERNAL://bigdata01:9095
advertised.listeners=CLIENT://bigdata01:9092,INTERNAL://bigdata01:9094,EXTERNAL://14.104.200.4:19092
listener.security.protocol.map=CLIENT:PLAINTEXT,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT
log.dirs=/data/service/kafka/data/01,/data/service/kafka/data/02
num.io.threads=8
num.network.threads=3
num.partitions=3
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
default.replication.factor=3
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
auto.create.topics.enable=true
delete.topic.enable=true
log.retention.hours=72
log.retention.bytes=1073741824
log.retention.check.interval.ms=300000
log.segment.bytes=1073741824
max.partition.fetch.bytes=104857600
max.request.size=104857600
message.max.bytes=104857600
fetch.max.bytes=104857600
replica.fetch.max.bytes=104857600
EOF
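As a quick sanity check on the size values in the configuration above, the raw byte counts decode with plain shell arithmetic. Note that log.retention.bytes is enforced per partition, and whichever of the time or size limit is reached first triggers deletion:

```shell
# Decode the byte values used in server.properties (pure arithmetic)
echo "$((104857600 / 1024 / 1024)) MiB"          # message.max.bytes etc. -> 100 MiB
echo "$((1073741824 / 1024 / 1024 / 1024)) GiB"  # log.segment.bytes / log.retention.bytes -> 1 GiB
```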
Distribute the Configuration File
scp $KAFKA_HOME/config/kraft/server.properties bigdata02:$KAFKA_HOME/config/kraft/
scp $KAFKA_HOME/config/kraft/server.properties bigdata03:$KAFKA_HOME/config/kraft/
Modify the Configuration on the Other Nodes
Modify node.id
Every node.id must be unique within the cluster.
[admin@bigdata02 ~]$ sed -i "s#node.id=.*#node.id=2#" $KAFKA_HOME/config/kraft/server.properties
[admin@bigdata03 ~]$ sed -i "s#node.id=.*#node.id=3#" $KAFKA_HOME/config/kraft/server.properties
Modify listeners
[admin@bigdata02 ~]$ sed -i "s#listeners=.*#listeners=CLIENT://bigdata02:9092,CONTROLLER://bigdata02:9093,INTERNAL://bigdata02:9094,EXTERNAL://bigdata02:9095#" $KAFKA_HOME/config/kraft/server.properties
[admin@bigdata03 ~]$ sed -i "s#listeners=.*#listeners=CLIENT://bigdata03:9092,CONTROLLER://bigdata03:9093,INTERNAL://bigdata03:9094,EXTERNAL://bigdata03:9095#" $KAFKA_HOME/config/kraft/server.properties
Modify advertised.listeners
[admin@bigdata02 ~]$ sed -i "s#advertised.listeners=.*#advertised.listeners=CLIENT://bigdata02:9092,INTERNAL://bigdata02:9094,EXTERNAL://14.104.200.5:19092#" $KAFKA_HOME/config/kraft/server.properties
[admin@bigdata03 ~]$ sed -i "s#advertised.listeners=.*#advertised.listeners=CLIENT://bigdata03:9092,INTERNAL://bigdata03:9094,EXTERNAL://14.104.200.6:19092#" $KAFKA_HOME/config/kraft/server.properties
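The three per-node edits above (node.id, listeners, advertised.listeners) follow one pattern, so they can be combined into a single sed pass. A self-contained sketch using hypothetical values for bigdata02 and a scratch file, so the substitutions can be checked before touching the real server.properties:

```shell
# Per-node values (example: bigdata02; PUBLIC_IP is this node's external address)
NODE_ID=2
HOST=bigdata02
PUBLIC_IP=14.104.200.5

# Scratch stand-in holding just the per-node keys from server.properties
cfg=$(mktemp)
printf '%s\n' \
  'node.id=1' \
  'listeners=CLIENT://bigdata01:9092,CONTROLLER://bigdata01:9093,INTERNAL://bigdata01:9094,EXTERNAL://bigdata01:9095' \
  'advertised.listeners=CLIENT://bigdata01:9092,INTERNAL://bigdata01:9094,EXTERNAL://14.104.200.4:19092' \
  > "$cfg"

# All three per-node substitutions in one sed invocation
sed -i \
  -e "s#^node.id=.*#node.id=${NODE_ID}#" \
  -e "s#^listeners=.*#listeners=CLIENT://${HOST}:9092,CONTROLLER://${HOST}:9093,INTERNAL://${HOST}:9094,EXTERNAL://${HOST}:9095#" \
  -e "s#^advertised.listeners=.*#advertised.listeners=CLIENT://${HOST}:9092,INTERNAL://${HOST}:9094,EXTERNAL://${PUBLIC_IP}:19092#" \
  "$cfg"

cat "$cfg"
rm -f "$cfg"
```

Once the output looks right, point the same sed command at `$KAFKA_HOME/config/kraft/server.properties` on each node with that node's values.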
Format the Log Directories
Generate a cluster UUID
Generate a UUID on the bigdata01 node to use as the cluster ID.
kafka-storage.sh random-uuid
Format the directories
Using the UUID generated above, format the directories on all nodes.
kafka-storage.sh format -t NonEOQviS621cjG0UXb7NQ -c $KAFKA_HOME/config/kraft/server.properties
Start the Cluster
Start the service
bigdata01: Controller、Broker
bigdata02: Controller、Broker
bigdata03: Controller、Broker
Controller Address: bigdata01:9093
export LOG_DIR=/data/service/kafka/logs
export KAFKA_HEAP_OPTS="-Xms1g -Xmx8g"
export JAVA_HOME=/usr/local/software/jdk8
kafka-server-start.sh -daemon $KAFKA_HOME/config/kraft/server.properties
Stop the service
kafka-server-stop.sh
Configure the Service to Start on Boot
Kafka service
Create the unit file
Apply on the bigdata01, bigdata02, and bigdata03 nodes.
sudo tee /etc/systemd/system/kafka.service <<"EOF"
[Unit]
Description=Kafka
Documentation=https://kafka.apache.org
After=network.target
[Service]
Type=simple
WorkingDirectory=/usr/local/software/kafka
Environment="JAVA_HOME=/usr/local/software/jdk8"
Environment="KAFKA_HOME=/usr/local/software/kafka"
Environment="LOG_DIR=/data/service/kafka/logs"
Environment="KAFKA_HEAP_OPTS=-Xms1g -Xmx8g"
ExecStart=/usr/local/software/kafka/bin/kafka-server-start.sh /usr/local/software/kafka/config/kraft/server.properties
ExecStop=/bin/kill -SIGTERM $MAINPID
Restart=on-failure
RestartSec=30
TimeoutStartSec=120
TimeoutStopSec=180
StartLimitIntervalSec=600
StartLimitBurst=3
KillMode=control-group
KillSignal=SIGTERM
SuccessExitStatus=143
User=admin
Group=ateng
[Install]
WantedBy=multi-user.target
EOF
Start the service
sudo systemctl daemon-reload
sudo systemctl enable kafka.service
sudo systemctl start kafka.service
sudo systemctl status kafka.service
Using the Service
Create a topic
kafka-topics.sh --create \
--topic quickstart-events \
--partitions 3 --replication-factor 3 \
--bootstrap-server bigdata01:9092,bigdata02:9092,bigdata03:9092
View topics
kafka-topics.sh --bootstrap-server bigdata01:9092,bigdata02:9092,bigdata03:9092 --list
kafka-topics.sh --describe --topic quickstart-events --bootstrap-server bigdata01:9092,bigdata02:9092,bigdata03:9092
Produce messages
$ kafka-console-producer.sh --topic quickstart-events --bootstrap-server bigdata01:9092,bigdata02:9092,bigdata03:9092
This is my first event
This is my second event
Consume messages
$ kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server bigdata01:9092,bigdata02:9092,bigdata03:9092
This is my first event
This is my second event
Delete a topic
kafka-topics.sh \
--delete --topic quickstart-events \
--bootstrap-server bigdata01:9092,bigdata02:9092,bigdata03:9092