调优方向 | 调优方案 |
---|---|
代码优化 | 代码优化 |
参数调优 | 内存参数CPU 参数开启向量化Task 数量优化合并小文件 |
SELECT g,COUNT(DISTINCT CASE WHEN a 1 THEN user_id) cnt_user1,COUNT(DISTINCT CASE WHEN a 2 THEN user_id) cnt_user2,COUNT(DISTINCT CASE WHEN a 3 THEN user_id) cnt_user3,COUNT(DISTINCT CASE WHEN a 4 THEN user_id) cnt_user4
FROM tbl
GROUP BY g
sql
SELECT g,SUM(CASE WHEN user1 0 THEN 1 END) as cnt_user1,SUM(CASE WHEN user2 0 THEN 1 END) as cnt_user2,SUM(CASE WHEN user3 0 THEN 1 END) as cnt_user3,SUM(CASE WHEN user4 0 THEN 1 END) as cnt_user4
FROM (SELECT g,user_id,COUNT(CASE WHEN a 1 THEN user_id) user1,COUNT(CASE WHEN a 2 THEN user_id) user2,COUNT(CASE WHEN a 3 THEN user_id) user3,COUNT(CASE WHEN a 4 THEN user_id) user4FROM tblGROUP BY g,user_id ) tmp
GROUP BY g
sql
数据倾斜一般出现在 group by 或 大表 join 时,某些 key 的数据量特别大,导致某些算子的计算量大大超过了其他算子。
SET hive.map.aggr=true;
--(用于设定Map端进行聚合操作的条目数,视场景而定)
SET hive.groupby.mapaggr.checkinterval=200000;
SET hive.groupby.skewindata=true;
例如,event 表存在大量 user_id 为 null 的记录,但是表 users 中不会存在 user_id 为空的记录,可以把 null 随机化再 join,这样就避免了 null 值都分发到一个 Reduce Task 上。
SELECT * FROM event a LEFT OUTER
JOIN users b ON
CASE WHEN a.user_id IS NULL THEN CONCAT(‘dp_hive’,RAND()) ELSE a.user_id=b.user_id END;
可以在 LAS 集群的 MapReduce2 服务下,通过设置以下参数,对 Map 和 Reduce 阶段的内存进行调优:
参数 | 描述 | 示例 |
---|---|---|
mapreduce.map.java.opts | 默认参数,表示JVM堆内存。 | -Xmx2048m |
mapreduce.map.memory.mb | 默认参数,表示整个JVM进程占用的内存,计算方法为堆内存+堆外内存=2048+256。 | 2304 |
参数 | 描述 | 示例 |
---|---|---|
mapreduce.reduce.java.opts | 默认参数,表示JVM堆内存。 | -Xmx2048m |
mapreduce.reduce.memory.mb | 默认参数,表示整个JVM进程占用的内存,计算方法为堆内存+堆外内存=2048+256。 | 2304 |
可以通过设置以下参数,对Map和Reduce任务的CPU进行调优。
参数 | 描述 |
---|---|
mapreduce.map.cpu.vcores | 每个Map任务可用的最多的CPU Core数目。 |
mapreduce.reduce.cpu.vcores | 每个Reduce任务可用的最多的CPU Core数目。说明 此设置在公平队列是不生效的,通常vCores用于较大的集群,以限制不同用户或应用程序的CPU。 |
参数 | 描述 |
---|---|
hive.vectorized.execution.enabled | 默认值为true。开启向量化查询的开关。 |
hive.vectorized.execution.reduce.enabled | 默认值为true。表示是否启用Reduce任务的向量化执行模式。 |
Map Task 数量优化
在分布式计算中,原始数据是决定 Map 数量的一个重要因素。优化的策略有:
决定 Map Task 数量的参数有:
default_mapper_num = total_size/dfs.block.size
default_split_size = max(mapred.min.split.size, min(mapred.max.split.size, dfs.block.size)) split_num = total_size/default_split_size;
map_task_num = min(split_num, max(mapred.map.tasks, default_mapper_num))
Reduce Task 数量优化
reducer_num = min(
total_size/hive.exec.reducers.bytes.per.reducers,
hive.exec.reducers.max)。
sql
合并小文件
大量小文件容易在文件存储端造成瓶颈,影响处理效率。可以通过合并 Map 和 Reduce 的结果文件来处理。
参数 | 描述 |
---|---|
hive.merge.mapfiles | 默认值为true。表示是否合并Map输出文件。 |
hive.merge.mapredfiles | 默认值为false。表示是否合并Reduce输出文件。 |
hive.merge.size.per.task | 默认值为256000000,单位字节。表示合并文件的大小。 |