(4条消息) Flink on Yarn(HA配置)

根据部署方式不同,Flink Jobmanager HA配置分为2种:

  1. 1、standalone cluster HA
  2. 2、Yarn cluster HA

其中,standalone cluster HA可参考我之前的一篇文章

简单回顾下,standalone模式的HA需要多个“活着的”Jobmanager,其中1个作为leader,其他作为standby,leader选举依赖于Zookeeper。可以用下面的一张图来形象的表述standalone HA: 

本文专门讨论Yarn下Flink HA的搭建与配置。

一、Flink On Yarn 简介

Flink部署在Yarn上,仅作为yarn上“多租户”的一个service而存在。Flink在yarn中容器的概念分为2种:

  1. 用于启动JobManager(AM)的容器
  2. 用于启动TaskManager的容器

我们可以通过yarn-session.sh –help来看下启动Flink On Yarn的参数信息: 

其中-n代表taskmanager的容器数量,而不是taskmanager+jobmanager的容器数量。

在配置HA前,先通过-q看一下我的yarn集群的资源情况:

从图中可以看出,我配置的每个NodeManager的内存是8192MB(yarn-site.xml),每个NodeManager的vcores数量是8。所以,当前yarn集群中可用内存总量为32768,总cores是32.

二、Flink on Yarn HA 配置

1、配置准备 
在配置Flink On Yarn之前,必须保证hdfs和yarn都已经开启,可以通过$HADOOP_HOME/sbin/start-all.sh启动hdfs和yarn。

2、配置AM在尝试重启的最大次数(yarn-site.xml)

此配置需要在$HADOOP_CONF_DIR 的yarn-site.xml添加。 
添加如下配置: 

此配置代表application master在重启时,尝试的最大次数。

3、配置Application Attempts(flink-conf.yaml)

此参数需要在$FLINK_HOME/conf 的flink-conf.yaml中配置。 
添加如下配置:

此参数代表Flink Job(yarn中称为application)在Jobmanager(或者叫Application Master)恢复时,允许重启的最大次数。

注意,Flink On Yarn环境中,当Jobmanager(Application Master)失败时,yarn会尝试重启JobManager(AM),重启后,会重新启动Flink的Job(application)。因此,yarn.application-attempts的设置不应该超过yarn.resourcemanager.am.max-attemps.

4、配置zookeeper信息 
虽然flink-on-yarn cluster HA依赖于Yarn自己的集群机制,但是Flink Job在恢复时,需要依赖检查点产生的快照,而这些快照虽然配置在hdfs,但是其元数据信息保存在zookeeper中,所以我们还要配置zookeeper的HA信息:

其中,recovery.zookeeper.path.namespace也可以在启动Flink on Yarn时通过-z参数覆盖。 
在yarn模式下,jobmanager.rpc.address不需要指定,因为哪一个容器作为jobManager由Yarn决定,而不由Flink配置决定;taskmanager.tmp.dirs也不需要指定,这个参数将被yarn的tmp参数指定,默认就是/tmp目录下,保存一些用于上传到ResourceManager的jar或lib文件。parrallelism.default也不需要指定,因为在启动yarn时,通过-s指定每个taskmanager的slots数量。

完整的Flink配置信息如下:

  1. #==============================================================================
  2. # Common
  3. #==============================================================================
  4. env.java.home: /home/flink/java/jdk1.8.0_60
  5. jobmanager.rpc.port: 6123
  6. jobmanager.heap.mb: 6192
  7. taskmanager.heap.mb: 8192
  8. taskmanager.numberOfTaskSlots: 8
  9. taskmanager.memory.preallocate: false
  10. #==============================================================================
  11. # Web Frontend
  12. #==============================================================================
  13. jobmanager.web.port: 8081
  14. #==============================================================================
  15. # Streaming state checkpointing
  16. #==============================================================================
  17. state.backend: filesystem
  18. state.backend.fs.checkpointdir: hdfs:///flink/checkpoints
  19. #==============================================================================
  20. # Advanced
  21. #==============================================================================
  22. taskmanager.network.numberOfBuffers: 64000
  23. fs.hdfs.hadoopconf: /home/flink/hadoop/hadoop-2.6.0/etc/hadoop
  24. #==============================================================================
  25. # Master High Availability (required configuration)
  26. #==============================================================================
  27. recovery.mode: zookeeper
  28. recovery.zookeeper.quorum: flink:2181,data0:2181,mf:2181
  29. recovery.zookeeper.storageDir: hdfs:///flink/recovery
  30. recovery.zookeeper.path.root: /flinkOnYarn
  31. recovery.zookeeper.path.namespace: /cluster_yarn
  32. #==============================================================================
  33. # Yarn
  34. #==============================================================================
  35. yarn.application-attempts: 4
  36. #==============================================================================
  37. # Yarn will overwrite following parameters
  38. # 1. jobmanager.rpc.address
  39. # 2. taskmanager.tmp.dirs
  40. # 3. parallelism.default
  41. #==============================================================================

三、启动Flink Yarn Session

启动Flink Yarn Session有2种模式:

  1. 分离模式
  2. 客户端模式

通过-d指定分离模式,即客户端在启动Flink Yarn Session后,就不再属于Yarn Cluster的一部分。如果想要停止Flink Yarn Application,需要通过yarn application -kill <Application_ID>命令来停止。

我们这里采用分离模式来启动Flink Yarn Session:

yarn-session.sh -n 3 -jm 4096 -tm 8192 -s 8 -nm FlinkOnYarnSession -d -st

我们可以通过yarn的webUI查看一下当前启动的Application:

可以看到名字是FlinkOnYarnSession,总内存32GB,运行使用的内存28GB(-jm指定了4GB),当前容器数量为4.我们通过ApplicationMaster tracking一下Flink的WebUI:

四、提交Job

通过CLI方式提交:

flink run -c wikiedits.Test1 toptrade-flink-1.0.jar

我们看下目前Job的JobGraph:

五、HA测试

现在,我们kill掉Jobmanager(AM)进程YarnApplicationMasterRunner,看看Yarn Cluster的HA情况。

我们看到Application Attemp的ID增加了1:

我们再到mf42的$YARN_CONF_DIR(如果没设置则在$HADOOP_CONF_DIR)下看看日志情况,当前AM的日志路径在$HADOOP_CONF_DIR/userlogs/<Application_ID>/下,可以看出Yarn在重启YarnApplicationMasterRunner进程,并在重启期后重新提交Flink的Job。

再次查看进程:

YarnApplicationMasterRunner进程号变了。

此时,Flink的WebUI又可以访问了,而且Job被cancel掉后重新启动了。

六、未来Flink1.2中的Flink On Yarn

增强了以下几点:

  1. 1、不用先启动Yarn Session再提交Job,而是直接提交Job到Yarn集群,因此client可以断开连接
  2. 2、用户代码库和配置文件直接在classpath下,而不是在动态类加载器中
  3. 3、容器在需要时分配,不需要时释放资源
  4. 4、按需分配的容器可以针对不同的operator分配不同的CPU和Core资源,通过配置文件实现

Flink1.2中ResourceManager提出了一个Dispatcher的概念,主要用于统一发布Job并监控实例的运行。但时可以选择是否使用Dispatcher。

参考:

https://ci.apache.org/projects/flink/flink-docs-release-1.1/setup/yarn_setup.html 
https://ci.apache.org/projects/flink/flink-docs-release-1.1/setup/jobmanager_high_availability.html#yarn-cluster-high-availability 
http://www.ibm.com/developerworks/cn/opensource/os-cn-apache-flink/#ibm-pcon 
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077 
https://www.youtube.com/watch?v=L21N8mNtvME 
http://www.jianshu.com/p/8a3177095072

(0)

相关推荐