You need to enable JavaScript to run this app.
导航
EMR Serverless Spark Java/Python
最近更新时间:2025.06.09 10:16:09首次发布时间:2024.05.24 15:27:43
我的收藏
有用
有用
无用
无用

DataLeap 支持与火山引擎 E - MapReduce(EMR)中的 Serverless Spark 实例对接。您可在本地编写 Java/Python 代码,将其上传至对象存储系统 TOS,随后在 DataLeap 里以可视化方式配置 Spark Java/Python 作业,引用资源并执行任务,从而满足业务场景中的数据开发需求。本文将为您介绍 DataLeap 中 EMR Serverless Spark Java/Python 任务的配置操作。

1 功能背景

EMR Serverless Spark 是 EMR 框架下 Serverless 形态的数据产品,具备开箱即用且完全兼容开源 Spark 引擎的能力。它内置高可用的 Remote Shuffle Service(RSS),该服务允许 Shuffle Service在 Spark 之外运行,能够直接读写对象存储系统 TOS,实现存储与计算的解耦,从而提供更好的可用性和性能。
在业务场景涉及大规模机器学习和深度学习训练时,可借助 DataLeap 执行 Serverless Spark Java/Python 任务,来处理海量数据并进行复杂的计算。利用 EMR Serverless Spark 的高可用以及存算分离架构能力,能够加速模型训练进程,有效提高业务决策的准确性与效率。
Image

2 使用限制

  • 需开通 DataLeap 服务版本中大数据分析、DataOps敏捷研发分布式数据自治的服务,项目方可继续绑定 EMR Serverless Spark 实例。详见版本服务说明
  • EMR Serverless Spark Java/Python 任务中的语言类型暂不支持互相转换,切换语言类型会清空当前配置,需谨慎切换。
  • 本任务类型暂不支持调试执行成功或失败后发送消息通知。

3 使用前提

  1. 已开通 EMR Serverless Spark 队列资源实例。若在 DataLeap 中使用子用户访问 EMR Serverless Spark 队列资源时,需确保拥有 EMRServerlessReadOnlyAccess 或更高权限策略。子用户可请主账号在访问控制界面进行权限策略添加,或者也可通过自定义策略方式进行配置相应权限。详见自定义策略
  2. 对象存储 TOS 服务用于存放 LAS Catalog 库表数据与 JAR 资源包存放,因此需提前开通 TOS 服务,并创建存储桶。若子用户通过 EMR Serverless Spark Java/Python 任务访问 TOS 资源时,需确保子用户有 TOS 相关访问权限。权限配置操作详见 TOS 权限配置
  3. 如果创建 EMR Serverless Spark Java 任务,将直接使用已上传到 TOS 的资源包,需要获取资源包路径信息。资源包上传操作详见 上传文件
  4. 已在 DataLeap 项目控制台中,绑定相应的 EMR Serverless Spark 服务实例。详见创建项目

4 新建任务

  1. 登录 DataLeap租户控制台,并进入项目的 IDE 开发界面,进行新建任务操作。
  2. 单击新建任务按钮:
    1. 依次选择数据开发 > EMR Serverless Spark > EMR Serverless Spark Java/Python 任务类型。
    2. 关联实例:默认选项项目已绑定的 EMR Serverless Spark 实例。绑定详见创建项目
  3. 填写任务基本信息,输入任务名称信息,如:emr_serverless_spark_task,并选择任务存放的目标文件夹目录。单击确定按钮,完成任务创建。

注意

  1. 在项目控制台管理界面中,如果新增或修改了引擎,那么在数据开发任务新建窗口中,需刷新整个 DataLeap 数据开发界面,才能看到新增或修改后的引擎任务类型。
  2. 任务名称信息仅允许字符.、字母、数字、下划线、连字符、[]、【】、()、()以及中文字符,且需要在127个字符以内。

Image

5 任务配置说明

新建任务完成后,您可在任务配置界面完成以下参数配置。

5.1 语言设置

语言类型支持选择 Java、Python。

说明

语言类型暂不支持互相转换,切换语言类型会清空当前配置,需谨慎切换。

5.2 引入资源

  • 语言类型选择 Java:
    EMR Serverless Spark Java 作业,将直接使用已上传到 TOS 的资源包,您可直接输入已上传的资源在 TOS 中的路径信息,如:tos://test-tos/emr-spark/tos-jar/spark-examples-2.3.4.jar。
    TOS 中上传文件操作详见文件上传
  • 语言类型选择 Python 时:
    • 资源类型默认选择 Python 类型。

    • 在编辑器中输入 Python 语句。若您的语句依赖其他 Python 文件或需要引用其他自定义 JAR 包时,您需要在编辑器中添加如下命令:

      set las.spark.jar.depend.pyFiles=[{"fileName":"tos://xx.zip"}, {"fileName":"tos://xx.py"}]
      

      更多进阶用法,请参考 PySpark 开发指南

      注意

      • Python 执行引擎版本目前 DataLeap 中仅支持 Python3.7。
      • 若语句中需要设置系统环境变量时,需避免直接覆盖系统环境变量,请按照追加方式指定,例如PATH=$PATH:/home/lihua/apps/bin/;

5.3 参数配置

参数

说明

Spark 参数

Main Class

语言类型选择 Java 时,需填写主类信息,如 org.apache.spark.examples.JavaSparkPi。

Conf参数

可配置任务中常见的一些 spark conf 参数,如常用的 Driver 内存分配参数spark.driver.memory、CPU 核心数spark.driver.cores等等。更多参数可参考:https://spark.apache.org/docs/latest/configuration.html
您可通过以下两种方式来进行配置:

  • 单行编辑模式:在对应输入框中,输入参数的 key-value 值。
  • 脚本编辑模式:支持 JSON、Yaml 的格式,直接用脚本方式进行配置参数。

Image

注意

EMR Serverless Spark Java/Python 任务访问 TOS 资源时,需进行 TOS 鉴权操作,详见 TOS 权限配置。因此您必须在 Conf 参数中增加有权限访问 TOS 存储桶的账号 AK/SK 信息。具体信息您可进入火山引擎,访问控制台的密钥管理界面,复制 Access key ID、 Secret Access Key 信息。如果是子用户,请联系主账号获取密钥。详见 AK 秘钥管理
单行编辑模式填写示例如下:

  • serverless.spark.access.key: xxxxxxx
  • serverless.spark.secret.key: xxxxxxx

其中秘钥 AK/SK 这类敏感信息,支持您以自定义项目参数形式,进行加密配置,Value 配置方式如{{AK}}、{{SK}}。具体加密操作详见参数信息设置

任务参数

自定义参数

输入任务中需要额外传递的参数,如数据输入/输出路径参数、数据处理的业务时间参数、过滤条件参数等等。
多个参数以空格形式进行分隔,例如 param1 param2 param3,参数最终将以字符串形式传入。支持配置时间参数,格式如 '${DATE}'、'${HOUR}';其他上游任务输入参数或项目参数格式如 {{param1}}。

5.4 任务产出数据登记

任务产出数据登记,用于记录任务、数据血缘信息,并不会对代码逻辑造成影响。
EMR Serverless Spark Java/Python 任务如果含有 LAS Catalog 库表数据的产出,则强烈建议在此手动登记任务产出数据表信息,以便后续维护任务数据血缘关系,下游任务也可通过表名信息,来添加上游依赖。
您手动填写的内容即为任务产出表,支持填写多个。其他下游任务依赖于此任务产出的数据表时,您可在下游任务的调度依赖中,通过依赖推荐手动添加的方式,将此处 EMR Serverless Spark Java/Python 任务所设置的产出库表名信息添加为依赖项。具体登记内容包括以下数据类型:

  • EMR Serverless Spark:该任务逻辑会将数据写入到 LAS Catalog 表,需填写 LAS Catalog 的数据库名、表名、分区名,分区内容可以使用变量,如 ${date}、${hour} 形式。
  • 其他:该任务逻辑不写数据到 LAS Catalog 表。

6 调度设置

任务配置完成后,在右侧导航栏中,单击调度配置按钮,进入调度配置窗口,您可以在此设置调度属性、依赖、任务输入输出参数等信息。详细参数设置详见:调度设置
其中在调度基本信息 > 计算队列选择时,您可下拉选择项目控制台中已绑定的 Spark 队列类型,来执行 EMR Serverless Spark Java/Python 任务。

注意

Spark 队列需选择 Default 计算组常驻资源容量充足的队列来执行任务,EMR Serverless Spark Java/Python 任务不支持使用分配出去的 Spark SQL 或 Presto SQL 计算组资源容量,任务执行会异常。
若选择执行的 Spark 队列中包含部分 SQL 计算组资源时,需在上方的 Spark Conf 参数中,添加以下参数:tqs.query.engine.type = sparkcli,使其运行到剩余的 Default 常驻资源中。
Image

更多计算队列操作详见队列管理

7 调试运行并提交

任务配置完成后,您可单击操作栏中的保存调试按钮,进行任务调试。
调试结果无误后,单击提交上线按钮,在提交上线对话框中,选择回溯数据、监控设置、提交设置等参数,最后单击确认按钮,完成作业提交。 提交上线说明详见:数据开发概述---离线任务提交

注意

  • 调试操作,直接使用线上数据进行调试,需谨慎操作。
  • 本任务类型暂不支持调试执行成功或失败后发送消息通知。
  • 如果租户主账号或项目管理员在项目控制台>流水线管理中启用了流水线流程校验,则您需要确保提交的任务符合流水线扩展程序的校验规则,才能成功提交。详见4 流水线管理

后续任务运维操作详见:离线任务运维

8 使用示例

8.1 估算圆周率示例

8.1.1 任务配置

以下示例将为您演示通过 EMR Serverless Spark Python 任务中 Python 语言方式,执行一段 Python 脚本来估算圆周率。

  1. 新建 EMR Serverless Spark Java/Python 任务,详见上方3 新建任务

  2. 进入任务配置界面,语言类型选择 Python,引入资源类型选择 Python
    Image

  3. 在代码编辑区域,编辑以下相关 Python 语句:

    import sys
    from random import random
    from operator import add
    from pyspark.sql import SparkSession
    
    if __name__ == "__main__":
        """
            Usage: pi [partitions]
        """
        spark = SparkSession\
            .builder\
            .appName("PythonPi")\
            .getOrCreate()
    
        partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
        n = 100000 * partitions
    
        def f(_: int) -> float:
            x = random() * 2 - 1
            y = random() * 2 - 1
            return 1 if x ** 2 + y ** 2 <= 1 else 0
    
        count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
        print("Pi is roughly %f" % (4.0 * count / n))
    
        spark.stop()
    
  4. Python 脚本编辑完成后,您还需在下方添加以下 Conf 参数,来通过 TOS 的鉴权:

    • serverless.spark.access.key: xxxxxxx
    • serverless.spark.secret.key: xxxxxxx

8.1.2 调试运行

任务配置完成后,您可单击操作栏中的保存调试按钮,进行任务调试。

8.1.3 查看日志输出结果

  1. 待任务执行成功后,您可在调试记录-概览界面,单击 TrackingURL 日志链接,进入 Spark 日志界面。
    Image
  2. 在 Spark 日志界面,单击 Executors 页签。
    Image
  3. Logs 列中,单击 stdout 按钮,进入查看任务执行结果。
    Image
    Image

8.2 查询 LAS Catalog 元数据示例

8.2.1 任务配置

该示例将为您演示通过 EMR Serverless Spark Python 任务中 Python 语言方式,执行一段 Pyspark 脚本来访问 LAS Catalog 中元数据。

  1. 新建 EMR Serverless Spark Java/Python 任务,详见上方3 新建任务

  2. 进入任务配置界面,语言类型选择 Python,引入资源类型选择 Python

  3. 在代码编辑区域,编辑以下相关 Python 语句:

    from pyspark.sql import SparkSession
    
    # 创建SparkSession对象
    spark = SparkSession.builder \
        .enableHiveSupport() \
        .getOrCreate()
    
    df1 = spark.sql("insert into database.table_name partition (date='${date}') values(1,'testa',26),(2,'json',3345343)(2,'TOM',10)")
    df2 = spark.sql("select * from database.table_name where date='${date}';")
    df2.show()
    
    spark.stop()
    

    说明

    上方 Python 示例脚本中的 SQL 语句,您需替换成您自己的 SQL 语句,根据实际场景进行修改执行。

    Image

  4. 通过该方式访问 TOS 中的 LAS Catalog 元数据资源时,在 Python 脚本编辑完成后,您还需在下方添加以下 Conf 参数,来通过 TOS 的鉴权:

    • serverless.spark.access.key: xxxxxxx
    • serverless.spark.secret.key: xxxxxxx
      Image
      conf 参数说明详见4.3 参数配置

8.2.2 调试运行

任务配置完成后,您可单击操作栏中的保存调试按钮,进行任务调试。

8.2.3 查看日志输出结果

  1. 待任务执行成功后,您可在调试记录-日志界面,单击 TrackingURL 日志链接,进入 Spark 日志界面。
    Image
  2. 在 Spark 日志界面,单击 Executors 页签。
    Image
  3. Logs 列中,单击 stdout 按钮,进入查看任务执行结果。
    Image
    Image