为满足用户个性化数据查询分析的需求,EMR Serverless Spark支持用户编写Python,并提交PySpark作业。
PySpark作业使用队列中的通用资源,请检查队列中是否存在通用资源,详见:Spark Jar作业开发指南
EMR Serverless Spark 支持通过Set参数的方式,提交一个pySpark作业,极简demo如下:
进入创建作业界面:
将下述命令输入SQL编辑器中,并提交:
set serverless.spark.access.key = AKxxxx; set serverless.spark.secret.key = WV; set tqs.query.engine.type = sparkjar; set spark.jar.resource = tos://bucketname/path/pi.py;
pi.py中代码如下:
import sys from random import random from operator import add from pyspark.sql import SparkSession if __name__ == "__main__": """ Usage: pi [partitions] """ spark = SparkSession\ .builder\ .appName("PythonPi")\ .getOrCreate() partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2 n = 100000 * partitions def f(_: int) -> float: x = random() * 2 - 1 y = random() * 2 - 1 return 1 if x ** 2 + y ** 2 <= 1 else 0 count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add) print("Pi is roughly %f" % (4.0 * count / n)) spark.stop()
您可以在Spark官方文档中查看更多的PySpark示例:PySpark Overview
该demo可以将您tos路径:tos://bucketname/path/pi.py 下的py代码文件作为您PySpark中的可执行Python代码,并提交至EMR Serverless Spark中。其中参数含义如下:
参数名称 | 参数值 |
---|---|
serverless.spark.access.key | 您访问LAS Formation库表以及TOS所需的Access Key,您可从IAM 访问密钥中拿到自己账户的AK |
serverless.spark.secret.key | 您访问LAS Formation库表以及TOS所需的Secret Key |
tqs.query.engine.type | 作业类型,这里固定为**sparkjar ** |
spark.jar.resource | 您的Python代码所存放的tos路径 |
提交后,您可以在SparkJar作业列表中,找到您的作业,并查看对应的日志和WebUI进行作业的运维:
除了命令方式提交,EMR Serverless Spark还支持使用SDK方式提交
如何快速入门Python SDK,您可详细参考:Python Query SDK
这里提供一个极简的demo:
def execute_spark_task(): from emr.client import ServerlessQueryClient from emr.auth import StaticCredentials from emr.resource import JarResourceInfo from emr.task import SparkJarTask # 访问您名下TOS桶以及LAS Formation所需的AK ak = 'your ak' # 访问您名下TOS桶以及LAS Formation所需的SK sk = 'your sk' # EMR Serverless Spark所在的Region region = 'cn-beijing' # 火山官方endpoint endpoint = 'open.volcengineapi.com' # 将您TOS中:tos://emr-dev/test/pi.py 文件作为可执行的Py代码,提交至PySpark client = ServerlessQueryClient(credentials=StaticCredentials(ak, sk), region=region, endpoint=endpoint) job = client.execute(task=SparkJarTask(name="Py Spark demo", jar=JarResourceInfo.of('tos://emr-dev/test/pi.py'), main_args=['arg_xxx', 'arg_yyy'] ), is_sync=False) def when_to_exit() -> bool: return job.get_tracking_url() is not None # 等待任务提交 job.wait_for(when_to_exit=when_to_exit, timeout=180) # 获取任务TrackingURL print('Tracking Url: %s' % job.get_tracking_url()) # 等待任务执行结束 job.wait_for_finished() print('The task executed successfully.') if __name__ == "__main__": execute_spark_task()
主执行Python文件可能还依赖其他的Python文件,或通用的Python lib
EMR ServerlessSpark支持通过命令的方式添加依赖的Python文件/Python依赖,在您的命令中,可以添加如下命令:
set las.spark.jar.depend.pyFiles=[{"fileName":"tos://xx.zip"}, {"fileName":"tos://xx.py"}]
即可将tos路径下:tos://xx.zip 和 tos://xx.py 两个文件加载到PySpark任务中,并可在Python中通过import引用
参数详细解释如下:
参数名称 | 参数解释 |
---|---|
las.spark.jar.depend.pyFiles | py文件/lib 所在的tos路径,以json array的方式组织 |
依赖文件类型可以为PyFile
和Zip
两种。
PyFile
类型资源资源内容为一个Python脚本,在入口Python文件中引用该脚本时,作为module引入,module名称为文件名。
例:
依赖为start.py
时,在主文件引用方式为
from start import xxx
Zip
类型资源资源内容为一个zip包,在入口Python文件引用zip包中的内容时,可以直接将zip包中内容作为package来使用。
例:
zip包目录结构为:
core/start.py
core/end.py
引用方式为
from core.start import xxx from core.end import xxx
主执行Python文件需要读取TOS上的文件,但是不想引入TOS SDK,想直接在代码执行Classpath读到文件
EMR Serverless Spark支持通过命令添加自定义文件,您可以通过命令将tos上的文件添加到PySpark的Classpath中,并通过Python代码进行读取,在您的命令中,可以添加如下命令:
set las.spark.jar.depend.files = [{"fileName":"tos://xx.txt"}]
即可将tos路径下:tos://xx.txt 文件加载到PySpark任务中
参数详解:
参数名称 | 参数解释 |
---|---|
las.spark.jar.depend.files | 文件所在的tos路径,以json array的方式组织 |
添加后,您可以使用PySpark自带的API进行文件的读写
方法 | 参数 | 返回值 |
---|---|---|
SparkFiles.get('xxx') | String类型参数,xx为访问名称 | 返回文件的绝对路径 |
SparkFiles.getRootDirectory() | 返回文件所在父目录的绝对路径 |
通过PySpark Dataframe 访问其他数据源,例如访问火山ES、VeDB时,需要添加相应的connecto jar
下述demo表示通过Spark Dataframe读取mysql数据,此时就需要添加mysql jdbc 相关的Driver jar包
df = spark.read.format("jdbc") \ .option("url", f"jdbc:mysql://{MYSQL_HOST}:{MYSQL_PORT}/{MYSQL_DATABASE}") \ .option("dbtable", "xx") \ .option("user", MYSQL_USERNAME) \ .option("password", MYSQL_PASSWORD) \ .option("driver", "com.mysql.cj.jdbc.Driver") \ .option("partitionColumn", PARTITION_COL) \ .option("lowerBound", 1) \ .option("upperBound", PARTITION_UPPER_BOUND) \ .option("numPartitions", PARTITION_NUM) \ .load()
EMR Serverless Spark支持添加自定义的Jar包,您可以通过命令中添加如下参数添加自定义的Jar至任务Classpath中:
set las.spark.jar.depend.jars = [{"fileName":"tos://emr-dev/elasticsearch-spark-35_2.12-8.13.2.jar"}]
参数详解:
参数名称 | 参数解释 |
---|---|
las.spark.jar.depend.jars | 依赖Jar文件所在的tos路径,以json array的方式组织 |
有自定义Python版本需求(EMR Serverless Spark默认使用Python3.7版本),Python环境依赖复杂
EMR Serverless Spark 支持使用自定义的Python环境,您可以自定义您的PySpark运行时环境,包括但不限于自定义Python版本、自定义Python依赖等。
注意
python编译环境要求:X86_64
可以使用virtualenv或者conda构造自定义Python版本以及依赖环境。
注意
# 构造python版本为3.7.9的python环境 conda create -n python379 python=3.7.9 # 进入到该环境下 conda activate python379 # 安装pandas依赖 echo 'pandas' > requirements.txt pip install -r requirements.txt # 打包独立环境,产出zip包python379.zip cd ${conda_home}/envs/python379 && zip -r python379.zip * # 退出 conda deactivate
# 构造python版本为本地python3对应的python版本 virtualenv --python=$(which python3) --clear python379 # 进入到该环境下 source python379/bin/activate # 安装koalas依赖 echo 'koalas' > requirements.txt pip install -r requirements.txt # 打包独立环境,产出zip包python379.zip cd python379 && zip -r python379.zip * # 退出 deactivate
将python379.zip 上传至TOS中,这里路径假设为:tos://emr-dev/python379.zip
指定pyFiles依赖,在pyFiles里面添加该资源依赖。
set las.spark.jar.depend.pyFiles=[{"fileName":"tos://emr-dev/python379.zip"}]
set las.spark.jar.depend.archives=[{"fileName":"tos://emr-dev/python379.zip"}]
set spark.unpackUseCommand.enabled = true; set spark.pyspark.driver.python = python379/bin/python3; set spark.pyspark.python = python379.zip/bin/python3;
参数名称 | 参数值 | 参数解释 |
---|---|---|
spark.unpackUseCommand.enabled | true | 使用command方式解压zip包,防止数据权限丢失 |
spark.pyspark.driver.python | python379/bin/python3 | driver使用Python,前缀路径为文件名 |
spark.pyspark.python | python379.zip/bin/python3 | executor使用Python,前缀路径为文件名+.zip |