You need to enable JavaScript to run this app.
E-MapReduce

E-MapReduce

复制全文
DolphinScheduler 对接 EMR Serverless
使用作业插件(Task Plugin)
复制全文
使用作业插件(Task Plugin)

适配说明

当前在 dolphnscheduler 3.1.9 的基础上,火山引擎EMR产品提供了 VolcanoEMRServerless 作业插件,支持提交以下类型的Serverless作业:SQL,Spark Jar,PySpark,方便您对接 EMR Serverless。

Dolphin安装部署

serverless-plugin插件下载

serverless-plugin.zip
未知大小

更新Dolphin master-server

# 替换为上述下载包
unzip serverless-plugin.zip

# 替换为实际dolphin用户目录
chown -R dolphinscheduler:dolphinscheduler serverless-plugin
# 替换为实际的dolphin_scheduler master-server路径
cp -rp serverless-plugin/. /usr/lib/emr/current/dolphin_scheduler/master-server/libs

更新Dolphin worker-server

# 替换为上述下载包
unzip serverless-plugin.zip

# 替换为实际dolphin用户目录
chown -R dolphinscheduler:dolphinscheduler serverless-plugin

# 替换为实际的dolphin_scheduler worker-server路径
cp -rp serverless-plugin/. /usr/lib/emr/current/dolphin_scheduler/worker-server/libs

更新Dolphin api-server

# 下载包
unzip serverless-plugin.zip

# 替换为实际dolphin用户目录
chown -R dolphinscheduler:dolphinscheduler serverless-plugin

# 替换为实际的dolphin_scheduler api-server路径
cp -rp serverless-plugin/. /usr/lib/emr/current/dolphin_scheduler/api-server/libs

# 先对原始 ui 产物做备份
mv /usr/lib/emr/current/dolphin_scheduler/api-server/ui /usr/lib/emr/current/dolphin_scheduler/api-server/ui-bak
# 放入 serverless 提供的 ui
mv serverless-plugin/ui /usr/lib/emr/current/dolphin_scheduler/api-server/ui

提交 EMR Serverless任务

在DolphinScheduler服务页面,单击“服务链接”进入DolphinScheduler控制台页面,完成上述作业插件的安装部署后,您可参考以下步骤,提交EMR Serverless任务。

初始化:创建数据源

提交EMR Serverless数据源前,您需要先创建一个数据源,为后续Serverless任务设置好运行时的队列、任务提交的地址和认证等信息。操作入口和关键参数如下。
Image

参数

配置说明

数据源

选择:VOLCANO_EMR_SERVERLESS

数据源名称

自定义数据源名称

地域

选择需要提交任务的地域。

  • 北京:cn-beijing
  • 华东:cn-shanghai
  • 华南:cn-guangzhou
  • 柔佛:ap-southeast-1

访问地址

  • 设置为:https://open.volcengineapi.com

队列名称

设置为EMR Serverless队列的名称,后续Serverless任务会运行在此队列上,因此您需保障此处设置的队列为运行中的可用队列。

访问密钥AK & 访问秘钥SK

设置为火山引擎账号的AK和SK。

完成数据源创建后,您可参见下文进行各类Serverless任务提交。

提交SparkSQL任务

以下为提交SparkSQL任务的操作入口和主要参数说明。

  1. 在项目管理>工作流>工作流定义中创建工作流,工作流选择”VolcanoEMRServerless“任务节点。
    Image

  2. 在弹出的任务节点配置页面配置SparkSQL任务详情。

    参数

    配置说明

    节点名称

    自定义任务节点名称。

    数据源类型 & 数据源实例

    • 数据源类型选择”VOLCANO_EMR_SERVERLESS“。
    • 数据源实例选择上述步骤中已创建好的数据源。

    任务名称 & 任务类型

    • 自定义任务名称。
    • 任务类型选择”SparkSQL“。

    Spark SQL

    输入SparkSQL任务代码。

    计算组名称

    配置为上述配置的数据源实例中指定的EMR Serverless队列中的某个可用的(运行中)计算组名称。
    Image

提交SparkJar任务

准备工作:准备任务资源文件

提交SparkJar任务时,您需要先将任务代码文件上传至与Serverless队列同地域的火山引擎对象存储TOS中,并获取对应代码文件的TOS地址。

提交任务

以下为提交SparkJar任务的操作入口和主要参数说明。

  1. 在项目管理>工作流>工作流定义中创建工作流,工作流选择”VolcanoEMRServerless“任务节点。

  2. 在弹出的任务节点配置页面配置SparkJar任务详情。
    Image

    参数

    配置说明

    节点名称

    自定义任务节点名称。

    数据源类型 & 数据源实例

    • 数据源类型选择”VOLCANO_EMR_SERVERLESS“。
    • 数据源实例选择上述步骤中已创建好的数据源。

    任务名称 & 任务类型

    • 自定义任务名称。
    • 任务类型选择”SparkJar“。

    主应用程序文件路径

    配置为准备工作中任务代码文件的TOS路径地址。

    主函数的Class

    配置为任务代码文件中的主函数Class。

    计算组名称

    配置为上述配置的数据源实例中指定的EMR Serverless队列中的某个可用的(运行中)计算组名称。
    Image

提交PySpark任务

准备工作:准备任务资源文件

提交SparkJar任务时,您需要先将任务代码文件上传至与Serverless队列同地域的火山引擎对象存储TOS中,并获取对应代码文件的TOS地址。
完成准备工作后,以下为各典型任务场景下的提交PySpark任务的操作入口和主要参数说明。

任务场景1:访问TOS提交任务

  • 主程序:将任务代码主程序文件上传到TOS
    以下为一个任务代码示例。

    import argparse
    from pyspark.sql import SparkSession
    from pyspark import SparkFiles
    
    # 创建SparkSession对象
    spark = SparkSession.builder \
        .enableHiveSupport() \
        .getOrCreate()
    
    parser = argparse.ArgumentParser(description='这是一个示例程序')
    parser.add_argument('--inputFile', help='输入文件')
    parser.add_argument('--outputFile', help='输出文件')
    args = parser.parse_args()
    inputFile = args.inputFile
    outputFile = args.outputFile
    # 打印参数的值
    print('输入文件:', inputFile)
    print('输出文件:', outputFile)
    
    file_df = spark.sparkContext.textFile(inputFile)
    file_df_data = file_df.collect()
    
    SparkFiles.get(inputFile)
    print("inputFile data:")
    for line in file_df_data:
        print(line)
    
    # 创建示例数据
    data = [("Alice", 25), ("Bob", 30), ("Charlie", 35)]
    df = spark.createDataFrame(data, ["Name", "Age"])
    # 将DataFrame保存为CSV文件
    df.write.csv(outputFile, 'overwrite')
    
    spark.stop()
    
  • 作业提交参数
    Image

    参数

    配置说明

    节点名称

    自定义任务节点名称。

    数据源类型 & 数据源实例

    • 数据源类型选择”VOLCANO_EMR_SERVERLESS“。
    • 数据源实例选择上述步骤中已创建好的数据源。

    任务名称 & 任务类型

    • 自定义任务名称。
    • 任务类型选择”PySpark“。

    主应用程序文件路径

    配置为准备工作中任务代码文件的TOS路径地址。

    计算组名称

    配置为上述配置的数据源实例中指定的EMR Serverless队列中的某个可用的(运行中)计算组名称。
    Image

    配置参数

    配置为后续PySpark任务进行数据处理时输入数据和输出数据的TOS路径。
    例如:--inputFile;tos://lei-hns/demo/data/data_01.csv;--outputFile;tos://lei-hns/demo/data/data_output.csv

任务场景2:依赖自定义Jar

当任务依赖自定义JAR时,参考下文进行任务配置。

  • 主程序:上传到TOS
    以下为一个任务代码示例。

    from pyspark.sql import SparkSession
    
    # 创建SparkSession对象
    spark = SparkSession.builder \
        .enableHiveSupport() \
        .getOrCreate()
    
    print("demo: ")
    spark.sql("select 1").show()
    
    spark.stop()
    
  • 作业提交参数
    Image

    参数

    配置说明

    节点名称

    自定义任务节点名称。

    数据源类型 & 数据源实例

    • 数据源类型选择”VOLCANO_EMR_SERVERLESS“。
    • 数据源实例选择上述步骤中已创建好的数据源。

    任务名称 & 任务类型

    • 自定义任务名称。
    • 任务类型选择”PySpark“。

    主应用程序文件路径

    配置为准备工作中任务代码文件的TOS路径地址。

    计算组名称

    配置为上述配置的数据源实例中指定的EMR Serverless队列中的某个可用的(运行中)计算组名称。
    Image

    依赖Jar文件

    配置为依赖的Jar文件所在的TOS路径。
    例如:tos://doctest/serverless/spark/pyspark/spark-jar-demo.jar

任务场景3:依赖自定义镜像

  • 主程序:上传到TOS

    from pyspark.sql import SparkSession
    
    # 创建SparkSession对象
    spark = SparkSession.builder \
        .enableHiveSupport() \
        .getOrCreate()
    
    print("demo: ")
    spark.sql("select 1").show()
    
    spark.stop()
    
  • 作业提交参数
    Image

    参数

    配置说明

    节点名称

    自定义任务节点名称。

    数据源类型 & 数据源实例

    • 数据源类型选择”VOLCANO_EMR_SERVERLESS“。
    • 数据源实例选择上述步骤中已创建好的数据源。

    任务名称 & 任务类型

    • 自定义任务名称。
    • 任务类型选择”PySpark“。

    主应用程序文件路径

    配置为准备工作中任务代码文件的TOS路径地址。

    计算组名称

    配置为上述配置的数据源实例中指定的EMR Serverless队列中的某个可用的(运行中)计算组名称。
    Image

    配置参数

    参考如下格式要求,配置为自定义镜像值。
    spark.kubernetes.container.image=实际镜像值

任务场景4:依赖自定义Module

  • 主程序:上传到TOS

    import time
    
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import udf
    from pyspark.sql.types import FloatType
    
    from basic_statistics.file_info import getFile1
    from basic_statistics.file_info import get_all_files
    from tools import func
    from module1 import my_module1
    from module2 import my_module2
    
    # 创建SparkSession对象
    spark = SparkSession.builder \
        .enableHiveSupport() \
        .getOrCreate()
    
    get_all_files("./")
    getFile1("/")
    
    taxCut = udf(lambda salary: func.tax(salary), FloatType())
    
    staff_data = [
        {"name": "Alice", "salary": 70000},
        {"name": "Bob", "salary": 60000},
        {"name": "Charlie", "salary": 50000},
        {"name": "David", "salary": 80000},
        {"name": "Eve", "salary": 75000},
        {"name": "Frank", "salary": 40000},
        {"name": "Grace", "salary": 95000},
        {"name": "Hannah", "salary": 30000},
    ]
    df = spark.createDataFrame(staff_data)
    
    taxCut = udf(lambda salary: func.tax(salary), FloatType())
    df.select("name", taxCut("salary").alias("final salary")).show()
    
    # 创建 DataFrame
    data = [(1,), (2,), (3,), (4,)]
    df = spark.createDataFrame(data, ["value"])
    
    # 使用模块中的函数
    result_multiply = df.rdd.map(lambda x: my_module1.multiply(x[0], 5)).collect()
    print("\n result_multiply is :", result_multiply)
    result_add = df.rdd.map(lambda x: my_module2.add(x[0], 10)).collect()
    print("\n result_add is :", result_add)
    
    spark.stop()
    
  • 作业提交参数
    Image

    参数

    配置说明

    节点名称

    自定义任务节点名称。

    数据源类型 & 数据源实例

    • 数据源类型选择”VOLCANO_EMR_SERVERLESS“。
    • 数据源实例选择上述步骤中已创建好的数据源。

    任务名称 & 任务类型

    • 自定义任务名称。
    • 任务类型选择”PySpark“。

    主应用程序文件路径

    配置为准备工作中任务代码文件的TOS路径地址。

    计算组名称

    配置为上述配置的数据源实例中指定的EMR Serverless队列中的某个可用的(运行中)计算组名称。
    Image

    配置参数

    配置为任务运行的环境参数和运行参数取值。
    例如:spark.executorEnv.PYTHONPATH=/opt/spark/work-dir/module1.zip:/opt/spark/work-dir/module2.zip:/opt/spark/work-dir/basic_statistics.zip:/opt/spark/work-dir/tools.zip;spark.driver.cores=2;spark.driver.maxResultSize=10g

    依赖python文件

    配置为依赖的Module文件的TOS存储路径地址。
    例如,tos://lei-hns/demo/spark/python/basic_statistics.zip;tos://lei-hns/demo/spark/python/tools.zip;tos://lei-hns/demo/spark/python/module1.zip;tos://lei-hns/demo/spark/python/module2.zip;

任务场景5:依赖自定义Conda

  • 主程序:上传到TOS

    from pyspark.sql import SparkSession
    import numpy as np
    
    # 创建SparkSession对象
    spark = SparkSession.builder \
        .enableHiveSupport() \
        .getOrCreate()
    
    print("demo: ")
    spark.sql("select 1").show()
    
    x = np.linspace(0, 2 * np.pi, 100)
    y = np.sin(x)
    print("x is ", x)
    print("y is ", y)
    
    spark.stop()
    
  • 作业提交参数
    Image

    参数

    配置说明

    节点名称

    自定义任务节点名称。

    数据源类型 & 数据源实例

    • 数据源类型选择”VOLCANO_EMR_SERVERLESS“。
    • 数据源实例选择上述步骤中已创建好的数据源。

    任务名称 & 任务类型

    • 自定义任务名称。
    • 任务类型选择”PySpark“。

    主应用程序文件路径

    配置为准备工作中任务代码文件的TOS路径地址。

    计算组名称

    配置为上述配置的数据源实例中指定的EMR Serverless队列中的某个可用的(运行中)计算组名称。
    Image

    配置参数

    配置为任务的运行参数取值和依赖的Conda文件的TOS路径地址。
    例如:spark.unpackUseCommand.enabled=true;las.spark.jar.depend.archives=[{"fileName": "tos://lei-hns/demo/conda/python38_new.zip"}];spark.pyspark.python=python38_new.zip/python38_new/bin/python3;spark.pyspark.driver.python=python38_new.zip/python38_new/bin/python3;

高级应用实践

使用用户自定义参数

功能说明

  • EMR软件栈自3.20.0版本开始,支持使用EMR Serverless 作业插件提交 Serverless 任务时使用用户自定义变量参数。
    Image
    如上图所示,您可以在配置任务参数时,通过”自定义参数“按需添加。
  • 注意:当前自定义参数功能暂不支持配置OUT类型的参数。
  • 历史版本的用户如果希望使用自定义参数功能,可参考下文 历史版本升级 章节进行升级。

应用示例

以下以一个简单的工作流为例,为您示例自定义参数在上下游节点中的使用示例。

  • 示例场景
    Image
    本实例场景中,当前支持将上游任务节点的自定义参数传递给下游SparkSQL任务节点,暂不支持将SparkSQL任务节点的自定义参数传递至下游任务节点。
  • 任务节点配置
    • 上游shell任务
      Image
    • 中间Spark任务
      Image
    • 下游shell任务
      Image

历史版本升级

随软件栈版本不断升级,新版本会为您提供更多新的功能,如果希望使用更高版本中的新功能,历史版本的用户可参考以下步骤进行配置升级,完成后即可使用对应版本的新功能。

获取新版JAR包

获取新版JAR包,并进行更新升级DolphinScheduler。

update.tar.gz
未知大小

升级前端产物

涉及前端产物升级,需要将旧版本的 UI 文件夹替换为新版 UI 文件夹,并对旧版本 UI 文件夹做备份,UI 文件夹地址
在 api-server 服务所在节点,对下列目录进行操作。

# 检查目录
/usr/lib/emr/current/dolphin_scheduler/api-server/ui
# 备份目录
mv /usr/lib/emr/current/dolphin_scheduler/api-server/ui /usr/lib/emr/current/dolphin_scheduler/api-server/ui-bak
# 更新目录
mv ${path_to_new_ui} /usr/lib/emr/current/dolphin_scheduler/api-server/ui

升级 Serverless SDK

Serverless sdk 是实际上与 Serverless 交互的工具,相关依赖包为:serverless-sdk-query.jar,涉及到包更新时,把相应的包在以下路径进行替换:

  1. 在 master-server 服务所在节点目录下,找到 serverless sdk jar 包位置,进行替换

    # 查看
    /usr/lib/emr/current/dolphin_scheduler/master-server/libs | grep serverless-sdk
    
  2. 在 api-server 服务所在节点目录下,找到 serverless sdk jar 包位置,进行替换

    # 查看
    /usr/lib/emr/current/dolphin_scheduler/api-server/libs | grep serverless-sdk
    
  3. 在 worker-server 服务所在节点目录下,找到 serverless sdk jar 包位置,进行替换

    # 查看
    /usr/lib/emr/current/dolphin_scheduler/worker-server/libs | grep serverless-sdk
    

升级插件

插件是实际上与 dolphinscheduler 交互的工具,相关依赖包为:

  • dolphinscheduler-task-volcanoemrserverless-3.1.9.jar(任务相关配置)
  • dolphinscheduler-datasource-volcanoemrserverless-3.1.9.jar(数据源相关配置)
  • dolphinscheduler-spi-3.1.9.jar(基本不会改动)

涉及到包更新时,把相应的包在以下路径进行替换:

  1. 在 master-server 服务所在节点目录下,找到要替换的包位,进行替换

    # 查看
    /usr/lib/emr/current/dolphin_scheduler/master-server/libs | grep volcanoemr
    
  2. 在 api-server 服务所在节点目录下,找到要替换的包位,进行替换

    # 查看
    /usr/lib/emr/current/dolphin_scheduler/api-server/libs | grep volcanoemr
    
  3. 在 worker-server 服务所在节点目录下,找到要替换的包位,进行替换

    # 查看
    /usr/lib/emr/current/dolphin_scheduler/worker-server/libs | grep volcanoemr
    

重启使升级生效

升级后,在业务低峰期对 dolpihinscheduelr 对应组件进行重启,使升级生效,可重启有变更的组件即可。

最近更新时间:2026.02.09 20:44:58
这个页面对您有帮助吗?
有用
有用
无用
无用