当前在 dolphnscheduler 3.1.9 的基础上,火山引擎EMR产品提供了 VolcanoEMRServerless 作业插件,支持提交以下类型的Serverless作业:SQL,Spark Jar,PySpark,方便您对接 EMR Serverless。
# 替换为上述下载包 unzip serverless-plugin.zip # 替换为实际dolphin用户目录 chown -R dolphinscheduler:dolphinscheduler serverless-plugin # 替换为实际的dolphin_scheduler master-server路径 cp -rp serverless-plugin/. /usr/lib/emr/current/dolphin_scheduler/master-server/libs
# 替换为上述下载包 unzip serverless-plugin.zip # 替换为实际dolphin用户目录 chown -R dolphinscheduler:dolphinscheduler serverless-plugin # 替换为实际的dolphin_scheduler worker-server路径 cp -rp serverless-plugin/. /usr/lib/emr/current/dolphin_scheduler/worker-server/libs
# 下载包 unzip serverless-plugin.zip # 替换为实际dolphin用户目录 chown -R dolphinscheduler:dolphinscheduler serverless-plugin # 替换为实际的dolphin_scheduler api-server路径 cp -rp serverless-plugin/. /usr/lib/emr/current/dolphin_scheduler/api-server/libs # 先对原始 ui 产物做备份 mv /usr/lib/emr/current/dolphin_scheduler/api-server/ui /usr/lib/emr/current/dolphin_scheduler/api-server/ui-bak # 放入 serverless 提供的 ui mv serverless-plugin/ui /usr/lib/emr/current/dolphin_scheduler/api-server/ui
在DolphinScheduler服务页面,单击“服务链接”进入DolphinScheduler控制台页面,完成上述作业插件的安装部署后,您可参考以下步骤,提交EMR Serverless任务。
提交EMR Serverless数据源前,您需要先创建一个数据源,为后续Serverless任务设置好运行时的队列、任务提交的地址和认证等信息。操作入口和关键参数如下。
参数 | 配置说明 |
|---|---|
数据源 | 选择: |
数据源名称 | 自定义数据源名称 |
地域 | 选择需要提交任务的地域。
|
访问地址 |
|
队列名称 | 设置为EMR Serverless队列的名称,后续Serverless任务会运行在此队列上,因此您需保障此处设置的队列为运行中的可用队列。 |
访问密钥AK & 访问秘钥SK | 设置为火山引擎账号的AK和SK。 |
完成数据源创建后,您可参见下文进行各类Serverless任务提交。
以下为提交SparkSQL任务的操作入口和主要参数说明。
在项目管理>工作流>工作流定义中创建工作流,工作流选择”VolcanoEMRServerless“任务节点。
在弹出的任务节点配置页面配置SparkSQL任务详情。
参数 | 配置说明 |
|---|---|
节点名称 | 自定义任务节点名称。 |
数据源类型 & 数据源实例 |
|
任务名称 & 任务类型 |
|
Spark SQL | 输入SparkSQL任务代码。 |
计算组名称 | 配置为上述配置的数据源实例中指定的EMR Serverless队列中的某个可用的(运行中)计算组名称。 |
提交SparkJar任务时,您需要先将任务代码文件上传至与Serverless队列同地域的火山引擎对象存储TOS中,并获取对应代码文件的TOS地址。
以下为提交SparkJar任务的操作入口和主要参数说明。
在项目管理>工作流>工作流定义中创建工作流,工作流选择”VolcanoEMRServerless“任务节点。
在弹出的任务节点配置页面配置SparkJar任务详情。
参数 | 配置说明 |
|---|---|
节点名称 | 自定义任务节点名称。 |
数据源类型 & 数据源实例 |
|
任务名称 & 任务类型 |
|
主应用程序文件路径 | 配置为准备工作中任务代码文件的TOS路径地址。 |
主函数的Class | 配置为任务代码文件中的主函数Class。 |
计算组名称 | 配置为上述配置的数据源实例中指定的EMR Serverless队列中的某个可用的(运行中)计算组名称。 |
提交SparkJar任务时,您需要先将任务代码文件上传至与Serverless队列同地域的火山引擎对象存储TOS中,并获取对应代码文件的TOS地址。
完成准备工作后,以下为各典型任务场景下的提交PySpark任务的操作入口和主要参数说明。
主程序:将任务代码主程序文件上传到TOS
以下为一个任务代码示例。
import argparse from pyspark.sql import SparkSession from pyspark import SparkFiles # 创建SparkSession对象 spark = SparkSession.builder \ .enableHiveSupport() \ .getOrCreate() parser = argparse.ArgumentParser(description='这是一个示例程序') parser.add_argument('--inputFile', help='输入文件') parser.add_argument('--outputFile', help='输出文件') args = parser.parse_args() inputFile = args.inputFile outputFile = args.outputFile # 打印参数的值 print('输入文件:', inputFile) print('输出文件:', outputFile) file_df = spark.sparkContext.textFile(inputFile) file_df_data = file_df.collect() SparkFiles.get(inputFile) print("inputFile data:") for line in file_df_data: print(line) # 创建示例数据 data = [("Alice", 25), ("Bob", 30), ("Charlie", 35)] df = spark.createDataFrame(data, ["Name", "Age"]) # 将DataFrame保存为CSV文件 df.write.csv(outputFile, 'overwrite') spark.stop()
作业提交参数
参数 | 配置说明 |
|---|---|
节点名称 | 自定义任务节点名称。 |
数据源类型 & 数据源实例 |
|
任务名称 & 任务类型 |
|
主应用程序文件路径 | 配置为准备工作中任务代码文件的TOS路径地址。 |
计算组名称 | 配置为上述配置的数据源实例中指定的EMR Serverless队列中的某个可用的(运行中)计算组名称。 |
配置参数 | 配置为后续PySpark任务进行数据处理时输入数据和输出数据的TOS路径。 |
当任务依赖自定义JAR时,参考下文进行任务配置。
主程序:上传到TOS
以下为一个任务代码示例。
from pyspark.sql import SparkSession # 创建SparkSession对象 spark = SparkSession.builder \ .enableHiveSupport() \ .getOrCreate() print("demo: ") spark.sql("select 1").show() spark.stop()
作业提交参数
参数 | 配置说明 |
|---|---|
节点名称 | 自定义任务节点名称。 |
数据源类型 & 数据源实例 |
|
任务名称 & 任务类型 |
|
主应用程序文件路径 | 配置为准备工作中任务代码文件的TOS路径地址。 |
计算组名称 | 配置为上述配置的数据源实例中指定的EMR Serverless队列中的某个可用的(运行中)计算组名称。 |
依赖Jar文件 | 配置为依赖的Jar文件所在的TOS路径。 |
主程序:上传到TOS
from pyspark.sql import SparkSession # 创建SparkSession对象 spark = SparkSession.builder \ .enableHiveSupport() \ .getOrCreate() print("demo: ") spark.sql("select 1").show() spark.stop()
作业提交参数
参数 | 配置说明 |
|---|---|
节点名称 | 自定义任务节点名称。 |
数据源类型 & 数据源实例 |
|
任务名称 & 任务类型 |
|
主应用程序文件路径 | 配置为准备工作中任务代码文件的TOS路径地址。 |
计算组名称 | 配置为上述配置的数据源实例中指定的EMR Serverless队列中的某个可用的(运行中)计算组名称。 |
配置参数 | 参考如下格式要求,配置为自定义镜像值。 |
主程序:上传到TOS
import time from pyspark.sql import SparkSession from pyspark.sql.functions import udf from pyspark.sql.types import FloatType from basic_statistics.file_info import getFile1 from basic_statistics.file_info import get_all_files from tools import func from module1 import my_module1 from module2 import my_module2 # 创建SparkSession对象 spark = SparkSession.builder \ .enableHiveSupport() \ .getOrCreate() get_all_files("./") getFile1("/") taxCut = udf(lambda salary: func.tax(salary), FloatType()) staff_data = [ {"name": "Alice", "salary": 70000}, {"name": "Bob", "salary": 60000}, {"name": "Charlie", "salary": 50000}, {"name": "David", "salary": 80000}, {"name": "Eve", "salary": 75000}, {"name": "Frank", "salary": 40000}, {"name": "Grace", "salary": 95000}, {"name": "Hannah", "salary": 30000}, ] df = spark.createDataFrame(staff_data) taxCut = udf(lambda salary: func.tax(salary), FloatType()) df.select("name", taxCut("salary").alias("final salary")).show() # 创建 DataFrame data = [(1,), (2,), (3,), (4,)] df = spark.createDataFrame(data, ["value"]) # 使用模块中的函数 result_multiply = df.rdd.map(lambda x: my_module1.multiply(x[0], 5)).collect() print("\n result_multiply is :", result_multiply) result_add = df.rdd.map(lambda x: my_module2.add(x[0], 10)).collect() print("\n result_add is :", result_add) spark.stop()
作业提交参数
参数 | 配置说明 |
|---|---|
节点名称 | 自定义任务节点名称。 |
数据源类型 & 数据源实例 |
|
任务名称 & 任务类型 |
|
主应用程序文件路径 | 配置为准备工作中任务代码文件的TOS路径地址。 |
计算组名称 | 配置为上述配置的数据源实例中指定的EMR Serverless队列中的某个可用的(运行中)计算组名称。 |
配置参数 | 配置为任务运行的环境参数和运行参数取值。 |
依赖python文件 | 配置为依赖的Module文件的TOS存储路径地址。 |
主程序:上传到TOS
from pyspark.sql import SparkSession import numpy as np # 创建SparkSession对象 spark = SparkSession.builder \ .enableHiveSupport() \ .getOrCreate() print("demo: ") spark.sql("select 1").show() x = np.linspace(0, 2 * np.pi, 100) y = np.sin(x) print("x is ", x) print("y is ", y) spark.stop()
作业提交参数
参数 | 配置说明 |
|---|---|
节点名称 | 自定义任务节点名称。 |
数据源类型 & 数据源实例 |
|
任务名称 & 任务类型 |
|
主应用程序文件路径 | 配置为准备工作中任务代码文件的TOS路径地址。 |
计算组名称 | 配置为上述配置的数据源实例中指定的EMR Serverless队列中的某个可用的(运行中)计算组名称。 |
配置参数 | 配置为任务的运行参数取值和依赖的Conda文件的TOS路径地址。 |
以下以一个简单的工作流为例,为您示例自定义参数在上下游节点中的使用示例。
随软件栈版本不断升级,新版本会为您提供更多新的功能,如果希望使用更高版本中的新功能,历史版本的用户可参考以下步骤进行配置升级,完成后即可使用对应版本的新功能。
获取新版JAR包,并进行更新升级DolphinScheduler。
涉及前端产物升级,需要将旧版本的 UI 文件夹替换为新版 UI 文件夹,并对旧版本 UI 文件夹做备份,UI 文件夹地址
在 api-server 服务所在节点,对下列目录进行操作。
# 检查目录 /usr/lib/emr/current/dolphin_scheduler/api-server/ui # 备份目录 mv /usr/lib/emr/current/dolphin_scheduler/api-server/ui /usr/lib/emr/current/dolphin_scheduler/api-server/ui-bak # 更新目录 mv ${path_to_new_ui} /usr/lib/emr/current/dolphin_scheduler/api-server/ui
Serverless sdk 是实际上与 Serverless 交互的工具,相关依赖包为:serverless-sdk-query.jar,涉及到包更新时,把相应的包在以下路径进行替换:
在 master-server 服务所在节点目录下,找到 serverless sdk jar 包位置,进行替换
# 查看 /usr/lib/emr/current/dolphin_scheduler/master-server/libs | grep serverless-sdk
在 api-server 服务所在节点目录下,找到 serverless sdk jar 包位置,进行替换
# 查看 /usr/lib/emr/current/dolphin_scheduler/api-server/libs | grep serverless-sdk
在 worker-server 服务所在节点目录下,找到 serverless sdk jar 包位置,进行替换
# 查看 /usr/lib/emr/current/dolphin_scheduler/worker-server/libs | grep serverless-sdk
插件是实际上与 dolphinscheduler 交互的工具,相关依赖包为:
涉及到包更新时,把相应的包在以下路径进行替换:
在 master-server 服务所在节点目录下,找到要替换的包位,进行替换
# 查看 /usr/lib/emr/current/dolphin_scheduler/master-server/libs | grep volcanoemr
在 api-server 服务所在节点目录下,找到要替换的包位,进行替换
# 查看 /usr/lib/emr/current/dolphin_scheduler/api-server/libs | grep volcanoemr
在 worker-server 服务所在节点目录下,找到要替换的包位,进行替换
# 查看 /usr/lib/emr/current/dolphin_scheduler/worker-server/libs | grep volcanoemr
升级后,在业务低峰期对 dolpihinscheduelr 对应组件进行重启,使升级生效,可重启有变更的组件即可。