部署
部署
一、部署模式
1.1 会话模式
会话模式其实最符合常规思维。我们需要先启动一个集群,保持一个会话
,在这个会话中通过客户端提交作业。集群启动时所有资源就都已经确定,所以所有提交的作业会竞争集群中的资源
。 会话模式比较适合于单个规模小、执行时间短的大量作业
。
1.2 单作业模式
会话模式因为资源共享会导致很多问题,所以为了更好地隔离资源,我们可以考虑为每个提交的作业启动一个集群
,这就是所谓的单作业(Per-Job)模式
。 作业完成后,集群就会关闭,所有资源也会释放。 这些特性单作业模式在生产环境运行更加稳定,所以是实际应用的首选模式
。使得 需要注意的是,Flink本身无法直接这样运行,所以单作业模式一般需要借助一些资源管理框架来启动集群
,比如YARN、Kubernetes(K8S)。
1.3 应用模式
前面提到的两种模式下,应用代码都是在客户端上执行
,然后由客户端提交给JobManager的。但是这种方式客户端需要占用大量网络带宽
,去下载依赖和把二进制数据发送给JobManager;加上很多情况下我们提交作业用的是同一个客户端,就会加重客户端所在节点的资源消耗
。 所以解决办法就是,我们不要客户端了,直接把应用提交到JobManger上运行
。而这也就代表着,我们需要为每一个提交的应用单独启动一个JobManager,也就是创建一个集群。这个JobManager只为执行这一个应用而存在,执行结束之后JobManager也就关闭了
,这就是所谓的应用模式
。 应用模式与单作业模式,都是提交作业之后才创建集群;单作业模式是通过客户端来提交的,客户端解析出的每一个作业对应一个集群;而应用模式下,是直接由JobManager执行应用程序的。
二、运行模式-Standalone
2.1 Standalone(单机)
安装
从官方下载 并解压
[root@node01 opt]# tar -xzf flink-bin-scala_2.11.tgz
一些重要选项
/opt/module/flink/conf/flink-conf.yaml
jobmanager.rpc.address: localhost
配置项指向 master 节点。jobmanager.memory.process.size: 1600m
每个 JobManager 的可用内存值taskmanager.memory.process.size: 1728m
每个 TaskManager 的可用内存值,数据处理的时候状态数据会放这里。如果大数据量需要增加内存conf/workers
编辑文件该文件并输入每个 worker 节点的 IP 或主机名。taskmanager.numberOfTaskSlots: 1
每台机器的可用 CPU 数parallelism.default: 1
默认并行度,上面一个是当前slot最大能执行的数量,这个是真正执行时候的数量。
2.2 Standalone(Session-Cluster)
规划
组件 | node01 | node02 | node03 |
---|---|---|---|
JobManager | JobManager | ||
TaskManager | TaskManager | TaskManager | TaskManager |
配置
/opt/flink/conf/flink-conf.yaml
文件修改jobmanager.rpc.address
# JobManager节点
jobmanager.rpc.address: node01
jobmanager.bind-host: 0.0.0.0
rest.address: node01
rest.bind-address: 0.0.0.0
# TaskManager 节点地址,需要配置为当前机器名
taskmanager.bind-host: 0.0.0.0
taskmanager.host: node01
/opt/flink/conf/masters
配置当前的jobmanger
[hadoop@node01 opt]$ cat /opt/flink/conf/masters
node01:8081
/opt/flink/conf/workers
把两台task机器加上
[hadoop@node01 opt]$ cat /opt/flink/conf/workers
node01
node02
node03
- 把flink分发到另外几台机器
[root@node01 opt]# xsync flink/
- 分别修改node02,node03的
/opt/flink/conf/flink-conf.yaml
文件中的TaskManager的host
taskmanager.host: node02
taskmanager.host: node03
- 启动
[hadoop@node01 opt]$ /opt/flink/bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host node01.
Starting taskexecutor daemon on host node01.
Starting taskexecutor daemon on host node02.
Starting taskexecutor daemon on host node03.
[root@node01 flink]# xcall jps
================current host is node01=================
--> excute command "jps"
21441 StandaloneSessionClusterEntrypoint
21531 Jps
================current host is node02=================
--> excute command "jps"
15457 TaskManagerRunner
15529 Jps
================current host is node03=================
--> excute command "jps"
16054 TaskManagerRunner
16125 Jps
excute successfully !
访问http://node01:8081/#/overview
可以对 flink 集群和任务进行监控管理。
- 停止
/opt/flink/bin/stop-cluster.sh
提交任务
# 启动Job
[root@node01 ~]# /opt/flink/bin/flink run -m node01:8081 -c com.bihell.wc.StreamWordCount -p 2 flink-1.14-1.0-SNAPSHOT.jar --host node01 --port 7777
Job has been submitted with JobID d44bcf6116cbf6b3a8eacfd8e49aa0dc
# 列出当前的Job
[root@node01 flink]# bin/flink list
Waiting for response...
------------------ Running/Restarting Jobs -------------------
16.03.2022 14:49:45 : d44bcf6116cbf6b3a8eacfd8e49aa0dc : Flink Streaming Job (RUNNING)
--------------------------------------------------------------
No scheduled jobs.
# 关job
[root@node01 flink]# bin/flink cancel d44bcf6116cbf6b3a8eacfd8e49aa0dc
Cancelling job d44bcf6116cbf6b3a8eacfd8e49aa0dc.
Cancelled job d44bcf6116cbf6b3a8eacfd8e49aa0dc.
2.3 Standalone(Application)
应用模式下不会提前创建集群,所以不能调用 start-cluster.sh 脚本。我们可以使用同样在 bin 目录下的 standalone-job.sh 来创建一个 JobManager。
- 进入到 Flink 的安装路径下,将应用程序的 jar 包放到 lib/目录下。
- 执行以下命令,启动 JobManager。
$ ./bin/standalone-job.sh start --job-classname com.bihell.wc.StreamWordCount
这里我们直接指定作业入口类,脚本会到 lib 目录扫描所有的 jar 包。
- 同样是使用 bin 目录下的脚本,启动 TaskManager。
$ ./bin/taskmanager.sh start
- 如果希望停掉集群,同样可以使用脚本,命令如下。
$ ./bin/standalone-job.sh stop
$ ./bin/taskmanager.sh stop
三、运行模式-Flink on Yarn
YARN上部署的过程是:客户端把Flink应用提交给Yarn的ResourceManager,Yarn的ResourceManager会向Yarn的NodeManager申请容器。在这些容器上,Flink会部署JobManager和TaskManager的实例,从而启动集群。Flink会根据运行在JobManger上的作业所需要的Slot数量动态分配TaskManager资源
。
3.1 相关准备和配置
在将Flink任务部署至YARN集群之前,需要确认集群是否安装有Hadoop,保证Hadoop版本至少在2.2以上,并且集群中安装有HDFS服务。
- 配置环境变量,增加环境变量配置如下:
[hadoop@node01 ~]$ sudo vim /etc/profile.d/my_env.sh
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
export HADOOP_CLASSPATH=`hadoop classpath`
2.启动Hadoop集群,包括HDFS和YARN。
3.2 Flink on Yarn(Session-Cluster)
YARN的会话模式与独立集群略有不同,需要首先申请一个YARN会话(YARN Session)来启动Flink集群。
在 yarn 中初始化一个 flink 集群,开辟指定的资源,以后提交任务都向这里提 交。这个 flink 集群会常驻在 yarn 集群中,除非手工停止。
- 执行脚本命令向YARN集群申请资源,开启一个YARN会话,启动Flink集群
/opt/flink/bin/yarn-session.sh -nm test -d
-d:分离模式,如果你不想让Flink YARN客户端一直前台运行,可以使用这个参数,即使关掉当前对话窗口,YARN session也可以后台运行。 -jm(--jobManagerMemory):配置JobManager所需内存,默认单位MB。 -nm(--name):配置在YARN UI界面上显示的任务名。 -qu(--queue):指定YARN队列名。 -tm(--taskManager):配置每个TaskManager所使用内存。
- 提交作业
# 跟Standalone有些区别不用特意指定-m,会自动有限找Yarn
/opt/flink/bin/flink run -c com.bihell.wc.StreamWordCount -p 2 -d flink-1.14-1.0-SNAPSHOT.jar --host node01 --port 7777
3.3 Flink on Yarn(Per-Job-Cluster)
一个 Job 会对应一个集群,每提交一个作业会根据自身的情况,都会单独向 yarn 申请资源,直到作业执行完成,一个作业的失败与否并不会影响下一个作业的正常 提交和运行。独享 Dispatcher 和 ResourceManager,按需接受资源申请;适合规模大 长时间运行的作业。
每次提交都会创建一个新的 flink 集群,任务之间互相独立,互不影响,方便管理。任务执行完成之后创建的集群也会消失。
bin/flink run -d -t yarn-per-job -c com.atguigu.wc.SocketStreamWordCount FlinkTutorial-1.0-SNAPSHOT.jar
命令行查看或取消作业
bin/flink list -t yarn-per-job -Dyarn.application.id=application_XXXX_YY
bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX_YY <jobId>
3.4 Flink on Yarn(Application))
应用模式同样非常简单,与单作业模式类似,直接执行 flink run-application 命令即可。
- 执行命令提交作业。
$ bin/flink run-application -t yarn-application -c com.atguigu.wc.SocketStreamWordCount FlinkTutorial-1.0-SNAPSHOT.jar
- 在命令行中查看或取消作业。
$ bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
$ bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
- 也可以通过 yarn.provided.lib.dirs 配置选项指定位置,将 jar 上传到远程。
bin/flink run-application -t yarn-application -Dyarn.provided.lib.dirs="hdfs://node01:8020/flink-dist" -c com.atguigu.wc.SocketStreamWordCount hdfs://node01:8020/flink-jars/FlinkTutorial-1.0-SNAPSHOT.jar
这种方式下 jar 可以预先上传到 HDFS,而不需要单独发送到集群,这就使得作业提交更 加轻量了。 Application Mode
四、运行模式-Flink on Kubernetes
4.1 Flink on Kubernetes(Session)
# (1) Start Kubernetes session
$ ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=my-first-flink-cluster
# (2) Submit example job
$ ./bin/flink run \
--target kubernetes-session \
-Dkubernetes.cluster-id=my-first-flink-cluster \
./examples/streaming/TopSpeedWindowing.jar
# (3) Stop Kubernetes session by deleting cluster deployment
$ kubectl delete deployment/my-first-flink-cluster
4.2 Flink on Kubernetes(Application)
- 先通过官方的基础镜像做一个自己的镜像
FROM flink
RUN mkdir -p $FLINK_HOME/usrlib
COPY /path/of/my-flink-job.jar $FLINK_HOME/usrlib/my-flink-job.jar
- 发布镜像以后通过以下命令执行程序
$ ./bin/flink run-application \
--target kubernetes-application \
-Dkubernetes.cluster-id=my-first-application-cluster \
-Dkubernetes.container.image=custom-image-name \
local:///opt/flink/usrlib/my-flink-job.jar
You can override configurations set in conf/flink-conf.yaml by passing key-value pairs -Dkey=value to bin/flink.
- 交互操作
# List running job on the cluster
$ ./bin/flink list --target kubernetes-application -Dkubernetes.cluster-id=my-first-application-cluster
# Cancel running job
$ ./bin/flink cancel --target kubernetes-application -Dkubernetes.cluster-id=my-first-application-cluster <jobId>
五、历史服务器
运行 Flink job 的集群一旦停止,只能去 yarn 或本地磁盘上查看日志,不再可以查看作业挂掉之前的运行的 Web UI,很难清楚知道作业在挂的那一刻到底发生了什么。如果我们还没有 Metrics 监控的话,那么完全就只能通过日志去分析和定位问题了,所以如果能还原之前的 Web UI,我们可以通过 UI 发现和定位一些问题。 Flink提供了历史服务器,用来在相应的 Flink 集群关闭后查询已完成作业的统计信息。我们都知道只有当作业处于运行中的状态,才能够查看到相关的WebUI统计信息。通过 History Server 我们才能查询这些已完成作业的统计信息,无论是正常退出还是异常退出。 此外,它对外提供了 REST API,它接受 HTTP 请求并使用 JSON 数据进行响应。Flink 任务停止后,JobManager 会将已经完成任务的统计信息进行存档,History Server 进程则在任务停止后可以对任务统计信息进行查询。比如:最后一次的 Checkpoint、任务运行时的相关配置。
- 创建存储目录
hadoop fs -mkdir -p /logs/flink-job
- 在 flink-config.yaml中添加如下配置
jobmanager.archive.fs.dir: hdfs://node01:8020/logs/flink-job
historyserver.web.address: node01
historyserver.web.port: 8082
historyserver.archive.fs.dir: hdfs://node01:8020/logs/flink-job
historyserver.archive.fs.refresh-interval: 5000
- 启动历史服务器
bin/historyserver.sh start
- 停止历史服务器
bin/historyserver.sh stop
- 在浏览器地址栏输入:http://node01:8082 查看已经停止的 job 的统计信息
六、Q&A
6.1 k8s权限问题
Exec Failure: HTTP 403, Status: 403 - pods is forbidden: User "system:serviceaccount:default:default" cannot watch resource "pods" in API group "" in the namespace "default"
处理
创建一个yaml文档来绑定账号
# NOTE: The service account `default:default` already exists in k8s cluster.
# You can create a new account following like this:
#---
#apiVersion: v1
#kind: ServiceAccount
#metadata:
# name: <new-account-name>
# namespace: <namespace>
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: fabric8-rbac
subjects:
- kind: ServiceAccount
# Reference to upper's `metadata.name`
name: default
# Reference to upper's `metadata.namespace`
namespace: default
roleRef:
kind: ClusterRole
name: cluster-admin
apiGroup: rbac.authorization.k8s.io
# 应用
kubectl apply -f fabric8-rbac.yaml
# 删除
kubectl delete -f fabric8-rbac.yaml
kubernetes-log-user-systemserviceaccountdefaultdefault-cannot-get-services
6.2 以批处理执行
DataSet API 就已经处于“软弃用”(soft deprecated)的状态,在实际应用中我们只 要维护一套 DataStream API 就可以了。如果想要批处理,官方推荐的做法 是直接使用 DataStream API,在提交任务时通过将执行模式设为 BATCH 来进行批处理:
$ bin/flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar