Flink
Flink is an open-source distributed stream processing framework focused on real-time processing of large-scale data streams. It provides high-throughput, low-latency processing and supports both stateful and stateless stream operations. Flink handles complex scenarios such as event time, windowing, and mixed stream/batch processing, and is widely used for real-time analytics, real-time monitoring, and machine learning. Its strong fault-tolerance mechanisms and scalability make it one of the key technologies in the big data ecosystem.
Standalone Cluster: in standalone mode, the JobManager and TaskManager each run as independent Java processes. This mode is intended for development environments only; production deployments should use Flink on YARN or Flink on Kubernetes.
This document uses the following three servers; the processes each node runs are listed in the Description column.
| IP Address | Hostname | Description |
|---|---|---|
| 192.168.1.131 | bigdata01 | StandaloneSessionClusterEntrypoint、TaskManagerRunner、HistoryServer |
| 192.168.1.132 | bigdata02 | StandaloneSessionClusterEntrypoint、TaskManagerRunner |
| 192.168.1.133 | bigdata03 | StandaloneSessionClusterEntrypoint、TaskManagerRunner |
Basic Configuration
Download the package
wget https://archive.apache.org/dist/flink/flink-1.19.1/flink-1.19.1-bin-scala_2.12.tgz
Extract the package
tar -zxvf flink-1.19.1-bin-scala_2.12.tgz -C /usr/local/software/
ln -s /usr/local/software/flink-1.19.1 /usr/local/software/flink
Configure environment variables
cat >> ~/.bash_profile <<"EOF"
## FLINK_HOME
export FLINK_HOME=/usr/local/software/flink
export PATH=$PATH:$FLINK_HOME/bin
EOF
source ~/.bash_profile
Check the version
$ flink --version
Version: 1.19.1, Commit ID: 5edb5a92
Service Configuration
Configure config.yaml
Modify the following settings:
- jobmanager.memory.process.size: total memory for the JobManager process; size it as needed
- taskmanager.memory.process.size: total memory for the TaskManager process, which runs the actual tasks, so it can be given more
- taskmanager.numberOfTaskSlots: number of task slots per TaskManager; it affects parallelism and resource scheduling and can be set to the number of CPU cores
- high-availability: HA-related settings
- Adjust the remaining settings to your actual needs
cp $FLINK_HOME/conf/config.yaml{,_bak}
cat > $FLINK_HOME/conf/config.yaml <<"EOF"
# jobmanager
jobmanager:
  bind-host: 0.0.0.0
  rpc:
    address: bigdata01
    port: 6123
  memory:
    process:
      size: 1g
  execution:
    failover-strategy: region
# taskmanager
taskmanager:
  bind-host: bigdata01
  host: bigdata01
  memory:
    process:
      size: 4g
  numberOfTaskSlots: 8 # number of CPU cores
# web
rest:
  port: 8082
  address: bigdata01
  bind-address: 0.0.0.0
web:
  submit:
    enable: true
  cancel:
    enable: true
  upload:
    dir: /data/service/flink/upload
  exception-history-size: 100
# historyserver
historyserver:
  archive:
    fs:
      dir: hdfs://atengcluster/tmp/flink/logs
      refresh-interval: 10000
    clean-expired-jobs: true
  web:
    address: bigdata01
    port: 8083
# tuning
parallelism:
  default: 1
classloader:
  resolve-order: parent-first
process:
  working-dir: /data/service/flink/working-dir
# checkpoint and savepoint settings
execution:
  checkpointing:
    interval: 10s
    externalized-checkpoint-retention: DELETE_ON_CANCELLATION
    max-concurrent-checkpoints: 1
    mode: EXACTLY_ONCE
state:
  backend:
    type: rocksdb
    incremental: true
  checkpoints:
    dir: hdfs://atengcluster/flink/checkpoints
  savepoints:
    dir: hdfs://atengcluster/flink/savepoints
# HA settings
high-availability:
  type: zookeeper
  storageDir: hdfs://atengcluster/flink/recovery
  zookeeper:
    quorum: bigdata01:2181,bigdata02:2181,bigdata03:2181
    client:
      acl: open
EOF
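Since Flink 1.19 reads standard YAML from config.yaml, a malformed file only surfaces at startup; a quick parse check catches indentation mistakes right after writing the file. A minimal sketch, assuming python3 with PyYAML is installed on the node (`check_yaml` is a hypothetical helper, not part of Flink):

```shell
# Hypothetical helper: fail fast if a config file is not valid YAML.
# Assumes python3 with PyYAML is available on this node.
check_yaml() {
  python3 -c 'import sys, yaml; yaml.safe_load(open(sys.argv[1]))' "$1" \
    && echo "valid YAML: $1" \
    || echo "invalid YAML: $1"
}
```

Run it as `check_yaml $FLINK_HOME/conf/config.yaml` before distributing the file.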
Configure masters
cat > $FLINK_HOME/conf/masters <<EOF
bigdata01:8082
bigdata02:8082
bigdata03:8082
EOF
Configure workers
cat > $FLINK_HOME/conf/workers <<EOF
bigdata01
bigdata02
bigdata03
EOF
Create the log directory
hadoop fs -mkdir -p /tmp/flink/logs
Distribute the configuration files
scp $FLINK_HOME/conf/{config.yaml,masters,workers} bigdata02:$FLINK_HOME/conf
scp $FLINK_HOME/conf/{config.yaml,masters,workers} bigdata03:$FLINK_HOME/conf
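With more nodes, the same push generalizes to a loop; `echo` is used here as a dry run so the generated commands can be reviewed before dropping it to actually copy:

```shell
# Dry run: print the scp command for each remote node (remove `echo` to execute).
for h in bigdata02 bigdata03; do
  echo scp "$FLINK_HOME"/conf/{config.yaml,masters,workers} "$h:$FLINK_HOME/conf/"
done
```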
Modify the configuration on the other nodes
On bigdata02 and bigdata03, change the following host-related entries from bigdata01 to the node's own hostname:
$ vi $FLINK_HOME/conf/config.yaml
# ...
jobmanager:
  rpc:
    address: bigdata01
taskmanager:
  bind-host: bigdata01
  host: bigdata01
rest:
  address: bigdata01
  bind-address: bigdata01
historyserver:
  web:
    address: bigdata01
# ...
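These edits can be scripted, but a global `sed 's/bigdata01/.../g'` would be wrong because it would also rewrite the ZooKeeper quorum. The sketch below (a hypothetical `patch_host` helper, not part of Flink) touches only the host-related keys listed above, matching the two-space and four-space indentation used in config.yaml:

```shell
# Hypothetical helper: point the host-related keys of a config.yaml at a given
# hostname, leaving the ZooKeeper quorum and HDFS URIs untouched. Anchored
# patterns match the 2-space (bind-host/host/address) and 4-space
# (rpc.address / web.address) indentation of the config above.
patch_host() {
  local host=$1 file=$2
  sed -i \
    -e "s/^\(  bind-host:\) bigdata01$/\1 ${host}/" \
    -e "s/^\(  host:\) bigdata01$/\1 ${host}/" \
    -e "s/^\(  address:\) bigdata01$/\1 ${host}/" \
    -e "s/^\(    address:\) bigdata01$/\1 ${host}/" \
    "$file"
}
# e.g. on bigdata02: patch_host bigdata02 $FLINK_HOME/conf/config.yaml
```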
Start the Services
Start Flink
bigdata01: StandaloneSessionClusterEntrypoint、TaskManagerRunner
bigdata02: StandaloneSessionClusterEntrypoint、TaskManagerRunner
bigdata03: StandaloneSessionClusterEntrypoint、TaskManagerRunner
Flink Web: http://bigdata01:8082/
[admin@bigdata01 ~]$ $FLINK_HOME/bin/start-cluster.sh
Start the history server
bigdata01: HistoryServer
Flink History Server Web: http://bigdata01:8083/
[admin@bigdata01 ~]$ $FLINK_HOME/bin/historyserver.sh start
Stop the services
[admin@bigdata01 ~]$ $FLINK_HOME/bin/historyserver.sh stop
[admin@bigdata01 ~]$ $FLINK_HOME/bin/stop-cluster.sh
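After start-cluster.sh, each node should show the processes listed in the table at the top of this document. A small check against `jps` output makes that verifiable (hypothetical `check_flink_procs` helper; feed it remote output via ssh):

```shell
# Hypothetical helper: given `jps` output, report any missing Flink process.
check_flink_procs() {
  local jps_out=$1 missing=0 p
  for p in StandaloneSessionClusterEntrypoint TaskManagerRunner; do
    if ! printf '%s\n' "$jps_out" | grep -q "$p"; then
      echo "missing: $p"
      missing=1
    fi
  done
  [ "$missing" -eq 0 ] && echo "all Flink processes running"
}
# e.g.: check_flink_procs "$(ssh bigdata02 jps)"
```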
Configure Autostart
Create the Hadoop environment file. Note the unquoted heredoc delimiter below: `$(hadoop classpath)` is expanded when the file is written, which is what systemd needs, because EnvironmentFile does not run a shell and cannot perform command substitution itself.
mkdir -p /data/service/flink/config
cat > /data/service/flink/config/env.conf <<EOF
JAVA_HOME=/usr/local/software/jdk8
FLINK_HOME=/usr/local/software/flink
HADOOP_HOME=/usr/local/software/hadoop
HADOOP_CLASSPATH=$(hadoop classpath)
EOF
Flink JobManager Service
Configure autostart on bigdata01, bigdata02, and bigdata03
Create the unit file
sudo tee /etc/systemd/system/flink-jobmanager.service <<"EOF"
[Unit]
Description=Flink StandaloneSessionClusterEntrypoint
Documentation=https://flink.apache.org
After=network.target
[Service]
Type=simple
EnvironmentFile=/data/service/flink/config/env.conf
ExecStart=/usr/local/software/flink/bin/jobmanager.sh start-foreground
ExecStop=/usr/local/software/flink/bin/jobmanager.sh stop
Restart=on-failure
RestartSec=30
TimeoutStartSec=120
TimeoutStopSec=180
StartLimitIntervalSec=600
StartLimitBurst=3
KillMode=control-group
KillSignal=SIGTERM
SuccessExitStatus=143
User=admin
Group=ateng
[Install]
WantedBy=multi-user.target
EOF
Start the service
sudo systemctl daemon-reload
sudo systemctl enable flink-jobmanager.service
sudo systemctl start flink-jobmanager.service
sudo systemctl status flink-jobmanager.service
Flink TaskManager Service
Configure autostart on bigdata01, bigdata02, and bigdata03
Create the unit file
sudo tee /etc/systemd/system/flink-taskmanager.service <<"EOF"
[Unit]
Description=Flink TaskManagerRunner
Documentation=https://flink.apache.org
After=network.target
[Service]
Type=simple
EnvironmentFile=/data/service/flink/config/env.conf
ExecStart=/usr/local/software/flink/bin/taskmanager.sh start-foreground
ExecStop=/usr/local/software/flink/bin/taskmanager.sh stop
Restart=on-failure
RestartSec=30
TimeoutStartSec=120
TimeoutStopSec=180
StartLimitIntervalSec=600
StartLimitBurst=3
KillMode=control-group
KillSignal=SIGTERM
SuccessExitStatus=143
User=admin
Group=ateng
[Install]
WantedBy=multi-user.target
EOF
Start the service
sudo systemctl daemon-reload
sudo systemctl enable flink-taskmanager.service
sudo systemctl start flink-taskmanager.service
sudo systemctl status flink-taskmanager.service
Flink History Server Service
Configure autostart on bigdata01
Create the unit file
sudo tee /etc/systemd/system/flink-historyserver.service <<"EOF"
[Unit]
Description=Flink History Server
Documentation=https://flink.apache.org
After=network.target
[Service]
Type=simple
EnvironmentFile=/data/service/flink/config/env.conf
ExecStart=/usr/local/software/flink/bin/historyserver.sh start-foreground
ExecStop=/usr/local/software/flink/bin/historyserver.sh stop
Restart=on-failure
RestartSec=30
TimeoutStartSec=120
TimeoutStopSec=180
StartLimitIntervalSec=600
StartLimitBurst=3
KillMode=control-group
KillSignal=SIGTERM
SuccessExitStatus=143
User=admin
Group=ateng
[Install]
WantedBy=multi-user.target
EOF
Start the service
sudo systemctl daemon-reload
sudo systemctl enable flink-historyserver.service
sudo systemctl start flink-historyserver.service
sudo systemctl status flink-historyserver.service
Using the Service
Access the Web UI
URL: http://bigdata01:8082
Submit jobs to the cluster
Batch job
flink run \
-m bigdata01:8082 \
  $FLINK_HOME/examples/batch/WordCount.jar
Streaming job
flink run -d \
-m bigdata01:8082 \
  $FLINK_HOME/examples/streaming/TopSpeedWindowing.jar
List jobs
flink list -m bigdata01:8082
Cancel a job
flink cancel -m bigdata01:8082 33a1bd6edb65057694a91ddaf069e8b3
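`flink cancel` needs a job ID, which `flink list` prints between the timestamp and the job name (`dd.MM.yyyy HH:mm:ss : <id> : <name> (STATUS)`). A hypothetical `running_job_ids` filter (an assumption about that output layout, not a Flink tool) makes scripted cancellation straightforward:

```shell
# Hypothetical filter: extract the IDs of RUNNING jobs from `flink list` output.
# Splits on " : " so the fields are timestamp, job ID, and "name (STATUS)".
running_job_ids() {
  awk -F' : ' '/\(RUNNING\)$/ {print $2}'
}
# e.g. cancel everything that is currently running (use with care):
# flink list -m bigdata01:8082 | running_job_ids \
#   | xargs -r -n1 flink cancel -m bigdata01:8082
```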