You need to enable JavaScript to run this app.
文档中心
大数据研发治理套件(私有化)

大数据研发治理套件(私有化)

复制全文
湖仓分析一体服务LAS引擎
LAS Spark任务
复制全文
LAS Spark任务

LAS Spark任务适用于定时执行Spark离线任务的场景,为满足您定制化的数据查询分析,DataLeap支持Jar包资源和Python资源引用的方式,来进行定制化的数据分析工作。

新建任务

任务新建步骤如下:

  1. 登录DataLeap控制台。
  2. 选择数据开发 > 任务开发,进入任务开发页面。
  3. 单击新建任务 > 数据开发 > LAS引擎 > LAS Spark任务按钮,新建LAS Spark任务。
  4. 填写任务基本信息:
    1. 任务名称:输入任务的名称,只允许字符.、字母、数字、下划线、连字符、[]、【】、()、()以及中文字符,且需要在127个字符以内。
    2. 保存至:选择任务存放的目标文件夹目录。
      Image
  5. 单击确定按钮,完成任务创建。

任务配置

任务新建完成后,在任务配置界面完成以下参数配置。
Image

语言设置

语言类型支持 Java、Python。

注意

语言类型暂不支持互相转换,切换语言类型会清空当前配置,需谨慎切换。

引入资源

  • 语言类型选择 Java 时,资源类型支持 Jar 资源包的形式,可以按以下方式选择资源:
    • 从资源库选取已有的 jar 资源
    • 新建资源,详见:资源库
  • 语言类型选择 Python 时:
    • 资源类型默认选择 Python 类型,且需选择资源所关联的 LAS Schema 库信息。

    • 在编辑器中输入 Python 语句,执行引擎只支持 Python3.7。若需使用 python2.7 时,请在代码顶行加上 # -- coding: utf-8 --以表示文件编码。
      示例脚本如下:

      from pyspark import SparkConf
      from pyspark.context import SparkContext
      from pyspark.sql import SparkSession, SQLContext
      
      SparkContext._ensure_initialized()
      spark = SparkSession.builder \
          .appName("pyspark_test_read_hive") \
          .config("spark.hive.client.minibase.username", "axxxxxn") \
          .config("spark.hive.client.minibase.password", "xxxxxxxxx") \
          .enableHiveSupport() \
          .getOrCreate()
      sc = spark.sparkContext
      sql = spark.sql
      sqlContext = spark._wrapped
      sqlCtx = sqlContext
      
      df = sql("INSERT INTO TABLE db.las_table_name PARTITION (date = '20221213') VALUES ('test1', 'test2')")
      df.show()
      
      spark.stop()
      

      注意

      设置系统环境变量时,避免直接覆盖系统环境变量,请按照追加方式指定,例如PATH=$PATH:/home/lihua/apps/bin/;

参数配置

参数

说明

Spark 参数

Main Class

语言类型为 Java 时,需填写Main Class主类信息,如org.apache.spark.examples.JavaSparkPi。

Py-files

语言类型为 Python 时,可添加Py-files资源文件,您可下拉选择已上传的 LAS 资源,或 新建资源,详见资源库

Conf参数

Spark配置信息,支持单行编辑模式和脚本编辑模式。相关配置信息说明,可参见下方“Spark优化参数说明表”。

任务参数

自定义参数

输入任务中已定义的参数,多个参数以空格形式进行分隔,例如 param1 param2 param3,参数最终将以字符串形式传入。

任务产出登记

任务产出数据登记,用于记录任务---数据血缘信息,并不会对代码逻辑造成影响。对于系统无法通过解析获取产出信息的任务,可手动登记其产出信息。
如果任务含有 LAS 表的写入操作,强烈建议填写。您填写的内容即为任务产出,支持填写多个。其他任务的依赖推荐会根据此处填写的 LAS 表信息进行推荐。 具体登记内容包括:

  • 数据类型:选择 LAS 时,需下拉选择当前任务写入数据的 LAS 数据库和数据表信息。
  • 其他:该任务不写数据到 LAS 表。

调度设置

任务配置完成后,您可单击右侧侧边栏的调度设置,配置调度设置参数。调度设置说明详见“调度设置”。

Spark使用说明

  1. Spark insert overwrite 使用示例
    df.write.insertInto("aaa_bbb_doc_html_detail", True)

  2. 用户自定义参数较多时,且存在K-V参数时,可参考如下配置:
    Spark程序中,如果自定义参数较多,可直接在自定义参数输入框中配置,使用空格分隔。数据开发运行时,会将自定义参数拼接到Main Class之后,配置输入参数格式需符合spark main class程序解析方式,如下图所示。
    Image
    如需使用时间变量,也可以直接在自定义参数中配置${date}/${DATE}等系统变量。

  3. Spark优化参数
    Spark优化参数说明表

    参数

    说明

    默认值

    spark.executor.instances

    静态资源下:executor数

    2

    spark.executor.cores

    每个executor和CPU数

    4

    spark.dynamicAllocation.enabled

    动态资源开关

    false

    spark.dynamicAllocation.maxExecutors

    动态资源下:executor的最大个数

    500

    spark.executor.memory

    每个executor的内存大小

    8g

    spark.memory.fraction

    executor用于计算的内存比例,剩余部分用于存储元数据和运行信息。对于executor内存开的较大的任务,可以适当提高这个值,让更多内存参与计算,但会增加OOM风险

    0.6

    spark.executor.memoryOverhead/ spark.yarn.executor.memoryOverhead

    每个executor的堆外内存大小,堆外内存主要用于数据IO,对于报堆外OOM的任务要适当调大,单位Mb,与之配合要调大executor JVM参数,例如: set spark.executor.memoryOverhead=3072 set spark.executor.extraJavaOptions=-XX:MaxDirectMemorySize=2560m

    6144

    spark.sql.adaptive.enabled

    Adaptive execution开关,包含自动调整并行度,解决数据倾斜等优化

    true

    spark.sql.adaptive.minNumPostShufflePartitions

    AE相关,动态最小的并行度

    1

    spark.sql.adaptive.maxNumPostShufflePartitions

    AE相关,动态最大的并行度,对于shuffle量大的任务适当增大可以减少每个task的数据量,如1024

    1000

    spark.sql.adaptive.join.enabled

    AE相关,开启后能够根据数据量自动判断能否将sortMergeJoin转换成broadcast join

    true

    spark.sql.adaptiveBroadcastJoinThreshold

    AE相关,spark.sql.adaptive.join.enabled设置为true后会判断join的数据量是否小于该参数值,如果小于则能将sortMergeJoin转换成broadcast join

    spark.sql.autoBroadcastJoinThreshold

    spark.sql.adaptive.skewedJoin.enabled

    AE相关,开启后能够自动处理join时的数据倾斜,对于数据量明显高于中位数的task拆分成多个小task

    false

    spark.sql.adaptive.skewedPartitionFactor

    AE相关,数据倾斜判定标准,当同一stage的某个task数据量超过中位数的N倍,将会判定为数据倾斜

    5

    spark.sql.adaptive.skewedPartitionMaxSplits

    AE相关,被判定为数据倾斜后最多会被拆分成的份数

    5

    spark.shuffle.accurateBlockThreshold

    AE相关,数据倾斜判定基于shuffle数据量统计,如果统计所有的block数据,消耗内存较大,因此设有阈值,当shuffle的单个数据块超过大小和行数阈值时,才会进入统计,这个参数即大小阈值

    10010241024(100MB)

    spark.shuffle.accurateBlockRecordThreshold

    AE相关,同上,行数阈值,如果设置了上面的数据倾斜处理开关,仍然倾斜,可能是因为这几个参数设得偏大,适当缩小

    2 * 1024 * 1024

    spark.sql.files.maxPartitionBytes

    默认一个task处理的数据大小,如果给的太小会造成最终任务task太多,太大会是输入环节计算较慢

    1073741824

    spark.vcore.boost.ratio

    vcore,虚拟核数,设置大于1的数可以使一个核分配多个task,对于简单sql可以提升CPU利用率,对于复杂任务有OOM风险

    1

    spark.shuffle.hdfs.enabled(长任务推荐)

    HDFS based Spark Shuffle开关,可以提高任务容错性。遇到org.apache.spark.shuffle.FetchFailedException报错需设置

    false

    set spark.shuffle.io.maxRetries=1; set spark.shuffle.io.retryWait=0s;

    一般在开启hdfs shuffle后还可以开启这两个参数,避免不必要的重试和等待

    spark.sql.crossJoin.enabled

    对于会产生笛卡尔积的sql,默认配置是限制不能跑的,在hive里可以配置set hive.mapred.mode=nonstrict跳过限制,相对应的在spark里可以配置set spark.sql.crossJoin.enabled=true起到同样的效果。

    false

    spark.sql.broadcastTimeout

    broadcast joins时,广播数据最长等待时间,网络不稳定时,容易出现超时造成任务失败,可适当增大此参数。

    300(单位:s)

    spark.sql.autoBroadcastJoinThreshold

    表能够使用broadcast join的最大阈值

    10MB

    spark.network.timeout

    网络连接超时参数

    120s

    spark.maxRemoteBlockSizeFetchToMem

    reduce端获取的remote block存放到内存的阈值,超过该阈值后数据会写磁盘,当出现数据量比较大的block时,建议调小该参数(比如512MB)。

    Long.MaxValue

    spark.reducer.maxSizeInFlight

    控制从一个worker拉数据缓存的最大值

    48m

    spark.merge.files.enabled

    合并输出文件,如果insert结果的输出文件数很多,希望合并,可以设为true,会多增加一个repartition stage合并文件,repartition的分区数由spark.merge.files.number控制

    false

    spark.merge.files.number

    控制合并输出文件的输出数量

    512

    spark.speculation

    推测执行开关。如果是原生任务很有可能没开这个参数,会出现个别task拖慢整个任务,可以开启这个参数。

    true

    spark.speculation.multiplier

    开启推测执行的时间倍数阈值:当某个任务运行时间/中位数时间大于该值,触发推测执行。对于因为推测执行而浪费较多资源的任务可以适当调高这个参数。

    1.5

    spark.speculation.quantile

    同一个stage中的task超过这个参数比例的task完成后,才会开启推测执行。对于因为推测执行而浪费较多资源的任务可以适当调高这个参数。

    0.75

    spark.default.parallelism

    Spark Core默认并发度,原生spark程序并发度设置

    200

    spark.sql.shuffle.partitions

    Spark SQL默认并发度,AE开启后被spark.sql.adaptive.maxNumPostShufflePartitions取代

    200

    spark.sql.sources.bucketing.enabled

    分桶表相关,当设置为false,会将分桶表当作普通表来处理。做为普通表会忽略分桶特性,部分情况性能会下降。但如果分桶表没有被正确生成(即表定义是分桶表,但数据未按分桶表生成)会报错RuntimeException: Invalid bucket file,避免这个错误,要将这个参数设为false

    true

    spark.sql.partition.rownum.collect.enable

    统计生成固定分区表行数

    false

    spark.sql.dynamic.partition.rownum.collect.enable

    统计生成动态分区表行数

    false

使用示例

以下将为您演示如何通过LAS Spark任务,使用JAR和Python语言方式,来访问LAS表中的数据。

LAS用户信息获取

您可通过LAS管理员账户或添加的托管账号信息来访问LAS数据。

  • 方案一:使用LAS管理员账户密码
    您可使用环境部署时的LAS Admin用户信息,来直接访问LAS数据,用此账户可以避免访问时出现权限问题,但因该账号权限较大,存在信息泄露风险,不建议使用Admin信息来访问。
  • 方案二:新建任务托管账号,并手动赋予其需要访问的库表权限。
    1. 登录LAS控制台页面,在管理中心 > 用户管理中,添加用户信息,并获取其密码信息。您可将该用户名密码信息仅用于SparkJar任务的参数使用。
    2. 在LAS控制台页面,上方权限管理页签中,手动对新的账户,进行队列管理授权和数据库表权限授权,分别为账号授予 Developer 权限。

Java语言类型

Jar依赖说明

需要在 pom 里面加入以下依赖:

说明

使用provided的依赖,同时不要带入hive相关的依赖,避免与已有的数据中台冲突。

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.0.1</version>
    <scope>provided</scope>
</dependency>
 
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-mllib_2.12</artifactId>
    <version>3.0.1</version>
    <scope>provided</scope>
</dependency>

您也可以根据实际情况添加需要使用的自定义connector。

代码示例

import org.apache.spark.sql.SparkSession;
import java.util.Arrays;
public class SparkJarTest {
 
        SparkSession spark = SparkSession.builder()
                .appName("las-spark-test:spark_sdk_test")
                .enableHiveSupport()
                .config("spark.hive.client.minibase.username", ${username})
                .config("spark.hive.client.minibase.password", ${password})
                .getOrCreate();
 
 
        String sql = "insert into ${schema.table} PARTITION(dt=2023) select * from ${schema.table} where dt=2022";
 
        spark.sql(sql);
        spark.close();
}

参数说明

在初始化SparkSession时,通过config传入相关配置:

参数

说明

spark.hive.client.minibase.username

替换为LAS控制台中创建的username信息

spark.hive.client.minibase.password

替换为LAS控制台账号对应的password信息

配置LAS Spark作业

  1. 在资源库界面,创建LAS引擎下的JAR资源,将本地创建好的JAR包,上传至DataLeap中。操作详见“资源库”。
  2. 在数据开发界面,新建LAS Spark作业,语言类型选择Java,并将上传成功的JAR包,其对应的资源,引入到Spark任务中。
    Image
  3. 资源引入完成后,进行后续的Spark参数配置,填写JAR包的Main Class、Conf参数等信息。
  4. Spark 作业配置完成,您便可对其进行后续的调试、提交发布、运维等操作。

Python语言类型

您可参考下方示例代码,通过Python语言方式,来访问LAS数据表信息。

  1. 新建LAS Spark任务,语言类型选择Python方式。创建任务操作详“新建任务”。

  2. 输入以下示例代码:

    from pyspark.sql import SparkSession
    from pyspark.context import SparkFiles
    from pyspark.sql import SparkSession,SQLContext
    SparkContext._ensure_initialized()
    spark=SparkSession
    .builder.appName("las_test")
    .config("spark.hive.client.minibase.username","las控制台使用的username")
    .config("spark.hive.client.minibase.password","las控制台使用的password")
    .enableHiveSupport()
    .getOrCreate()
    sc = spark.sparkContext
    sql = spark.sql
    sqlContext = spark._wrapped
    sqlCtx = sqlContext
    #下面sql语句请替换成需要执行的sql语句
    df = sql("select * from database.table_name where date='xxxx';")
    df.show()
    spark.stop()
    

    参数说明:

    参数

    说明

    spark.hive.client.minibase.username

    替换为LAS控制台中创建的username信息

    spark.hive.client.minibase.password

    替换为LAS控制台账号对应的password信息

最近更新时间:2025.03.19 19:31:07
这个页面对您有帮助吗?
有用
有用
无用
无用