Hive query on Tez in an EMR cluster fails with repeated vertex re-runs and shuffle errors

Resolving excessive fetch failures in the Hive shuffle phase on Tez in an EMR cluster

Problem scenario

I am running a Hive query with the Tez engine on a 20-node EMR cluster. The statement is:

insert overwrite table temp_table partition(hour) select a,b,c,hour from stats_temp_table;

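The query fails at runtime: the Map 1 vertex is re-run repeatedly and the Reducer 2 vertex eventually fails. The core diagnostic is that the shuffle phase failed because of too many fetch failures and insufficient progress. The error stack is: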

Status: Failed
Vertex re-running, vertexName=Map 1, vertexId=vertex_1523328121104_1005_1_00
Vertex re-running, vertexName=Map 1, vertexId=vertex_1523328121104_1005_1_00 
Vertex re-running, vertexName=Map 1, vertexId=vertex_1523328121104_1005_1_00 
Vertex re-running, vertexName=Map 1, vertexId=vertex_1523328121104_1005_1_00 
Vertex re-running, vertexName=Map 1, vertexId=vertex_1523328121104_1005_1_00 
Vertex re-running, vertexName=Map 1, vertexId=vertex_1523328121104_1005_1_00 
Vertex failed, vertexName=Reducer 2, vertexId=vertex_1523328121104_1005_1_01, diagnostics=[Task failed, taskId=task_1523328121104_1005_1_01_000002, diagnostics=[TaskAttempt 0 failed, info=[Error: Error while running task ( failure ) : org.apache.tez.runtime.library.common.shuffle.orderedgrouped.Shuffle$ShuffleError: error in shuffle in Fetcher {Map_1} #2 
at org.apache.tez.runtime.library.common.shuffle.orderedgrouped.Shuffle$RunShuffleCallable.callInternal(Shuffle.java:301) 
at org.apache.tez.runtime.library.common.shuffle.orderedgrouped.Shuffle$RunShuffleCallable.callInternal(Shuffle.java:285) 
at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36) 
at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
at java.lang.Thread.run(Thread.java:748) 
Caused by: java.io.IOException: Map_1: Shuffle failed with too many fetch failures and insufficient progress!failureCounts=5, pendingInputs=12, fetcherHealthy=false, reducerProgressedEnough=false, reducerStalled=true 
at org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleScheduler.isShuffleHealthy(ShuffleScheduler.java:977) 
at org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleScheduler.copyFailed(ShuffleScheduler.java:718) 
at org.apache.tez.runtime.library.common.shuffle.orderedgrouped.FetcherOrderedGrouped.setupConnection(FetcherOrderedGrouped.java:376) 
at org.apache.tez.runtime.library.common.shuffle.orderedgrouped.FetcherOrderedGrouped.copyFromHost(FetcherOrderedGrouped.java:260) 
at org.apache.tez.runtime.library.common.shuffle.orderedgrouped.FetcherOrderedGrouped.fetchNext(FetcherOrderedGrouped.java:178) 
at org.apache.tez.runtime.library.common.shuffle.orderedgrouped.FetcherOrderedGrouped.callInternal(FetcherOrderedGrouped.java:191) 
at org.apache.tez.runtime.library.common.shuffle.orderedgrouped.FetcherOrderedGrouped.callInternal(FetcherOrderedGrouped.java:54) 
... 5 more , errorMessage=Shuffle Runner Failed:org.apache.tez.runtime.library.common.shuffle.orderedgrouped.Shuffle$ShuffleError: error in shuffle in Fetcher {Map_1} #2 
at org.apache.tez.runtime.library.common.shuffle.orderedgrouped.Shuffle$RunShuffleCallable.callInternal(Shuffle.java:301) 
at org.apache.tez.runtime.library.common.shuffle.orderedgrouped.Shuffle$RunShuffleCallable.callInternal(Shuffle.java:285) 
at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36) 
at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
at java.lang.Thread.run(Thread.java:748) 
Caused by: java.io.IOException: Map_1: Shuffle failed with too many fetch failures and insufficient progress!failureCounts=5, pendingInputs=12, fetcherHealthy=false, reducerProgressedEnough=false, reducerStalled=true 
at org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleScheduler.isShuffleHealthy(ShuffleScheduler.java:977) 
at org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleScheduler.copyFailed(ShuffleScheduler.java:718) 
at org.apache.tez.runtime.library.common.shuffle.orderedgrouped.FetcherOrderedGrouped.setupConnection(FetcherOrderedGrouped.java:376) 
at org.apache.tez.runtime.library.common.shuffle.orderedgrouped.FetcherOrderedGrouped.copyFromHost(FetcherOrderedGrouped.java:260) 
at org.apache.tez.runtime.library.common.shuffle.orderedgrouped.FetcherOrderedGrouped.fetchNext(FetcherOrderedGrouped.java:178) 
at org.apache.tez.runtime.library.common.shuffle.orderedgrouped.FetcherOrderedGrouped.callInternal(FetcherOrderedGrouped.java:191) 
at org.apache.tez.runtime.library.common.shuffle.orderedgrouped.FetcherOrderedGrouped.callInternal(FetcherOrderedGrouped.java:54) 
... 5 more 
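
The `Caused by` line packs the fields that the shuffle health check reported. When comparing failures across many task attempts, it can help to pull those fields out programmatically. The following is a minimal sketch, assuming the message keeps the `key=value` layout shown in the trace; the function name `parse_shuffle_diagnostics` is hypothetical and not part of Tez or Hive.

```python
import re

# Diagnostic line copied from the stack trace above.
DIAG = ("Map_1: Shuffle failed with too many fetch failures and insufficient "
        "progress!failureCounts=5, pendingInputs=12, fetcherHealthy=false, "
        "reducerProgressedEnough=false, reducerStalled=true")


def parse_shuffle_diagnostics(line):
    """Return the key=value fields of a shuffle failure message as a dict.

    Hypothetical helper: it assumes the fields appear as word=word pairs,
    as they do in the trace above.
    """
    fields = {}
    for key, value in re.findall(r"(\w+)=(\w+)", line):
        if value in ("true", "false"):
            fields[key] = (value == "true")   # booleans
        elif value.isdigit():
            fields[key] = int(value)          # counters
        else:
            fields[key] = value               # anything else stays a string
    return fields


if __name__ == "__main__":
    for name, value in parse_shuffle_diagnostics(DIAG).items():
        print(name, value)
```

Here `fetcherHealthy=false` together with `reducerStalled=true` indicates that the reducer was waiting on inputs it could not fetch, which is why the steps below start with the network.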

Troubleshooting and resolution steps

Given how EMR clusters and the Tez engine behave, work through the following areas:

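1. Check cluster network connectivity and bandwidth first

Fetch failures in the shuffle phase are most likely related to the network between nodes:

  • Log in to a core node of the EMR cluster and ping every worker node to check connectivity and packet loss;
  • Use iperf to measure bandwidth between nodes and confirm whether there is a bottleneck (for example, many Map tasks emitting output at once and congesting the network);
  • Check the YARN NodeManager logs for network-related errors.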
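
ping only exercises ICMP, while shuffle fetchers open TCP connections to the shuffle service on each NodeManager, so a port-level check can complement the steps above. The following is a rough sketch, assuming the cluster uses the shuffle handler on port 13562 (the default of `mapreduce.shuffle.port`; your cluster may use a different port). The function names and the host names in the usage comment are hypothetical.

```python
import socket


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS resolution failures.
        return False


def unreachable_hosts(hosts, port, timeout=3.0):
    """Return the subset of hosts whose port could not be reached."""
    return [host for host in hosts if not can_connect(host, port, timeout)]


# Example usage (host names are placeholders, replace them with your worker nodes):
#   workers = ["worker-1.example.internal", "worker-2.example.internal"]
#   print(unreachable_hosts(workers, 13562))
```

Running this from several nodes, not just one, helps separate a single bad host from a cluster-wide issue such as a security group rule blocking the port.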

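2. Tune the Tez shuffle parameters (the most direct fix)

The stack trace shows the shuffle was declared failed at failureCounts=5, so raise the tolerance for fetch failures and adjust the shuffle timeouts and fetch concurrency: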

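-- Tez's shuffle properties use the tez.runtime.shuffle.* prefix (see TezRuntimeConfiguration).
-- Defaults vary by Tez version; print the current value with "set <property>;" before changing it.
-- Raise the number of fetch failures tolerated for a source before it is reported as failed (default 5)
set tez.runtime.shuffle.fetch.failures.limit=10;
-- Shuffle connect and read timeouts in milliseconds; make sure the new values are higher than the current ones
set tez.runtime.shuffle.connect.timeout=300000;
set tez.runtime.shuffle.read.timeout=300000;
-- Parallel fetcher threads per reducer (default 20); fewer threads put less load on the source nodes' shuffle handlers
set tez.runtime.shuffle.parallel.copies=10;

Add these settings at the top of the Hive query, or put them in Hive's hive-site.xml to make them permanent.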

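3. Tune container resources

On a 20-node EMR cluster, make sure the Map and Reducer containers have enough resources, so that Map tasks are not re-run repeatedly for lack of them:

-- Memory per Tez container in MB (adjust to your node memory; on a 32 GB node, 16384 is a possible value)
set hive.tez.container.size=8192;
-- JVM heap size (usually about 75% of the container memory)
set hive.tez.java.opts=-Xmx6144m;

Also check the YARN resource configuration (yarn-site.xml) to make sure other jobs are not consuming too much of each node's memory and CPU.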
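
The two memory settings need to stay in step: if the container size changes, the heap size should follow. The following is a small sketch of that arithmetic, using the 75% rule of thumb from this step. The helper name `tez_memory_settings` and its `heap_fraction` parameter are hypothetical; only the two Hive property names come from the settings shown in this step.

```python
def tez_memory_settings(container_mb, heap_fraction=0.75):
    """Return Hive 'set' statements for a Tez container of container_mb megabytes.

    heap_fraction is the share of the container given to the JVM heap;
    0.75 follows the rule of thumb in this step and is an assumption,
    not a fixed requirement.
    """
    if container_mb <= 0:
        raise ValueError("container_mb must be positive")
    if not 0 < heap_fraction < 1:
        raise ValueError("heap_fraction must be between 0 and 1")
    heap_mb = int(container_mb * heap_fraction)
    return [
        f"set hive.tez.container.size={container_mb};",
        f"set hive.tez.java.opts=-Xmx{heap_mb}m;",
    ]


if __name__ == "__main__":
    for statement in tez_memory_settings(8192):
        print(statement)
```

For a container of 8192 MB this reproduces the `-Xmx6144m` value used above; the remaining quarter of the container is left for off-heap memory and JVM overhead.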

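4. Check for data skew

If the source table stats_temp_table is heavily skewed (for example, one hour partition holds dozens of times more data than the others), the corresponding Reducer task is overloaded, which can lead to shuffle timeouts or fetch failures:

  • First inspect the data distribution:
select hour, count(*) as cnt from stats_temp_table group by hour order by cnt desc;
  • If you find skew, try the following:
    • Process the skewed partitions separately by splitting the job;
    • Salt the key (append a random suffix to the skewed key so its rows spread across several Reducers, then aggregate at the end);
    • Enable Hive's skew-join optimization: set hive.optimize.skewjoin=true; set hive.skewjoin.key=100000; (this applies only to joins, so on its own it does not help the join-free statement above).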
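
To decide whether the output of the distribution query counts as skew, one option is to compare each partition's row count with the median. The following is a minimal sketch, assuming the query result has been loaded into a dict of hour to row count; the function name `skewed_partitions`, the threshold of 10 times the median, and the sample numbers are all hypothetical.

```python
from statistics import median


def skewed_partitions(counts, factor=10.0):
    """Return partition keys whose row count exceeds factor times the median.

    counts maps a partition value (for example an hour) to its row count.
    The result is ordered from the largest partition to the smallest.
    """
    if not counts:
        return []
    typical = median(counts.values())
    flagged = [key for key, rows in counts.items() if rows > factor * typical]
    return sorted(flagged, key=lambda key: counts[key], reverse=True)


if __name__ == "__main__":
    # Made-up numbers for illustration only, not taken from the original question.
    sample = {"00": 1000, "01": 1200, "02": 50000, "03": 900}
    print(skewed_partitions(sample))
```

The median is used instead of the mean because a single very large partition inflates the mean and can hide itself.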

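5. Check the dynamic partition settings

Because this statement writes to a partitioned table, make sure the dynamic partition parameters are set correctly: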

set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;

This question originates from Stack Exchange; it was asked by Ravi.
