You need to enable JavaScript to run this app.
E-MapReduce

E-MapReduce

复制全文
Serverless Spark 作业
PySpark 作业
复制全文
PySpark 作业

为满足用户个性化数据查询分析的需求,EMR Serverless Spark支持用户编写Python,并提交PySpark作业。

前提条件
  • 已创建 EMR Serverless 队列,且拥有该计算组的 Developer/Admin 权限,具体操作可参考:创建资源队列
  • 已创建计算组,PySpark作业使用队列中的通用计算组。
  • 已将作业文件上传至当前 Serverless 队列同区域的 TOS 桶中。

提交作业

提交作业:通过控制台

  1. 登录 EMR Serverless 控制台,在左侧导航栏单击 作业中心 > 作业管理,进入作业管理页面后单击右上角的”创建作业“,进入作业创建页面。
    Image
    您也可以在 Serverless 的队列/计算组页面的右上角单击“创建作业”,跳转至创建作业页面。

  2. 在作业创建页面配置作业参数。

    1. 配置作业基本信息与资源配置。
      Image

      • 自定义作业名称,作业类型选择 PySpark。
      • 选择作业运行的队列资源,并选择开发模式:可选择 UI 或者 JSON。
    2. 配置作业内容,不同开发模式下的配置方式不一致,但是参数一致,以下为您介绍对应参数的配置说明。

      参数名称

      参数作用

      Python 文件

      上传至当前 Serverless 队列同区域的 TOS 桶中的作业文件。

      依赖 Jar

      支持添加一些依赖的 Jar 文件,在任务运行时会被同时添加至任务依赖中。同样,您可以将资源上传至 TOS 并在此指定路径

      依赖 Python 资源

      添加作业依赖的Python资源。

      依赖File 资源

      PySpar 作业的依赖文件,用户可以在任务代码中,通过 API 访问

      依赖archive

      PySpark 作业的依赖archive

      Spark 参数(Spark Conf)

      Spark 作业参数,可以指定 Spark 作业所用的资源等

      自定义参数(Main Conf)

      SparkJar 作业主类运行时,需要传入的参数

      存储挂载路径

      设置当前作业可能需要挂载的存储路径,详情请参见存储挂载概述

  3. 完成作业编辑后,单击右下角的 创建并运行 按钮,提交作业。
    您也可以仅创建,后续在作业列表页面再手动触发任务运行。

提交作业:通过 SDK

目前 EMR Serverless 支持 Java 和 Python 两类 SDK。Java SDK 使用方式请参考:Java Query SDK;Python SDK 使用方式请参考:Python Query SDK

提交作业:通过 Spark Submit

使用方式请参考文档:Spark Submit 工具使用说明

提交作业:Python SDK 方式

说明

  • 除了通过 Serverless 页面方式提交,EMR Serverless Spark 还支持使用 SDK 方式提交 PySpark 作业。
  • 快速入门 Python SDK,您可详细参考:Python Query SDK

此处提供一个极简的 PySpark 作业示例:

def execute_spark_task():
    from emr.client import ServerlessQueryClient
    from emr.auth  import StaticCredentials
    from emr.resource import JarResourceInfo
    from emr.task import SparkJarTask

    # 访问您名下TOS桶以及LAS Formation所需的AK
    ak = 'your ak'# 访问您名下TOS桶以及LAS Formation所需的SK
    sk = 'your sk'# EMR Serverless Spark所在的Region
    region = 'cn-beijing'# 火山官方endpoint
    endpoint = 'open.volcengineapi.com'# 将您TOS中:tos://bucketname/test/pi.py 文件作为可执行的Py代码,提交至PySpark
    client = ServerlessQueryClient(credentials=StaticCredentials(ak, sk), region=region,
                       endpoint=endpoint)
    job = client.execute(task=SparkJarTask(name="Py Spark demo",
                   jar=JarResourceInfo.of('tos://bucketname/test/pi.py'),
                   main_args=['arg_xxx', 'arg_yyy']
                   ), is_sync=False)

    def when_to_exit() -> bool:
        return job.get_tracking_url() is not None# 等待任务提交
    job.wait_for(when_to_exit=when_to_exit, timeout=180)
    # 获取任务TrackingURLprint('Tracking Url: %s' % job.get_tracking_url())

    # 等待任务执行结束
    job.wait_for_finished()
    print('The task executed successfully.')

if __name__ == "__main__":
    execute_spark_task()

简单作业示例

如下以 test.py 作业为例:

  • 作业内容:
    SQL set serverless.spark.access.key = XXXX; set serverless.spark.secret.key = XXXX==; set tqs.query.engine.type = sparkjar; set spark.jar.resource = tos://{bucket name}/{path}/test.py;

    :::tip
    该 demo 可以将 TOS 路径:`tos://bucketname/path/test.py`下的 py 代码文件作为 PySpark 中的可执行 Python 代码,并提交至 Serverless Spark 中。其中参数含义参考如下:
    :::
    
    | | | \
    |参数名称 |参数值 |
    |---|---|
    | | | \
    |serverless.spark.access.key |您访问 LAS Catalog 库表以及 TOS 所需的 Access Key,您可从 IAM 访问密钥中拿到自己账户的 AK |
    | | | \
    |serverless.spark.secret.key |您访问 LAS Catalog 库表以及 TOS 所需的 SK |
    | | | \
    |tqs.query.engine.type |作业类型,这里固定为`sparkjar` |
    | | | \
    |spark.jar.resource |您的 Python 代码所存放的 TOS 路径 |
    
  • test.py 文件内容:
    ```Python
    import sys
    from random import random
    from operator import add

    from pyspark.sql import SparkSession
    
    if __name__ == "__main__":
        """
            Usage: pi [partitions]
        """
        spark = SparkSession\
            .builder\
            .appName("PythonPi")\
            .getOrCreate()
    
        partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
        n = 100000 * partitions
    
        def f(_: int) -> float:
            x = random() * 2 - 1
            y = random() * 2 - 1
            return 1 if x ** 2 + y ** 2 <= 1 else 0
    
        count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
        print("Pi is roughly %f" % (4.0 * count / n))
    
        spark.stop()
    ```
    

    说明

    在 Spark 官方文档中可查看更多的 PySpark 示例:PySpark Overview

进阶操作

添加自定义 Python 文件/Python 依赖

适用场景

主执行 Python 文件可能还依赖其他的 Python 文件,或通用的 Python lib。

使用方法

EMR ServerlessSpark 支持通过命令的方式添加依赖的 Python 文件/Python 依赖,在您的命令中,可以添加如下命令:

set las.spark.jar.depend.pyFiles=[{"fileName":"tos://xx.zip"}, {"fileName":"tos://xx.py"}]

即可将 tos 路径下:tos://xx.zip 和 tos://xx.py 两个文件加载到 PySpark 任务中,并可在 Python 中通过 import 引用。
参数详细解释如下:

参数名称

参数解释

las.spark.jar.depend.pyFiles

py 文件/lib 所在的 TOS 路径,以 json array 的方式组织

依赖文件类型可以为PyFileZip两种。

  • 依赖PyFile类型资源

资源内容为一个 Python 脚本,在入口 Python 文件中引用该脚本时,作为 module 引入,module 名称为文件名。
例:
依赖为start.py时,在主文件引用方式为:

from start import xxx
  • 依赖Zip类型资源

资源内容为一个 zip 包,在入口 Python 文件引用 zip 包中的内容时,可以直接将 zip 包中内容作为 package 来使用。
例:
zip 包目录结构为:

core/start.py
core/end.py

引用方式为:

from core.start import xxx
from core.end import xxx

添加自定义文件

适用场景

主执行 Python 文件需要读取 TOS 上的文件,但是不想引入 TOS SDK,想直接在代码执行 Classpath 读到文件。

使用方式

EMR Serverless Spark 支持通过命令添加自定义文件,您可以通过命令将 tos 上的文件添加到 PySpark 的 Classpath 中,并通过 Python 代码进行读取,在您的命令中,可以添加如下命令:

set las.spark.jar.depend.files = [{"fileName":"tos://xx.txt"}]

即可将 tos 路径下:tos://xx.txt 文件加载到 PySpark 任务中。
参数详解:

参数名称

参数解释

las.spark.jar.depend.files

文件所在的 tos 路径,以 json array 的方式组织

添加后,您可以使用 PySpark自带的API进行文件的读写

方法

参数

返回值

SparkFiles.get('xxx')

String 类型参数,xx为访问名称

返回文件的绝对路径

SparkFiles.getRootDirectory()

返回文件所在父目录的绝对路径

添加自定义 Jar

适用场景

通过 PySpark Dataframe 访问其他数据源,例如访问火山 ES、VeDB 时,需要添加相应的 connecto jar。

下述 demo 表示通过 Spark Dataframe 读取 mysql 数据,此时就需要添加 mysql jdbc 相关的 Driver jar 包。

df = spark.read.format("jdbc") \
        .option("url", f"jdbc:mysql://{MYSQL_HOST}:{MYSQL_PORT}/{MYSQL_DATABASE}") \
        .option("dbtable", "xx") \
        .option("user", MYSQL_USERNAME) \
        .option("password", MYSQL_PASSWORD) \
        .option("driver", "com.mysql.cj.jdbc.Driver") \
        .option("partitionColumn", PARTITION_COL) \
        .option("lowerBound", 1) \
        .option("upperBound", PARTITION_UPPER_BOUND) \
        .option("numPartitions", PARTITION_NUM) \
        .load()

使用方式

EMR Serverless Spark 支持添加自定义的 Jar 包,您可以通过命令中添加如下参数添加自定义的 Jar 至任务 Classpath 中:

set las.spark.jar.depend.jars = [{"fileName":"tos://bucketname/elasticsearch-spark-35_2.12-8.13.2.jar"}]

参数详解:

参数名称

参数解释

las.spark.jar.depend.jars

依赖Jar文件所在的 tos 路径,以 json array 的方式组织

使用自定义的 Python 环境

适用场景

有自定义 Python 版本需求(EMR Serverless Spark 默认使用 Python3.7 版本),Python 环境依赖复杂
EMR Serverless Spark 支持使用自定义的 Python 环境,您可以自定义您的 PySpark 运行时环境,包括但不限于自定义Python 版本、自定义 Python 依赖等。

使用方式

注意

python编译环境要求:X86_64

可使用 virtualenv 或 conda 搭建自定义的 Python 版本及依赖环境。

注意

  • 复杂依赖建议使用 conda,python 环境更全
  • 有些依赖包依赖底层非 pip 安装的一些实现,这些包使用virtualenv是不能打包到独立的python环境中的。

构建独立 Python 环境
  • 使用 conda 构建独立 Python 环境
# 构造python版本为3.7.9的python环境
conda create -n python379 python=3.7.9
# 进入到该环境下
conda activate python379
# 安装pandas依赖
echo 'pandas' > requirements.txt
pip install -r requirements.txt
# 打包独立环境,产出zip包python379.zip
cd ${conda_home}/envs/python379 && zip -r python379.zip *
# 退出
conda deactivate
  • 使用 virtualenv 构建独立 python 环境
# 构造python版本为本地python3对应的python版本
virtualenv --python=$(which python3) --clear python379
# 进入到该环境下
source python379/bin/activate
# 安装koalas依赖
echo 'koalas' > requirements.txt
pip install -r requirements.txt
# 打包独立环境,产出zip包python379.zip
cd python379 && zip -r python379.zip *
# 退出
deactivate

使用自定义 python 运行 pyspark
  • 将 python379.zip 上传至 TOS 中,这里路径假设为:tos://bucketname/python379.zip
  • 指定 archives 依赖,在 archives 中增加该资源依赖。
set las.spark.jar.depend.archives=[{"fileName":"tos://bucketname/python379.zip"}]
  • 设置 python 参数
set spark.unpackUseCommand.enabled = true;
set spark.pyspark.driver.python = python379.zip/bin/python3;
set spark.pyspark.python = python379.zip/bin/python3;

参数名称

参数值

参数解释

spark.unpackUseCommand.enabled

true

使用 command 方式解压 zip 包,防止数据权限丢失

spark.pyspark.driver.python

python379.zip/bin/python3

driver 使用 Python,前缀路径为文件名+.zip

spark.pyspark.python

python379.zip/bin/python3

executor 使用 Python,前缀路径为文件名+.zip

  • 在控制台提交任务,即可完成提交自定义 python 环境任务。

查看/运行/删除作业

作业创建完成后,您可以在作业列表页面查看所有已创建的作业,并可手动触发作业运行,或者对作业进行编辑修改、删除的操作。
Image

查看与诊断作业实例

在作业提交后,您也可以在 作业中心 > 作业实例 页面查看所有已提交的作业运行实例详情,包括运行状态、作业类型、资源详情、提交人等。
Image

  • 您可以在页面顶部通过实践范围、提交人等过滤条件快速筛选出待查看的作业实例,查看作业实例详情。
  • 对于运行失败的作业,您可单击“日志”,查看作业运行详细日志,进行作业失败原因定位;您也可以使用 “AI 诊断” 功能,进行作业智能诊断。
最近更新时间:2026.02.09 18:26:30
这个页面对您有帮助吗?
有用
有用
无用
无用