Flink on YARN单机部署指南:从环境配置到任务提交全流程解析

一、部署环境准备与前提条件

1.1 硬件资源要求

单机部署需满足最低配置:4核CPU、16GB内存、50GB可用磁盘空间。推荐使用SSD存储以提升I/O性能,尤其对于状态后端(State Backend)为RocksDB的场景。需确保物理机或虚拟机资源独占,避免与其他服务竞争资源。

1.2 软件依赖清单

  • 操作系统:CentOS 7/8或Ubuntu 20.04+(需关闭SELinux)
  • Java环境:JDK 11(Flink 1.15+推荐版本)
  • Hadoop依赖:Hadoop 3.3.x(需包含YARN和HDFS组件)
  • Flink版本:1.17.0(最新稳定版,兼容YARN)

1.3 网络与权限配置

  • 配置主机名解析:在/etc/hosts中添加127.0.0.1 localhost<主机IP> <主机名>
  • 创建Hadoop用户组及用户:groupadd hadoop && useradd -g hadoop hadoop
  • 配置SSH免密登录:ssh-keygen -t rsa && cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys

二、Flink与YARN集成配置

2.1 安装包准备

从Apache官网下载Flink二进制包(如flink-1.17.0-bin-scala_2.12.tgz),解压至/opt/flink目录。下载对应版本的Hadoop二进制包(如hadoop-3.3.4.tar.gz),解压至/opt/hadoop

2.2 环境变量配置

/etc/profile中添加:

  1. export HADOOP_HOME=/opt/hadoop
  2. export FLINK_HOME=/opt/flink
  3. export PATH=$PATH:$HADOOP_HOME/bin:$FLINK_HOME/bin
  4. export JAVA_HOME=/usr/lib/jvm/java-11-openjdk # 根据实际路径调整

执行source /etc/profile使配置生效。

2.3 YARN配置优化

修改$HADOOP_HOME/etc/hadoop/yarn-site.xml,重点调整以下参数:

  1. <property>
  2. <name>yarn.nodemanager.resource.memory-mb</name>
  3. <value>12288</value> <!-- 分配12GB内存给YARN -->
  4. </property>
  5. <property>
  6. <name>yarn.scheduler.maximum-allocation-mb</name>
  7. <value>8192</value> <!-- 单个容器最大内存 -->
  8. </property>
  9. <property>
  10. <name>yarn.nodemanager.vmem-pmem-ratio</name>
  11. <value>2.1</value> <!-- 虚拟内存与物理内存比例 -->
  12. </property>

2.4 Flink配置调整

修改$FLINK_HOME/conf/flink-conf.yaml

  1. yarn.application-name: Flink-Single-Node
  2. yarn.cluster.id: flink-yarn-cluster # 唯一标识
  3. taskmanager.numberOfTaskSlots: 4 # 根据CPU核心数调整
  4. state.backend: rocksdb # 推荐状态后端
  5. state.checkpoints.dir: hdfs://<namenode>:8020/flink/checkpoints

三、启动与验证流程

3.1 启动HDFS与YARN

  1. # 启动HDFS
  2. $HADOOP_HOME/sbin/start-dfs.sh
  3. # 启动YARN
  4. $HADOOP_HOME/sbin/start-yarn.sh
  5. # 验证服务状态
  6. jps | grep -E "NameNode|DataNode|ResourceManager|NodeManager"

3.2 提交Flink到YARN

使用yarn-session模式启动:

  1. flink run-application \
  2. -t yarn-application \
  3. -Dyarn.application.name=Flink-Demo \
  4. -Dtaskmanager.memory.process.size=4096m \
  5. -c org.apache.flink.examples.java.wordcount.WordCount \
  6. $FLINK_HOME/examples/batch/WordCount.jar

或通过client模式启动长期运行集群:

  1. yarn-session.sh \
  2. -n 2 \ # TaskManager数量
  3. -s 4 \ # 每个TM的slot数
  4. -jm 1024 \ # JobManager内存(MB)
  5. -tm 4096 \ # TaskManager内存(MB)
  6. -Dstate.backend=rocksdb

3.3 验证部署状态

  • Web UI检查:访问http://<resourcemanager-ip>:8088/cluster,查看Application状态
  • 日志分析:通过yarn logs -applicationId <app_id>获取详细日志
  • 资源监控:使用yarn node -listyarn top查看节点资源使用情况

四、常见问题与解决方案

4.1 内存不足错误

现象Container is running beyond physical memory limits
解决

  1. 调整yarn.nodemanager.resource.memory-mb参数
  2. 在Flink配置中减少taskmanager.memory.process.size
  3. 检查是否有其他进程占用内存

4.2 HDFS权限问题

现象Permission denied: user=<user>, access=WRITE
解决

  1. 在HDFS中创建专用目录:hdfs dfs -mkdir /flink
  2. 修改目录权限:hdfs dfs -chmod 777 /flink
  3. 或配置Hadoop的core-site.xml添加代理用户:
    1. <property>
    2. <name>hadoop.proxyuser.<username>.hosts</name>
    3. <value>*</value>
    4. </property>

4.3 版本兼容性问题

现象ClassNotFoundException: org.apache.hadoop.yarn.api.records...
解决

  1. 确保Flink编译时包含Hadoop依赖:
    1. mvn clean install -DskipTests \
    2. -Dhadoop.version=3.3.4 \
    3. -Dscala.version=2.12.15
  2. 或使用Flink官方提供的flink-shaded-hadoop

五、性能调优建议

5.1 内存配置优化

  • 堆内存分配:建议JobManager堆内存不超过总内存的30%
  • 托管内存:对于RocksDB后端,分配总内存的50%-60%
  • 网络内存:保持默认值64MB,高并发场景可增至256MB

5.2 并行度设置

  • 根据CPU核心数设置parallelism.default
    1. parallelism.default: 8 # 4核CPU建议值为CPU核心数*2
  • 对于数据倾斜任务,使用rebalance()rescale()算子

5.3 检查点优化

  • 调整检查点间隔:
    1. execution.checkpointing.interval: 1min
  • 启用增量检查点(RocksDB专用):
    1. state.backend.incremental: true

六、进阶使用场景

6.1 动态资源扩展

通过YARN的动态资源分配功能,修改flink-conf.yaml

  1. yarn.application.master.resource.cpu-cores: 2
  2. yarn.application.master.resource.memory-mb: 2048
  3. taskmanager.resource.cpu-cores: 2
  4. taskmanager.resource.memory-mb: 4096

6.2 高可用配置

配置Zookeeper实现HA:

  1. high-availability: zookeeper
  2. high-availability.zookeeper.quorum: <zk1>:2181,<zk2>:2181,<zk3>:2181
  3. high-availability.storageDir: hdfs://<namenode>:8020/flink/ha

6.3 自定义日志配置

修改$FLINK_HOME/conf/log4j-console.properties

  1. rootLogger.level = INFO
  2. rootLogger.appenderRef.file.ref = FileAppender
  3. appender.file.type = File
  4. appender.file.name = FileAppender
  5. appender.file.fileName = ${sys:log.file}
  6. appender.file.layout.type = PatternLayout

七、总结与最佳实践

  1. 资源隔离:建议使用cgroups限制Flink进程资源
  2. 监控集成:通过Prometheus+Grafana监控YARN和Flink指标
  3. 版本管理:保持Hadoop、Flink、Scala版本一致
  4. 备份策略:定期备份HDFS中的检查点数据
  5. 升级路径:小版本升级可直接替换jar包,大版本升级需测试兼容性

通过以上步骤,开发者可在单机环境中构建稳定的Flink on YARN计算平台,为后续扩展至集群部署奠定基础。实际部署时需根据具体业务场景调整参数,并通过压力测试验证系统稳定性。