You need to enable JavaScript to run this app.
DataLeap私有化V2.1.2

DataLeap私有化V2.1.2

复制全文
3.1 批开发
3.1.4 Spark任务
复制全文
3.1.4 Spark任务

使用场景

spark任务,具体分为Jar spark任务和python spark任务。

Spark 使用说明

  1. Spark insert overwrite 使用示例
    df.write.insertInto("aaa\\\_bbb\\\_doc\\\_html\\\_detail", True)
  2. 用户自定义参数较多时,且存在K-V参数时,可参考如下配置
    Spark程序中,如果自定义参数较多, 可直接在"自定义参数"输入框中配置,使用空格分隔,数据开发运行时,会将自定义参数拼接到main class之后,配置输入参数格式需符合spark main class程序解析方式。
    如下图
    alt
    如需使用时间变量,也可以直接在自定义参数中配置${date}/${DATE}等系统变量。
  3. Spark优化参数
    参数列表
参数描述默认值
spark.executor.instances静态资源下:executor数2
spark.executor.cores每个executor和CPU数4
spark.dynamicAllocation.enabled动态资源开关false
spark.dynamicAllocation.maxExecutors动态资源下:executor的最大个数500
spark.executor.memory每个executor的内存大小8g
spark.memory.fractionexecutor用于计算的内存比例,剩余部分用于存储元数据和运行信息。对于executor内存开的较大的任务,可以适当提高这个值,让更多内存参与计算,但会增加OOM风险0.6

spark.executor.memoryOverhead/
spark.yarn.executor.memoryOverhead

每个executor的堆外内存大小,堆外内存主要用于数据IO,对于报堆外OOM的任务要适当调大,单位Mb,与之配合要调大executor JVM参数,例如:
set spark.executor.memoryOverhead=3072
set spark.executor.extraJavaOptions=-XX:MaxDirectMemorySize=2560m

6144

spark.sql.adaptive.enabledAdaptive execution开关,包含自动调整并行度,解决数据倾斜等优化true
spark.sql.adaptive.minNumPostShufflePartitionsAE相关,动态最小的并行度1
spark.sql.adaptive.maxNumPostShufflePartitionsAE相关,动态最大的并行度,对于shuffle量大的任务适当增大可以减少每个task的数据量,如10241000
spark.sql.adaptive.join.enabledAE相关,开启后能够根据数据量自动判断能否将sortMergeJoin转换成broadcast jointrue
spark.sql.adaptiveBroadcastJoinThresholdAE相关,spark.sql.adaptive.join.enabled设置为true后会判断join的数据量是否小于该参数值,如果小于则能将sortMergeJoin转换成broadcast joinspark.sql.autoBroadcastJoinThreshold
spark.sql.adaptive.skewedJoin.enabledAE相关,开启后能够自动处理join时的数据倾斜,对于数据量明显高于中位数的task拆分成多个小taskfalse
spark.sql.adaptive.skewedPartitionFactorAE相关,数据倾斜判定标准,当同一stage的某个task数据量超过中位数的N倍,将会判定为数据倾斜5
spark.sql.adaptive.skewedPartitionMaxSplitsAE相关,被判定为数据倾斜后最多会被拆分成的份数5
spark.shuffle.accurateBlockThresholdAE相关,数据倾斜判定基于shuffle数据量统计,如果统计所有的block数据,消耗内存较大,因此设有阈值,当shuffle的单个数据块超过大小和行数阈值时,才会进入统计,这个参数即大小阈值100*1024*1024(100MB)
spark.shuffle.accurateBlockRecordThresholdAE相关,同上,行数阈值,如果设置了上面的数据倾斜处理开关,仍然倾斜,可能是因为这几个参数设得偏大,适当缩小2 * 1024 * 1024
spark.sql.files.maxPartitionBytes默认一个task处理的数据大小,如果给的太小会造成最终任务task太多,太大会是输入环节计算较慢1073741824
spark.vcore.boost.ratiovcore,虚拟核数,设置大于1的数可以使一个核分配多个task,对于简单sql可以提升CPU利用率,对于复杂任务有OOM风险1
spark.shuffle.hdfs.enabled(长任务推荐)HDFS based Spark Shuffle开关,可以提高任务容错性。遇到org.apache.spark.shuffle.FetchFailedException报错需设置false

set spark.shuffle.io.maxRetries=1;
set spark.shuffle.io.retryWait=0s;

一般在开启hdfs shuffle后还可以开启这两个参数,避免不必要的重试和等待

spark.sql.crossJoin.enabled对于会产生笛卡尔积的sql,默认配置是限制不能跑的,在hive里可以配置set hive.mapred.mode=nonstrict跳过限制,相对应的在spark里可以配置set spark.sql.crossJoin.enabled=true起到同样的效果。false
spark.sql.broadcastTimeoutbroadcast joins时,广播数据最长等待时间,网络不稳定时,容易出现超时造成任务失败,可适当增大此参数。300(单位:s)
spark.sql.autoBroadcastJoinThreshold表能够使用broadcast join的最大阈值10MB
spark.network.timeout网络连接超时参数120s
spark.maxRemoteBlockSizeFetchToMemreduce端获取的remote block存放到内存的阈值,超过该阈值后数据会写磁盘,当出现数据量比较大的block时,建议调小该参数(比如512MB)。Long.MaxValue
spark.reducer.maxSizeInFlight控制从一个worker拉数据缓存的最大值48m
spark.merge.files.enabled合并输出文件,如果insert结果的输出文件数很多,希望合并,可以设为true,会多增加一个repartition stage合并文件,repartition的分区数由spark.merge.files.number控制false
spark.merge.files.number控制合并输出文件的输出数量512
spark.speculation推测执行开关。如果是原生任务很有可能没开这个参数,会出现个别task拖慢整个任务,可以开启这个参数。true
spark.speculation.multiplier开启推测执行的时间倍数阈值:当某个任务运行时间/中位数时间大于该值,触发推测执行。对于因为推测执行而浪费较多资源的任务可以适当调高这个参数。1.5
spark.speculation.quantile同一个stage中的task超过这个参数比例的task完成后,才会开启推测执行。对于因为推测执行而浪费较多资源的任务可以适当调高这个参数。0.75
spark.default.parallelismSpark Core默认并发度,原生spark程序并发度设置200
spark.sql.shuffle.partitionsSpark SQL默认并发度,AE开启后被spark.sql.adaptive.maxNumPostShufflePartitions取代200
spark.sql.sources.bucketing.enabled分桶表相关,当设置为false,会将分桶表当作普通表来处理。做为普通表会忽略分桶特性,部分情况性能会下降。但如果分桶表没有被正确生成(即表定义是分桶表,但数据未按分桶表生成)会报错RuntimeException: Invalid bucket file,避免这个错误,要将这个参数设为falsetrue
spark.sql.partition.rownum.collect.enable统计生成固定分区表行数false
spark.sql.dynamic.partition.rownum.collect.enable统计生成动态分区表行数false
最近更新时间:2022.09.05 11:25:29
这个页面对您有帮助吗?
有用
有用
无用
无用