Flink on Yarn读取Hive大量小文件致集群资源耗尽的参数调优咨询
解决Flink on Yarn读取Hive小文件导致资源耗尽的参数调整方案
遇到过好多次这种Hive小文件引发的Flink资源爆仓问题,给你几个实用的调整方向,亲测能有效控制资源占用:
1. 直接限制作业并行度
这是最直接的手段,避免Flink因小文件过多自动拉满并行度:
- 设置Table API的默认并行度:
这个参数会覆盖Flink全局的table.exec.resource.default-parallelism: 100execution.parallelism,专门针对Table API作业生效,你可以根据集群剩余资源设置合理值(比如集群总slot的30%-50%)。 - 限制作业的最大并行度:
就算有极端情况,并行度也不会超过这个阈值,防止意外占满资源。execution.max-parallelism: 200
2. 开启小文件合并,减少Split数量
从读取源头合并小文件,降低需要处理的Split数,自然减少并行度需求:
- 启用Hive的合并输入格式:
在Flink配置或作业参数中添加:
这个Hive原生的输入格式会自动将小文件合并成大的Split,默认按HDFS块大小(比如128MB)合并。hive.input.format: org.apache.hadoop.hive.ql.io.CombineHiveInputFormat - 调整合并Split的大小阈值:
配合上面的输入格式,设置合并的大小参数:
把多个小文件合并成128MB的Split,大幅减少Split数量。mapred.max.split.size: 134217728 # 128MB mapred.min.split.size.per.node: 134217728 - 让Flink fallback到MapReduce Reader处理小文件:
这个参数会让Flink使用MapReduce的读取逻辑,天生支持小文件合并,适配Hive小文件场景。table.exec.hive.fallback-mapred-reader: true
3. 限制作业的Yarn资源规模
通过Yarn提交参数直接限定作业能使用的资源总量,从底层卡死资源上限:
提交作业时添加以下参数(根据集群情况调整):
-yjm 2g # JobManager内存 -ytm 8g # 每个TaskManager内存 -yn 10 # TaskManager数量 -ys 10 # 每个TaskManager的slot数
这样作业最多使用10*10=100个slot,不管小文件有多少,都不会超过这个资源量。
4. 优化TaskManager的资源配置
调整每个TaskManager的slot数和内存,更高效利用资源:
taskmanager.numberOfTaskSlots: 10 taskmanager.memory.task.heap.size: 4g
比如每个TaskManager开10个slot,每个slot分配4G堆内存,既减少TaskManager的数量,又能控制单节点的资源占用。
额外建议:从源头上解决小文件问题
如果是长期运行的作业,建议定期合并Hive表的小文件:
ALTER TABLE your_hive_table CONCATENATE;
这个Hive命令会自动合并表中的小文件,从根源上避免后续作业遇到同样的问题。
内容的提问来源于stack exchange,提问作者zhouyb




