DataLeap 支持与火山引擎 E - MapReduce(EMR)中的 Serverless Spark 实例对接。您可在本地编写 Java/Python 代码,将其上传至对象存储系统 TOS,随后在 DataLeap 里以可视化方式配置 Spark Java/Python 作业,引用资源并执行任务,从而满足业务场景中的数据开发需求。本文将为您介绍 DataLeap 中 EMR Serverless Spark Java/Python 任务的配置操作。
EMR Serverless Spark 是 EMR 框架下 Serverless 形态的数据产品,具备开箱即用且完全兼容开源 Spark 引擎的能力。它内置高可用的 Remote Shuffle Service(RSS),该服务允许 Shuffle Service在 Spark 之外运行,能够直接读写对象存储系统 TOS,实现存储与计算的解耦,从而提供更好的可用性和性能。
在业务场景涉及大规模机器学习和深度学习训练时,可借助 DataLeap 执行 Serverless Spark Java/Python 任务,来处理海量数据并进行复杂的计算。利用 EMR Serverless Spark 的高可用以及存算分离架构能力,能够加速模型训练进程,有效提高业务决策的准确性与效率。
注意
新建任务完成后,您可在任务配置界面完成以下参数配置。
语言类型支持选择 Java、Python。
说明
语言类型暂不支持互相转换,切换语言类型会清空当前配置,需谨慎切换。
资源类型默认选择 Python 类型。
在编辑器中输入 Python 语句。若您的语句依赖其他 Python 文件或需要引用其他自定义 JAR 包时,您需要在编辑器中添加如下命令:
set las.spark.jar.depend.pyFiles=[{"fileName":"tos://xx.zip"}, {"fileName":"tos://xx.py"}]
更多进阶用法,请参考 PySpark 开发指南。
注意
PATH=$PATH:/home/lihua/apps/bin/
;参数 | 说明 |
---|---|
Spark 参数 | |
Main Class | 语言类型选择 Java 时,需填写主类信息,如 org.apache.spark.examples.JavaSparkPi。 |
Conf参数 | 可配置任务中常见的一些 spark conf 参数,如常用的 Driver 内存分配参数
注意 EMR Serverless Spark Java/Python 任务访问 TOS 资源时,需进行 TOS 鉴权操作,详见 TOS 权限配置。因此您必须在 Conf 参数中增加有权限访问 TOS 存储桶的账号 AK/SK 信息。具体信息您可进入火山引擎,访问控制台的密钥管理界面,复制 Access key ID、 Secret Access Key 信息。如果是子用户,请联系主账号获取密钥。详见 AK 秘钥管理。
其中秘钥 AK/SK 这类敏感信息,支持您以自定义项目参数形式,进行加密配置,Value 配置方式如{{AK}}、{{SK}}。具体加密操作详见参数信息设置。 |
任务参数 | |
自定义参数 | 输入任务中需要额外传递的参数,如数据输入/输出路径参数、数据处理的业务时间参数、过滤条件参数等等。
|
任务产出数据登记,用于记录任务、数据血缘信息,并不会对代码逻辑造成影响。
EMR Serverless Spark Java/Python 任务如果含有 LAS Catalog 库表数据的产出,则强烈建议在此手动登记任务产出数据表信息,以便后续维护任务数据血缘关系,下游任务也可通过表名信息,来添加上游依赖。
您手动填写的内容即为任务产出表,支持填写多个。其他下游任务依赖于此任务产出的数据表时,您可在下游任务的调度依赖中,通过依赖推荐或手动添加的方式,将此处 EMR Serverless Spark Java/Python 任务所设置的产出库表名信息添加为依赖项。具体登记内容包括以下数据类型:
任务配置完成后,在右侧导航栏中,单击调度配置按钮,进入调度配置窗口,您可以在此设置调度属性、依赖、任务输入输出参数等信息。详细参数设置详见:调度设置。
其中在调度基本信息 > 计算队列选择时,您可下拉选择项目控制台中已绑定的 Spark 队列类型,来执行 EMR Serverless Spark Java/Python 任务。
注意
Spark 队列需选择 Default 计算组中常驻资源容量充足的队列来执行任务,EMR Serverless Spark Java/Python 任务不支持使用分配出去的 Spark SQL 或 Presto SQL 计算组资源容量,任务执行会异常。
若选择执行的 Spark 队列中包含部分 SQL 计算组资源时,需在上方的 Spark Conf 参数中,添加以下参数:tqs.query.engine.type = sparkcli,使其运行到剩余的 Default 常驻资源中。
更多计算队列操作详见队列管理。
任务配置完成后,您可单击操作栏中的保存和调试按钮,进行任务调试。
调试结果无误后,单击提交上线按钮,在提交上线对话框中,选择回溯数据、监控设置、提交设置等参数,最后单击确认按钮,完成作业提交。 提交上线说明详见:数据开发概述---离线任务提交。
注意
后续任务运维操作详见:离线任务运维。
以下示例将为您演示通过 EMR Serverless Spark Python 任务中 Python 语言方式,执行一段 Python 脚本来估算圆周率。
新建 EMR Serverless Spark Java/Python 任务,详见上方3 新建任务。
进入任务配置界面,语言类型选择 Python,引入资源类型选择 Python。
在代码编辑区域,编辑以下相关 Python 语句:
import sys from random import random from operator import add from pyspark.sql import SparkSession if __name__ == "__main__": """ Usage: pi [partitions] """ spark = SparkSession\ .builder\ .appName("PythonPi")\ .getOrCreate() partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2 n = 100000 * partitions def f(_: int) -> float: x = random() * 2 - 1 y = random() * 2 - 1 return 1 if x ** 2 + y ** 2 <= 1 else 0 count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add) print("Pi is roughly %f" % (4.0 * count / n)) spark.stop()
Python 脚本编辑完成后,您还需在下方添加以下 Conf 参数,来通过 TOS 的鉴权:
任务配置完成后,您可单击操作栏中的保存和调试按钮,进行任务调试。
该示例将为您演示通过 EMR Serverless Spark Python 任务中 Python 语言方式,执行一段 Pyspark 脚本来访问 LAS Catalog 中元数据。
新建 EMR Serverless Spark Java/Python 任务,详见上方3 新建任务。
进入任务配置界面,语言类型选择 Python,引入资源类型选择 Python。
在代码编辑区域,编辑以下相关 Python 语句:
from pyspark.sql import SparkSession # 创建SparkSession对象 spark = SparkSession.builder \ .enableHiveSupport() \ .getOrCreate() df1 = spark.sql("insert into database.table_name partition (date='${date}') values(1,'testa',26),(2,'json',3345343)(2,'TOM',10)") df2 = spark.sql("select * from database.table_name where date='${date}';") df2.show() spark.stop()
说明
上方 Python 示例脚本中的 SQL 语句,您需替换成您自己的 SQL 语句,根据实际场景进行修改执行。
通过该方式访问 TOS 中的 LAS Catalog 元数据资源时,在 Python 脚本编辑完成后,您还需在下方添加以下 Conf 参数,来通过 TOS 的鉴权:
任务配置完成后,您可单击操作栏中的保存和调试按钮,进行任务调试。