You need to enable JavaScript to run this app.
导航
LAS Spark
最近更新时间:2025.06.13 16:20:05首次发布时间:2021.12.02 16:58:43
我的收藏
有用
有用
无用
无用

LAS Spark 任务适用于定时执行 Spark 离线任务的场景,支持 Jar 包资源和 Python资源引用的方式。

1 使用限制

LAS ByteLake 表的属性不支持并发写入数据,否则会出现写入冲突情况。通过 Spark 任务在向 ByteLake 表写入数据时,您可在 DataLeap 任务调度设置里,开启最大并发控制按钮,并将其设置为 1,以此避免因实例并发冲突致使任务执行失败。此外,您还需确保在其他业务中不存在并发写入同一个 ByteLake 表的情况。
Image

2 使用前提

3 新建任务

  1. 登录 DataLeap租户控制台,并进入项目的 IDE 开发界面,进行新建任务操作。
  2. 单击新建任务按钮:
    1. 依次选择数据开发 > LAS > LAS Spark 任务类型。
    2. 关联实例:默认选择项目已绑定的 default 实例。绑定详见创建项目
  3. 填写任务基本信息,输入任务名称信息,如:las_spark_task,并选择任务存放的目标文件夹目录。单击确定按钮,完成任务创建。

注意

  1. 在项目控制台管理界面中,如果新增或修改了引擎,那么在数据开发任务新建窗口中,需刷新整个 DataLeap 数据开发界面,才能看到新增或修改后的引擎任务类型。
  2. 任务名称信息仅允许字符.、字母、数字、下划线、连字符、[]、【】、()、()以及中文字符,且需要在127个字符以内。

Image

4 任务配置说明

新建任务完成后,在任务配置界面完成以下参数配置。

4.1 语言设置

语言类型支持 Java、Python。

说明

语言类型暂不支持互相转换,切换语言类型会清空当前配置,需谨慎切换。

4.2 引入资源

  • 语言类型选择 Java 时,资源类型支持 Jar 资源包的形式,可以按以下方式选择资源:

    • 从资源库选取已有的 jar 资源
    • 新建资源,详见:资源库
  • 语言类型选择 Python 时:

    • 资源类型默认选择 Python 类型,且需选择资源所关联的 LAS Schema 库信息。
    • 在编辑器中输入 Python 语句,执行引擎只支持 Python3.7。示例脚本如下:
    from pyspark import SparkFiles
    from pyspark.sql import SparkSession
    from pyspark.sql import SQLContext
    
    job_name='pyspark_test_on_las'
    
    spark = SparkSession.builder.appName(job_name).getOrCreate()
    spark.sql("select 1").show(10)
    
    spark.stop()
    

    更多 LAS Spark 任务操作,详见:Spark JAR 开发

注意

若语句中需要设置系统环境变量时,避免直接覆盖系统环境变量,请按照追加方式指定,例如PATH=$PATH:/home/lihua/apps/bin/;

4.3 参数配置

参数

说明

Spark 参数

Main Class/Py-files

  • 语言类型为 Java 时,需填写主类信息,如org.apache.spark.examples.JavaSparkPi。
  • 语言类型为 Python 时,可下拉选择已上传的 LAS 资源,或 新建资源,详见 资源库

Conf参数

配置任务中需设置的一些 conf 参数,如常用的 Driver 内存分配参数spark.driver.memory、CPU 核心数spark.driver.cores等等。更多参数可参考:https://spark.apache.org/docs/latest/configuration.html
您可通过以下两种方式来进行配置:

  • 单行编辑模式:在对应输入框中,输入参数的 key-value值。
  • 脚本编辑模式:支持 JSON、Yaml 的格式,直接用脚本方式进行配置参数。

Image
更多参数配置可参考:https://spark.apache.org/docs/latest/configuration.html

任务参数

自定义参数

输入任务中需要额外传递的参数,如数据输入/输出路径参数、数据处理的业务时间参数、过滤条件参数等等。
多个参数以空格形式进行分隔,例如 param1 param2 param3,参数最终将以字符串形式传入。支持配置时间参数,格式如 '${DATE}'、'${HOUR}';其他上游任务输入参数或项目参数格式如 {{param1}}。

4.4 任务产出登记

任务产出数据登记,用于记录任务---数据血缘信息,并不会对代码逻辑造成影响。对于系统无法通过解析获取产出信息的任务,可手动登记其产出信息。
如果任务含有 LAS 表的写入操作,强烈建议填写。您填写的内容即为任务产出,支持填写多个。其他任务的依赖推荐会根据此处填写的 LAS 表信息进行推荐。 具体登记内容包括:

  • 数据类型:选择 LAS 时,需下拉选择当前任务写入数据的 LAS 数据库和数据表信息。
  • 其他:该任务不写数据到 LAS 表。

5 使用示例

以下示例将为您演示如何通过 LAS Spark 任务中 Python 语言方式,来直接访问 LAS 表中的数据。

5.1 数据准备

  1. 新建 LAS SQL 作业,操作详见:LAS SQL
  2. 在代码编辑区,编辑并执行以下示例语句,创建 LAS 示例表,并将数据写入表中:
CREATE  TABLE IF NOT EXISTS test_schema.student_demo (
    id INT COMMENT 'id',
    name STRING COMMENT 'name',
    age INT COMMENT 'age'
    )
    PARTITIONED BY (date STRING COMMENT 'date partition')
    stored as bytelake;
    
INSERT INTO test_schema.student_demo PARTITION (date = '20230518') VALUES(1, 'TOM', 10);

5.2 配置 LAS Spark 任务

  1. 新建 LAS Spark 任务,详见上方新建任务
  2. 进入任务配置界面,语言类型选择 Python
  3. 选择库表资源所需关联的 LAS Schema 库信息:test_schema。
    Image
  4. 在代码编辑区域,编辑以下相关 Python 查询语句:

    说明

    LAS ByteLake 2.0 和 Managed Hive 表类型,推荐使用以下 lasfs 示例;对于 ByteLake1.0 内表,需继续使用 Tunnel 方式。详见 Spark Jar 作业开发

    from pyspark import SparkConf
    from pyspark.context import SparkContext
    from pyspark.sql import SparkSession, SQLContext
    
    SparkContext._ensure_initialized()
    spark = SparkSession.builder \
        .appName("pyspark_test_read_hive_x_new_1") \
        .config("spark.sql.lasfs.enabled", "true") \
        .config("spark.hadoop.hive.exec.scratchdir", "lasfs:/warehouse/tmp/hive") \
        .config("spark.hadoop.tmp.dir", "lasfs:/warehouse/tmp/hadoop") \
        .config("spark.hadoop.fs.lasfs.impl", "com.volcengine.las.fs.LasFileSystem") \
        .config("spark.hadoop.fs.lasfs.endpoint", "100.96.4.84:80") \
        .config("spark.hadoop.fs.lasfs.service.region", "cn-beijing") \
        .config("spark.hadoop.fs.lasfs.access.key", "AKxxx") \
        .config("spark.hadoop.fs.lasfs.secret.key", "SKxxx==") \
        .config("spark.sql.analysis.hoodie.relation.conversion.skip.enabled", "false") \
        .config("spark.sql.extensions", "org.apache.spark.sql.LASExtension") \
        .enableHiveSupport() \
        .getOrCreate()
    sc = spark.sparkContext
    sql = spark.sql
    sqlContext = spark._wrapped
    sqlCtx = sqlContext
    
    df = sql("insert into test_schema.student_demo partition (date='20240619') values('1','TOM','10')")
    df.show()
    
    spark.stop()
    

5.3 调试运行

任务配置完成后,您可单击操作栏中的保存调试按钮,进行任务调试。

注意

  • 调试操作,直接使用线上数据进行调试,需谨慎操作。
  • 本任务类型支持调试执行成功或失败后发送消息通知,您可根据业务情况,前往项目控制台 > 配置信息 > 消息通知设置中,选择是否开启任务调试运行成功失败通知。
    • 默认通知方式为邮箱,您需在“账号管理”中,提前绑定相应的安全邮箱信息;
    • 您也可根据业务需要,自行配置飞书应用机器人,通过飞书的方式发送消息通知,飞书消息通知前置操作详见1.1 飞书应用机器人创建

5.4 查看日志

待任务执行成功后,您可进入火山引擎控制台

  1. 搜索访问 LAS 服务,单击进入 LAS 控制台 > 作业管理 > SparkJar 作业界面。
  2. 在作业列表界面,单击右侧操作列中的日志按钮,进入到 SparkJar 作业详细日志界面。
  3. 单击 Driver 日志页签,并搜索数据关键字信息,从日志中查看 LAS 表数据查询结果。
    Image

6 提交任务

调试任务成功,并查看日志校验数据情况无误后,返回数据开发界面,将任务提交发布到运维中心离线任务运维中执行。
单击上方操作栏中的保存提交上线按钮,在提交上线对话框中,选择回溯数据、监控设置、提交设置等参数,最后单击确认按钮,完成作业提交。 提交上线说明详见:数据开发概述---离线任务提交
后续任务运维操作详见:离线任务运维