You need to enable JavaScript to run this app.
文档中心
大数据研发治理套件(私有化)

大数据研发治理套件(私有化)

复制全文
Apache Hadoop/MiniBase Hadoop
Spark任务
复制全文
Spark任务

Spark任务适用于定时执行Spark离线任务的场景,支持Jar包资源和Python资源引用的方式。

使用前提

项目控制台中,已绑定了 Apache Hadoop/MiniBase Hadoop 引擎实例。

说明

项目控制台中绑定不同的 Apache Hadoop 版本说明如下:

  • Apache Hadoop 集群版本为 3.3.1-mrs 时,即适配您华为云中的 mrs 集群信息。此集群版本下仅支持创建执行 HSQL、Spark Jar 任务类型。
  • Apache Hadoop 集群版本为 3.2.2 时,您可配置 TBDS、EMR 集群信息。该集群信息版本下仅支持创建执行HSQL、Spark Jar、Native Flink Streaming SQL、Native Java Flink 任务类型。

新建任务

  1. 登录 DataLeap 控制台。
  2. 在任务开发界面,单击新建任务 > 数据开发 > Apache Hadoop/MiniBase Hadoop 引擎类型按钮。
  3. 关联实例默认展现项目控制台已绑定的 Apache Hadoop/MiniBase Hadoop 引擎实例。
  4. 选择离线数 Spark 任务。
  5. 填写任务基本信息:
    1. 任务名称:输入任务的名称,只允许字符.、字母、数字、下划线、连字符、[]、【】、()、()以及中文字符,且需要在127个字符以内。
    2. 保存至: 选择任务存放的目标文件夹目录。
      Image
  6. 单击确定按钮,成功创建任务。

任务配置

Spark 任务创建成功后,便可进入任务配置页面。
Image

语言设置

语言类型支持 Java、Python。

注意

语言类型暂不支持互相转换,切换语言类型会清空当前配置,需谨慎切换。

引入资源

选择 Spark 任务引入的资源类型。

  • 语言类型选择 Java 时,资源类型支持 Jar 资源包的形式,可以按以下方式选择资源:
    • Jar 资源包:
      支持下拉从资源库选取已有的 jar 资源。若还未创建 Jar 资源,您可单击新建资源按钮,前往资源库新建,详见资源库
  • 语言类型选择 Python 时:
    • 资源类型默认选择 Python 类型。
    • 在编辑器中输入Python语句,执行引擎默认支持Python2.7。需在代码顶行加上 # -- coding: utf-8 -- 以表示文件编码。

    注意

    若语句中需要设置系统环境变量时,需避免直接覆盖系统环境变量,请按照追加方式指定,例如PATH=$PATH:/home/lihua/apps/bin/;

参数配置

参数

说明

参数配置---Spark参数

Main Class

Jar 包中定义的主类申明,语言类型为 Java 时需填写。

Files/ Py-files/ Jars/ Archives

选择参数中需引用的其他资源,您可在下拉框中选择或新建资源,新建资源可参见“资源库”。

Conf参数

Spark配置信息,支持单行编辑模式和脚本编辑模式。相关配置信息说明,可参见下方“表 Spark 优化参数说明”。

任务参数

自定义参数

输入任务中需要额外传递的参数,如数据输入/输出路径参数、数据处理的业务时间参数、过滤条件参数等等。
多个参数以空格形式进行分隔,例如 param1 param2 param3,参数最终将以字符串形式传入。支持配置时间参数,格式如 '${DATE}'、'${HOUR}';其他上游任务输入参数或项目参数格式如 {{param1}}。

任务产出数据登记

任务产出数据登记,用于记录任务---数据血缘信息,并不会对代码逻辑造成影响。对于系统无法通过解析获取产出信息的任务,可手动登记其产出信息。
如果任务含有 Hive 表或者 HDFS 目录的写入操作,强烈建议填写。您填写的内容即为任务产出,支持填写多个。其他任务的依赖推荐会根据此处填写的 Hive 表或者 HDFS 目录进行推荐。
具体登记内容包括:

  • HDFS:该任务会写数据到 HDFS 目录,请填写 HDFS 路径名,路径名可以使用变量,例如 ${date}、${hour} 等。
  • Hive:该任务会写数据到 Hive 表,请填写 Hive 库、表名、分区名,分区内容可以使用变量。
  • 其他:该任务不写数据到 Hive 表或 HDFS 目录。

调度设置

任务配置完成后,在右侧导航栏中,单击调度配置按钮,进入调度配置窗口,您可以在此设置调度属性、依赖、任务输入输出参数等信息。详细参数设置详见:调度设置

调试运行

代码逻辑和参数配置完成后,您可以在界面进行调试操作。
在编辑器上方,依次单击保存、调试按钮,等待调试完成。在下方调试记录左侧,您可通过选择调试状态,筛选出不同调试状态下的历史运行记录。

注意

  • 调试操作,直接使用线上数据进行调试,需谨慎操作。
  • 如果项目管理员在项目控制台>流水线管理中启用了流水线流程校验,则您需要确保提交的任务符合流水线扩展程序的校验规则,才能成功提交。详见配置流水线
  • 在调试记录左侧,您可通过选择调试状态,筛选出不同调试状态下的历史运行记录。
    • 鼠标移动至调试记录上,可查看包括状态、业务日期、提交人等信息。
    • 双击调试记录,您还可编辑调试名称信息,并回车保存。
      Image
  • 单击调试记录,在调试记录右侧,可以查看运行记录的执行日志、概览等详细信息:
    • 概览:查看运行结果的概览情况,包括调试业务日期、执行时长和运行代码等信息。
    • 执行日志:查看任务执行日志详情,可在此处查看 Spark 引擎对应的 JobManager 和 Driver 日志详情信息。并可在日志详情中,按需进行日志的类型选择、日志时间范围选择、日志关键字搜索及日志下载等操作。

      说明

      目前仅 Apache Hadoop 集群下的 Spark、HSQL 任务类型支持查看执行日志。

      Image

提交任务

调试结果无误后,单击提交上线按钮,在提交上线对话框中,选择回溯数据、监控设置、提交设置等参数,最后单击确认按钮,完成作业提交。 提交上线说明详见:提交上线

注意

  • 调试操作,直接使用线上数据进行调试,需谨慎操作。
  • 如果租户项目管理员在项目控制台>流水线管理中启用了流水线流程校验,则您需要确保提交的任务符合流水线扩展程序的校验规则,才能成功提交。详见4 流水线管理

后续任务运维操作详见:离线任务运维

Spark使用说明

  1. Spark insert overwrite 使用示例

    df.write.insertInto("aaa_bbb_doc_html_detail", True)
    
  2. 用户自定义参数较多时,且存在K-V参数时,可参考如下配置:
    Spark程序中,如果自定义参数较多,可直接在自定义参数输入框中配置,使用空格分隔。数据开发运行时,会将自定义参数拼接到Main Class之后,配置输入参数格式需符合spark main class程序解析方式,如下图所示。
    Image
    如需使用时间变量,也可以直接在自定义参数中配置${date}/${DATE}等系统变量。

  3. Spark优化参数
    表 Spark 优化参数说明

    参数

    说明

    默认值

    spark.executor.instances

    静态资源下:executor数

    2

    spark.executor.cores

    每个executor和CPU数

    4

    spark.dynamicAllocation.enabled

    动态资源开关

    false

    spark.dynamicAllocation.maxExecutors

    动态资源下:executor的最大个数

    500

    spark.executor.memory

    每个executor的内存大小

    16g

    spark.memory.fraction

    executor用于计算的内存比例,剩余部分用于存储元数据和运行信息。对于executor内存开的较大的任务,可以适当提高这个值,让更多内存参与计算,但会增加OOM风险

    0.6

    spark.executor.memoryOverhead/ spark.yarn.executor.memoryOverhead

    每个executor的堆外内存大小,堆外内存主要用于数据IO,对于报堆外OOM的任务要适当调大,单位Mb,与之配合要调大executor JVM参数,例如: set spark.executor.memoryOverhead=3072 set spark.executor.extraJavaOptions=-XX:MaxDirectMemorySize=2560m

    6144

    spark.sql.adaptive.enabled

    Adaptive execution开关,包含自动调整并行度,解决数据倾斜等优化

    true

    spark.sql.adaptive.minNumPostShufflePartitions

    AE相关,动态最小的并行度

    1

    spark.sql.adaptive.maxNumPostShufflePartitions

    AE相关,动态最大的并行度,对于shuffle量大的任务适当增大可以减少每个task的数据量,如1024

    1000

    spark.sql.adaptive.join.enabled

    AE相关,开启后能够根据数据量自动判断能否将sortMergeJoin转换成broadcast join

    true

    spark.sql.adaptiveBroadcastJoinThreshold

    AE相关,spark.sql.adaptive.join.enabled设置为true后会判断join的数据量是否小于该参数值,如果小于则能将sortMergeJoin转换成broadcast join

    spark.sql.autoBroadcastJoinThreshold

    spark.sql.adaptive.skewedJoin.enabled

    AE相关,开启后能够自动处理join时的数据倾斜,对于数据量明显高于中位数的task拆分成多个小task

    false

    spark.sql.adaptive.skewedPartitionFactor

    AE相关,数据倾斜判定标准,当同一stage的某个task数据量超过中位数的N倍,将会判定为数据倾斜

    5

    spark.sql.adaptive.skewedPartitionMaxSplits

    AE相关,被判定为数据倾斜后最多会被拆分成的份数

    5

    spark.shuffle.accurateBlockThreshold

    AE相关,数据倾斜判定基于shuffle数据量统计,如果统计所有的block数据,消耗内存较大,因此设有阈值,当shuffle的单个数据块超过大小和行数阈值时,才会进入统计,这个参数即大小阈值

    10010241024(100MB)

    spark.shuffle.accurateBlockRecordThreshold

    AE相关,同上,行数阈值,如果设置了上面的数据倾斜处理开关,仍然倾斜,可能是因为这几个参数设得偏大,适当缩小

    2 * 1024 * 1024

    spark.sql.files.maxPartitionBytes

    默认一个task处理的数据大小,如果给的太小会造成最终任务task太多,太大会是输入环节计算较慢

    1073741824

    spark.vcore.boost.ratio

    vcore,虚拟核数,设置大于1的数可以使一个核分配多个task,对于简单sql可以提升CPU利用率,对于复杂任务有OOM风险

    1

    spark.shuffle.hdfs.enabled(长任务推荐)

    HDFS based Spark Shuffle开关,可以提高任务容错性。遇到org.apache.spark.shuffle.FetchFailedException报错需设置

    false

    set spark.shuffle.io.maxRetries=1; set spark.shuffle.io.retryWait=0s;

    一般在开启hdfs shuffle后还可以开启这两个参数,避免不必要的重试和等待

    spark.sql.crossJoin.enabled

    对于会产生笛卡尔积的sql,默认配置是限制不能跑的,在hive里可以配置set hive.mapred.mode=nonstrict跳过限制,相对应的在spark里可以配置set spark.sql.crossJoin.enabled=true起到同样的效果。

    false

    spark.sql.broadcastTimeout

    broadcast joins时,广播数据最长等待时间,网络不稳定时,容易出现超时造成任务失败,可适当增大此参数。

    300(单位:s)

    spark.sql.autoBroadcastJoinThreshold

    表能够使用broadcast join的最大阈值

    10MB

    spark.network.timeout

    网络连接超时参数

    120s

    spark.maxRemoteBlockSizeFetchToMem

    reduce端获取的remote block存放到内存的阈值,超过该阈值后数据会写磁盘,当出现数据量比较大的block时,建议调小该参数(比如512MB)。

    Long.MaxValue

    spark.reducer.maxSizeInFlight

    控制从一个worker拉数据缓存的最大值

    48m

    spark.merge.files.enabled

    合并输出文件,如果insert结果的输出文件数很多,希望合并,可以设为true,会多增加一个repartition stage合并文件,repartition的分区数由spark.merge.files.number控制

    false

    spark.merge.files.number

    控制合并输出文件的输出数量

    512

    spark.speculation

    推测执行开关。如果是原生任务很有可能没开这个参数,会出现个别task拖慢整个任务,可以开启这个参数。

    true

    spark.speculation.multiplier

    开启推测执行的时间倍数阈值:当某个任务运行时间/中位数时间大于该值,触发推测执行。对于因为推测执行而浪费较多资源的任务可以适当调高这个参数。

    1.5

    spark.speculation.quantile

    同一个stage中的task超过这个参数比例的task完成后,才会开启推测执行。对于因为推测执行而浪费较多资源的任务可以适当调高这个参数。

    0.75

    spark.default.parallelism

    Spark Core默认并发度,原生spark程序并发度设置

    200

    spark.sql.shuffle.partitions

    Spark SQL默认并发度,AE开启后被spark.sql.adaptive.maxNumPostShufflePartitions取代

    200

    spark.sql.sources.bucketing.enabled

    分桶表相关,当设置为false,会将分桶表当作普通表来处理。做为普通表会忽略分桶特性,部分情况性能会下降。但如果分桶表没有被正确生成(即表定义是分桶表,但数据未按分桶表生成)会报错RuntimeException: Invalid bucket file,避免这个错误,要将这个参数设为false

    true

    spark.sql.partition.rownum.collect.enable

    统计生成固定分区表行数

    false

    spark.sql.dynamic.partition.rownum.collect.enable

    统计生成动态分区表行数

    false

最近更新时间:2025.07.02 17:59:03
这个页面对您有帮助吗?
有用
有用
无用
无用