You need to enable JavaScript to run this app.
导航
PySpark 作业
最近更新时间:2025.10.29 20:56:44首次发布时间:2024.05.16 20:59:43
复制全文
我的收藏
有用
有用
无用
无用

简介

为满足用户个性化数据查询分析的需求,EMR Serverless Spark支持用户编写Python,并提交PySpark作业。
PySpark作业使用队列中的通用资源,请检查队列中是否存在通用资源,详见:Spark Jar作业开发指南

准备工作

  • 已创建 EMR Serverless 队列。 具体操作可参考:创建资源队列
  • 已创建计算组,且拥有该计算组的 Developer/Admin 权限。详细操作请参考:队列权限
  • 已将作业文件上传至当前 Serverless 队列同区域的 TOS 桶中。

操作步骤

通过控制台提交作业

  1. 登录 EMR Serverless 控制台,选择目标队列。

  2. 进入编辑作业页面,有如下两种方式:
    方式一

    1. 在队列详情页中,点击作业提交,进入编辑作业页面。
      方式二
    2. 在队列详情页中,点击计算组
    3. 进入计算组列表页面,选择并进入目标计算组,在计算组详情页的右上角,点击创建作业,进入作业编辑页面。
  3. 在作业编辑框的右上角,开发类型选择 SparkSQL。

  4. 在作业编辑框中进行 PySpark 作业的编辑,编辑完成后,可通过编辑框左下角的格式化按钮,对编辑中的作业进行规范化。如下以 test.py 作业为例:

    • 作业内容:

      set serverless.spark.access.key = XXXX;
      set serverless.spark.secret.key = XXXX==;
      set tqs.query.engine.type = sparkjar;
      set spark.jar.resource = tos://{bucket name}/{path}/test.py;
      

      说明

      该 demo 可以将 TOS 路径:tos://bucketname/path/test.py下的 py 代码文件作为 PySpark 中的可执行 Python 代码,并提交至 Serverless Spark 中。其中参数含义参考如下:

      参数名称

      参数值

      serverless.spark.access.key

      您访问 LAS Catalog 库表以及 TOS 所需的 Access Key,您可从 IAM 访问密钥中拿到自己账户的 AK

      serverless.spark.secret.key

      您访问 LAS Catalog 库表以及 TOS 所需的 SK

      tqs.query.engine.type

      作业类型,这里固定为sparkjar

      spark.jar.resource

      您的 Python 代码所存放的 TOS 路径

    • test.py 文件内容:

      import sys
      from random import random
      from operator import add
      
      from pyspark.sql import SparkSession
      
      if __name__ == "__main__":
          """
              Usage: pi [partitions]
          """
          spark = SparkSession\
              .builder\
              .appName("PythonPi")\
              .getOrCreate()
      
          partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
          n = 100000 * partitions
      
          def f(_: int) -> float:
              x = random() * 2 - 1
              y = random() * 2 - 1
              return 1 if x ** 2 + y ** 2 <= 1 else 0
      
          count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
          print("Pi is roughly %f" % (4.0 * count / n))
      
          spark.stop()
      

    说明

    在 Spark 官方文档中可查看更多的 PySpark 示例:PySpark Overview

  5. 完成 PySpark 作业编辑后,您可以通过控制台右上角的队列计算组来选择计算资源,完成后点击左下角的运行按钮,提交作业。

  6. 作业提交完成后,会弹出提交成功的标识,并在下方的查询日志中,会显示您本次提交的作业id,以及当前的任务状态,您可进一步在作业管理处查看详细任务情况。

说明

  • 任务提交至 EMR Serverless 集群后,则会在查询日志中显示 Spark Web UI,你可点击打开链接,在 Web UI 详细查看当前任务的执行情况。任务执行成功后,则会为您显示当前查询的结果。
  • 控制台支持下载查询日志、查询结果及查询代码,文件默认以 csv 格式存储。
  • 控制台还可以在查询结果页签,点击放大镜按钮,输入关键字,搜索结果中的关键信息。
  • 历史记录页签中,可以查看当前队列历史作业的结果以及代码等。

通过 SDK 提交 PySpark 作业

目前 EMR Serverless 支持 Java 和 Python 两类 SDK。Java SDK 使用方式请参考:Java Query SDK;Python SDK 使用方式请参考:Python Query SDK

通过 Spark Submit 提交 PySpark 作业

使用方式请参考文档:Spark Submit 工具使用说明

进阶操作

添加自定义 Python 文件/Python 依赖

适用场景

主执行 Python 文件可能还依赖其他的 Python 文件,或通用的 Python lib。

使用方法

EMR ServerlessSpark 支持通过命令的方式添加依赖的 Python 文件/Python 依赖,在您的命令中,可以添加如下命令:

set las.spark.jar.depend.pyFiles=[{"fileName":"tos://xx.zip"}, {"fileName":"tos://xx.py"}]

即可将 tos 路径下:tos://xx.zip 和 tos://xx.py 两个文件加载到 PySpark 任务中,并可在 Python 中通过 import 引用。
参数详细解释如下:

参数名称

参数解释

las.spark.jar.depend.pyFiles

py 文件/lib 所在的 TOS 路径,以 json array 的方式组织

依赖文件类型可以为PyFileZip两种。

  • 依赖PyFile类型资源

资源内容为一个 Python 脚本,在入口 Python 文件中引用该脚本时,作为 module 引入,module 名称为文件名。
例:
依赖为start.py时,在主文件引用方式为:

from start import xxx
  • 依赖Zip类型资源

资源内容为一个 zip 包,在入口 Python 文件引用 zip 包中的内容时,可以直接将 zip 包中内容作为 package 来使用。
例:
zip 包目录结构为:

core/start.py
core/end.py

引用方式为:

from core.start import xxx
from core.end import xxx

添加自定义文件

适用场景

主执行 Python 文件需要读取 TOS 上的文件,但是不想引入 TOS SDK,想直接在代码执行 Classpath 读到文件。

使用方式

EMR Serverless Spark 支持通过命令添加自定义文件,您可以通过命令将 tos 上的文件添加到 PySpark 的 Classpath 中,并通过 Python 代码进行读取,在您的命令中,可以添加如下命令:

set las.spark.jar.depend.files = [{"fileName":"tos://xx.txt"}]

即可将 tos 路径下:tos://xx.txt 文件加载到 PySpark 任务中。
参数详解:

参数名称

参数解释

las.spark.jar.depend.files

文件所在的 tos 路径,以 json array 的方式组织

添加后,您可以使用 PySpark自带的API进行文件的读写

方法

参数

返回值

SparkFiles.get('xxx')

String 类型参数,xx为访问名称

返回文件的绝对路径

SparkFiles.getRootDirectory()

返回文件所在父目录的绝对路径

添加自定义 Jar

适用场景

通过 PySpark Dataframe 访问其他数据源,例如访问火山 ES、VeDB 时,需要添加相应的 connecto jar。

下述 demo 表示通过 Spark Dataframe 读取 mysql 数据,此时就需要添加 mysql jdbc 相关的 Driver jar 包。

df = spark.read.format("jdbc") \
        .option("url", f"jdbc:mysql://{MYSQL_HOST}:{MYSQL_PORT}/{MYSQL_DATABASE}") \
        .option("dbtable", "xx") \
        .option("user", MYSQL_USERNAME) \
        .option("password", MYSQL_PASSWORD) \
        .option("driver", "com.mysql.cj.jdbc.Driver") \
        .option("partitionColumn", PARTITION_COL) \
        .option("lowerBound", 1) \
        .option("upperBound", PARTITION_UPPER_BOUND) \
        .option("numPartitions", PARTITION_NUM) \
        .load()

使用方式

EMR Serverless Spark 支持添加自定义的 Jar 包,您可以通过命令中添加如下参数添加自定义的 Jar 至任务 Classpath 中:

set las.spark.jar.depend.jars = [{"fileName":"tos://bucketname/elasticsearch-spark-35_2.12-8.13.2.jar"}]

参数详解:

参数名称

参数解释

las.spark.jar.depend.jars

依赖Jar文件所在的 tos 路径,以 json array 的方式组织

使用自定义的 Python 环境

适用场景

有自定义 Python 版本需求(EMR Serverless Spark 默认使用 Python3.7 版本),Python 环境依赖复杂
EMR Serverless Spark 支持使用自定义的 Python 环境,您可以自定义您的 PySpark 运行时环境,包括但不限于自定义Python 版本、自定义 Python 依赖等。

使用方式

注意

python编译环境要求:X86_64

可使用 virtualenv 或 conda 搭建自定义的 Python 版本及依赖环境。

注意

  • 复杂依赖建议使用 conda,python 环境更全
  • 有些依赖包依赖底层非 pip 安装的一些实现,这些包使用virtualenv是不能打包到独立的python环境中的。

构建独立 Python 环境
  • 使用 conda 构建独立 Python 环境
# 构造python版本为3.7.9的python环境
conda create -n python379 python=3.7.9
# 进入到该环境下
conda activate python379
# 安装pandas依赖
echo 'pandas' > requirements.txt
pip install -r requirements.txt
# 打包独立环境,产出zip包python379.zip
cd ${conda_home}/envs/python379 && zip -r python379.zip *
# 退出
conda deactivate
  • 使用 virtualenv 构建独立 python 环境
# 构造python版本为本地python3对应的python版本
virtualenv --python=$(which python3) --clear python379
# 进入到该环境下
source python379/bin/activate
# 安装koalas依赖
echo 'koalas' > requirements.txt
pip install -r requirements.txt
# 打包独立环境,产出zip包python379.zip
cd python379 && zip -r python379.zip *
# 退出
deactivate

使用自定义 python 运行 pyspark
  • 将 python379.zip 上传至 TOS 中,这里路径假设为:tos://bucketname/python379.zip
  • 指定 archives 依赖,在 archives 中增加该资源依赖。
set las.spark.jar.depend.archives=[{"fileName":"tos://bucketname/python379.zip"}]
  • 设置 python 参数
set spark.unpackUseCommand.enabled = true;
set spark.pyspark.driver.python = python379.zip/bin/python3;
set spark.pyspark.python = python379.zip/bin/python3;

参数名称

参数值

参数解释

spark.unpackUseCommand.enabled

true

使用 command 方式解压 zip 包,防止数据权限丢失

spark.pyspark.driver.python

python379.zip/bin/python3

driver 使用 Python,前缀路径为文件名+.zip

spark.pyspark.python

python379.zip/bin/python3

executor 使用 Python,前缀路径为文件名+.zip

  • 在控制台提交任务,即可完成提交自定义 python 环境任务。

Python SDK 方式

说明

  • 除了通过 Serverless 页面方式提交,EMR Serverless Spark 还支持使用 SDK 方式提交 PySpark 作业。
  • 快速入门 Python SDK,您可详细参考:Python Query SDK

此处提供一个极简的 PySpark 作业示例:

def execute_spark_task():
    from emr.client import ServerlessQueryClient
    from emr.auth  import StaticCredentials
    from emr.resource import JarResourceInfo
    from emr.task import SparkJarTask

    # 访问您名下TOS桶以及LAS Formation所需的AK
    ak = 'your ak'# 访问您名下TOS桶以及LAS Formation所需的SK
    sk = 'your sk'# EMR Serverless Spark所在的Region
    region = 'cn-beijing'# 火山官方endpoint
    endpoint = 'open.volcengineapi.com'# 将您TOS中:tos://bucketname/test/pi.py 文件作为可执行的Py代码,提交至PySpark
    client = ServerlessQueryClient(credentials=StaticCredentials(ak, sk), region=region,
                       endpoint=endpoint)
    job = client.execute(task=SparkJarTask(name="Py Spark demo",
                   jar=JarResourceInfo.of('tos://bucketname/test/pi.py'),
                   main_args=['arg_xxx', 'arg_yyy']
                   ), is_sync=False)

    def when_to_exit() -> bool:
        return job.get_tracking_url() is not None# 等待任务提交
    job.wait_for(when_to_exit=when_to_exit, timeout=180)
    # 获取任务TrackingURLprint('Tracking Url: %s' % job.get_tracking_url())

    # 等待任务执行结束
    job.wait_for_finished()
    print('The task executed successfully.')

if __name__ == "__main__":
    execute_spark_task()