为满足用户个性化数据查询分析的需求,EMR Serverless Spark支持用户编写Python,并提交PySpark作业。
PySpark作业使用队列中的通用资源,请检查队列中是否存在通用资源,详见:Spark Jar作业开发指南。
登录 EMR Serverless 控制台,选择目标队列。
进入编辑作业页面,有如下两种方式:
方式一:
在作业编辑框的右上角,开发类型选择 SparkSQL。
在作业编辑框中进行 PySpark 作业的编辑,编辑完成后,可通过编辑框左下角的格式化按钮,对编辑中的作业进行规范化。如下以 test.py 作业为例:
作业内容:
set serverless.spark.access.key = XXXX; set serverless.spark.secret.key = XXXX==; set tqs.query.engine.type = sparkjar; set spark.jar.resource = tos://{bucket name}/{path}/test.py;
说明
该 demo 可以将 TOS 路径:tos://bucketname/path/test.py下的 py 代码文件作为 PySpark 中的可执行 Python 代码,并提交至 Serverless Spark 中。其中参数含义参考如下:
参数名称 | 参数值 |
|---|---|
serverless.spark.access.key | 您访问 LAS Catalog 库表以及 TOS 所需的 Access Key,您可从 IAM 访问密钥中拿到自己账户的 AK |
serverless.spark.secret.key | 您访问 LAS Catalog 库表以及 TOS 所需的 SK |
tqs.query.engine.type | 作业类型,这里固定为 |
spark.jar.resource | 您的 Python 代码所存放的 TOS 路径 |
test.py 文件内容:
import sys from random import random from operator import add from pyspark.sql import SparkSession if __name__ == "__main__": """ Usage: pi [partitions] """ spark = SparkSession\ .builder\ .appName("PythonPi")\ .getOrCreate() partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2 n = 100000 * partitions def f(_: int) -> float: x = random() * 2 - 1 y = random() * 2 - 1 return 1 if x ** 2 + y ** 2 <= 1 else 0 count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add) print("Pi is roughly %f" % (4.0 * count / n)) spark.stop()
说明
在 Spark 官方文档中可查看更多的 PySpark 示例:PySpark Overview。
完成 PySpark 作业编辑后,您可以通过控制台右上角的队列和计算组来选择计算资源,完成后点击左下角的运行按钮,提交作业。
作业提交完成后,会弹出提交成功的标识,并在下方的查询日志中,会显示您本次提交的作业id,以及当前的任务状态,您可进一步在作业管理处查看详细任务情况。
说明
目前 EMR Serverless 支持 Java 和 Python 两类 SDK。Java SDK 使用方式请参考:Java Query SDK;Python SDK 使用方式请参考:Python Query SDK。
使用方式请参考文档:Spark Submit 工具使用说明。
主执行 Python 文件可能还依赖其他的 Python 文件,或通用的 Python lib。
EMR ServerlessSpark 支持通过命令的方式添加依赖的 Python 文件/Python 依赖,在您的命令中,可以添加如下命令:
set las.spark.jar.depend.pyFiles=[{"fileName":"tos://xx.zip"}, {"fileName":"tos://xx.py"}]
即可将 tos 路径下:tos://xx.zip 和 tos://xx.py 两个文件加载到 PySpark 任务中,并可在 Python 中通过 import 引用。
参数详细解释如下:
参数名称 | 参数解释 |
|---|---|
las.spark.jar.depend.pyFiles | py 文件/lib 所在的 TOS 路径,以 json array 的方式组织 |
依赖文件类型可以为PyFile和Zip两种。
PyFile类型资源资源内容为一个 Python 脚本,在入口 Python 文件中引用该脚本时,作为 module 引入,module 名称为文件名。
例:
依赖为start.py时,在主文件引用方式为:
from start import xxx
Zip类型资源资源内容为一个 zip 包,在入口 Python 文件引用 zip 包中的内容时,可以直接将 zip 包中内容作为 package 来使用。
例:
zip 包目录结构为:
core/start.py
core/end.py
引用方式为:
from core.start import xxx from core.end import xxx
主执行 Python 文件需要读取 TOS 上的文件,但是不想引入 TOS SDK,想直接在代码执行 Classpath 读到文件。
EMR Serverless Spark 支持通过命令添加自定义文件,您可以通过命令将 tos 上的文件添加到 PySpark 的 Classpath 中,并通过 Python 代码进行读取,在您的命令中,可以添加如下命令:
set las.spark.jar.depend.files = [{"fileName":"tos://xx.txt"}]
即可将 tos 路径下:tos://xx.txt 文件加载到 PySpark 任务中。
参数详解:
参数名称 | 参数解释 |
|---|---|
las.spark.jar.depend.files | 文件所在的 tos 路径,以 json array 的方式组织 |
添加后,您可以使用 PySpark自带的API进行文件的读写
方法 | 参数 | 返回值 |
|---|---|---|
SparkFiles.get('xxx') | String 类型参数,xx为访问名称 | 返回文件的绝对路径 |
SparkFiles.getRootDirectory() | 返回文件所在父目录的绝对路径 |
通过 PySpark Dataframe 访问其他数据源,例如访问火山 ES、VeDB 时,需要添加相应的 connecto jar。
下述 demo 表示通过 Spark Dataframe 读取 mysql 数据,此时就需要添加 mysql jdbc 相关的 Driver jar 包。
df = spark.read.format("jdbc") \ .option("url", f"jdbc:mysql://{MYSQL_HOST}:{MYSQL_PORT}/{MYSQL_DATABASE}") \ .option("dbtable", "xx") \ .option("user", MYSQL_USERNAME) \ .option("password", MYSQL_PASSWORD) \ .option("driver", "com.mysql.cj.jdbc.Driver") \ .option("partitionColumn", PARTITION_COL) \ .option("lowerBound", 1) \ .option("upperBound", PARTITION_UPPER_BOUND) \ .option("numPartitions", PARTITION_NUM) \ .load()
EMR Serverless Spark 支持添加自定义的 Jar 包,您可以通过命令中添加如下参数添加自定义的 Jar 至任务 Classpath 中:
set las.spark.jar.depend.jars = [{"fileName":"tos://bucketname/elasticsearch-spark-35_2.12-8.13.2.jar"}]
参数详解:
参数名称 | 参数解释 |
|---|---|
las.spark.jar.depend.jars | 依赖Jar文件所在的 tos 路径,以 json array 的方式组织 |
有自定义 Python 版本需求(EMR Serverless Spark 默认使用 Python3.7 版本),Python 环境依赖复杂
EMR Serverless Spark 支持使用自定义的 Python 环境,您可以自定义您的 PySpark 运行时环境,包括但不限于自定义Python 版本、自定义 Python 依赖等。
注意
python编译环境要求:X86_64
可使用 virtualenv 或 conda 搭建自定义的 Python 版本及依赖环境。
注意
# 构造python版本为3.7.9的python环境 conda create -n python379 python=3.7.9 # 进入到该环境下 conda activate python379 # 安装pandas依赖 echo 'pandas' > requirements.txt pip install -r requirements.txt # 打包独立环境,产出zip包python379.zip cd ${conda_home}/envs/python379 && zip -r python379.zip * # 退出 conda deactivate
# 构造python版本为本地python3对应的python版本 virtualenv --python=$(which python3) --clear python379 # 进入到该环境下 source python379/bin/activate # 安装koalas依赖 echo 'koalas' > requirements.txt pip install -r requirements.txt # 打包独立环境,产出zip包python379.zip cd python379 && zip -r python379.zip * # 退出 deactivate
set las.spark.jar.depend.archives=[{"fileName":"tos://bucketname/python379.zip"}]
set spark.unpackUseCommand.enabled = true; set spark.pyspark.driver.python = python379.zip/bin/python3; set spark.pyspark.python = python379.zip/bin/python3;
参数名称 | 参数值 | 参数解释 |
|---|---|---|
spark.unpackUseCommand.enabled | true | 使用 command 方式解压 zip 包,防止数据权限丢失 |
spark.pyspark.driver.python | python379.zip/bin/python3 | driver 使用 Python,前缀路径为文件名+.zip |
spark.pyspark.python | python379.zip/bin/python3 | executor 使用 Python,前缀路径为文件名+.zip |
说明
此处提供一个极简的 PySpark 作业示例:
def execute_spark_task(): from emr.client import ServerlessQueryClient from emr.auth import StaticCredentials from emr.resource import JarResourceInfo from emr.task import SparkJarTask # 访问您名下TOS桶以及LAS Formation所需的AK ak = 'your ak'# 访问您名下TOS桶以及LAS Formation所需的SK sk = 'your sk'# EMR Serverless Spark所在的Region region = 'cn-beijing'# 火山官方endpoint endpoint = 'open.volcengineapi.com'# 将您TOS中:tos://bucketname/test/pi.py 文件作为可执行的Py代码,提交至PySpark client = ServerlessQueryClient(credentials=StaticCredentials(ak, sk), region=region, endpoint=endpoint) job = client.execute(task=SparkJarTask(name="Py Spark demo", jar=JarResourceInfo.of('tos://bucketname/test/pi.py'), main_args=['arg_xxx', 'arg_yyy'] ), is_sync=False) def when_to_exit() -> bool: return job.get_tracking_url() is not None# 等待任务提交 job.wait_for(when_to_exit=when_to_exit, timeout=180) # 获取任务TrackingURLprint('Tracking Url: %s' % job.get_tracking_url()) # 等待任务执行结束 job.wait_for_finished() print('The task executed successfully.') if __name__ == "__main__": execute_spark_task()