You need to enable JavaScript to run this app.
导航
Python Query SDK
最近更新时间:2024.07.18 16:50:29首次发布时间:2024.05.16 21:00:28

1 简介

Python Query SDK 帮助 Serverless Spark 用户更加轻松地通过 Python 语言使用 Serverless Spark 查询服务,目前主要功能包括 任务提交/取消、任务信息获取、结果获取、上传资源等。
本文提供了上述功能的示例代码,方便参考使用。

2 概念说明

系统概念

  • Endpoint:表示 Serverless Spark 对外服务的 API 域名
  • Region:表示 Serverless Spark 的数据中心所在的物理区域

目前 Serverless Spark 支持的地域和 API 域名如下表所示:

Region(中文名称)

Region

Endpoint

华北-北京

cn-beijing

open.volcengineapi.com

华东-上海

cn-shanghai

open.volcengineapi.com

华北-广州

cn-guangzhou

open.volcengineapi.com

亚太东南-柔佛

ap-southeast-1

open.volcengineapi.com

  • Access Key / Secret Access Key:访问火山引擎 API 的密钥;用户可以通过火山引擎的“密钥管理”页面获取到 Access Key 和 Secret Access Key。

内部概念

  • Schema:一个可以包含 数据表、资源、UDF 等的集合空间概念
  • Task:定义某次任务的执行信息,包括 查询 SQL、执行方式(同步/异步)、任务名、参数等信息
  • Job:表示某次 Task 执行生成的任务实例
  • Result:表示某次 Job 的运行结果
  • ResultSchema:运行结果的 Schema 信息
  • Record:表示运行结果的结果集中的一行记录

安装 SDK
要求:Python 3.6+

python_serverless-1.0.0-py3-none-any.whl.zip
29.29KB

直接使用 wheel 安装:

$ unzip python_serverless-1.0.0-py3-none-any.whl.zip
$ pip3 install python_serverless-1.0.0-py3-none-any.whl

3 快速入门

3.1 初始化客户端

Python Query SDK 目前仅提供一种静态初始化客户端的方式,通过配置 endpoint,region,Access Key,Secret Access Key 进行初始化:

from serverless.auth import StaticCredentials
from serverless.client import ServerlessClient

ak = 'your ak'
sk = 'your sk'
region = 'cn-beijing'
endpoint = 'open.volcengineapi.com'
service = 'emr_serverless'
connection_timeout = 30
socket_timeout = 30

client = ServerlessClient(credentials=StaticCredentials(ak, sk), 
    region=region, endpoint=endpoint, service=service,
    connection_timeout=connection_timeout, 
    socket_timeout=socket_timeout)

ServerlessClient 客户端是后续调用各种 Serverless Spark 功能的入口,当前版本 ServerlessClient 提供如下 API 接口:

API

功能

execute

执行作业

cancel_job

取消任务

get_job

获取任务实例状态

get_result

获取作业结果

3.2 第一个查询

初始化 Client 完成后,可通过执行相关 Task(目前支持 SQL,SparkJar 两种任务类型)来进行任务执行。
如下为一个进行简单 SQL 查询的例子:

sql = """
    SELECT * FROM `${your_schema}`.`${your_table}` LIMIT 100
"""

# 同步执行查询
job = client.execute(task=SQLTask(name="first query task", 
                            query=sql,
                            conf={}),
                    is_sync=True)

# 获取查询结果
if job.is_success():
    result = job.get_result()
    for record in result:
        print(', '.join([col for col in record]))

4 更多示例

本节将以代码示例的形式展示更多 Serverless Spark 功能的使用方式。

4.1 提交 SQL 任务

SQLTask 是用于执行 SQL 查询任务的接口。主要提供如下三个参数:

参数

类型

是否必须

描述

query

str

Y

sql 语句

name

str

N

任务名;如果不指定会以 SQLTask_${current_timestamp} 的方式生成

conf

dict

N

用于指定任务参数;默认为空

queue

str

N

指定运行队列名,不填则将选用公共队列

示例:

def execute_sql_task():
    from serverless.client import ServerlessClient
    from serverless.auth import StaticCredentials
    from serverless.task import SQLTask
    from serverless.exceptions import QuerySdkError
    ak = 'your ak'
    sk = 'your sk'
    region = 'cn-beijing'
    service = 'emr_serverless'
    endpoint = 'open.volcengineapi.com'
    sql = '${CUSTOM_SQL_STATEMEMT}'
    client = ServerlessClient(credentials=StaticCredentials(ak, sk), service=service, region=region,endpoint=endpoint)
    try:
        job = client.execute(task=SQLTask(name='sql task', query=sql, conf={}), is_sync=True)
        if job.is_success():
            result = job.get_result()
            for record in result:
                print(', '.join([col for col in record]))
    except QuerySdkError as e:
        print(
            "Error in executing sql task. code = %s, error = %s" % (
            e.error_code, e.info))
if __name__ == "__main__":
    execute_sql_task()

4.2 提交 SparkJar 任务

SparkJar 任务为用户提供了通过编写 Spark 应用进行定制化数据分析需求的支持。详见 Spark Jar 作业开发指南 文档。 JarTask 是 SDK 提供 SparkJar 任务执行的接口:

参数

类型

是否必须

描述

jar

str

Y

任务执行时使用的 SparkJar 资源,需传入tos路径,例如['tos://bucket/path/to/jar']

main_class

str

Y

Spark application 的 main class

main_args

list

N

spark application 的 main function 参数;不传默认为 empty list

name

str

N

任务名;如果不指定会以 SparkJarTask_${current_time} 的方式生成

conf

dict

N

用于指定任务参数;默认为空

queue

str

N

指定运行队列名,不填则将选用公共队列

depend_jars

[]str

N

依赖的jar文件,对应spark-submit的--jars选项,例如['tos://bucket/path/to/jar']

files

[]str

N

依赖的文件,对应spark-submit的--files选项, ,例如['tos://bucket/path/to/file']

archives

[]str

N

依赖的archive文件,对应spark-submit的--archieves选项, ,例如['tos://bucket/path/to/archive']

示例:

def execute_spark_task():
    from serverless.client import ServerlessClient
    from serverless.auth import StaticCredentials
    from serverless.task import JarTask

    ak = 'your ak'
    sk = 'your sk'
    region = 'cn-beijing'
    service = 'emr_serverless'
    endpoint = 'open.volcengineapi.com'

    client = ServerlessClient(credentials=StaticCredentials(ak, sk), service=service, region=region, endpoint=endpoint)

    jar_resource = 'tos://bucket/path/to/jar'
    main_class = 'com.xxx.xxx'
    main_args = ['arg_xxx', 'arg_yyy']
    job = client.execute(task=JarTask(name="spark task test",
        jar=jar_resource,
        main_class=main_class,
        main_args=main_args,
        conf={'serverless.spark.access.key': '${ak}', 'serverless.spark.secret.key': '${sk}'}
),
    is_sync=True)

    print('The task executed successfully.')
    print('Tracking ui: %s' % job.get_tracking_url())

if __name__ == "__main__":
    execute_spark_task()

4.3 提交 PySpark 任务

Pyspark 任务为用户提供了通过编写 Python Spark 应用进行定制化数据分析需求的支持。详见 Py Spark作业开发指南。 PySparkTask 是 SDK 提供 PySpark 任务执行的接口:

参数

类型

是否必须

描述

script

str

Y

任务执行时使用的 SparkJar 资源,需传入tos路径,例如['tos://bucket/path/to/pyfile']

args

list

N

spark application 的 main function 参数;不传默认为 empty list

name

str

N

任务名;如果不指定会以 SparkJarTask_${current_time} 的方式生成

conf

dict

N

用于指定任务参数;默认为空

queue

str

N

指定运行队列名,不填则将选用公共队列

depend_jars

[]str

N

依赖的jar文件,对应spark-submit的--jars选项,例如['tos://bucket/path/to/jar']

files

[]str

N

依赖的文件,对应spark-submit的--files选项, ,例如['tos://bucket/path/to/file']

archives

[]str

N

依赖的archive文件,对应spark-submit的--archieves选项, ,例如['tos://bucket/path/to/archive']

pyfiles

[]str

N

依赖的pyfile文件,对应spark-submit的--pyfiles选项,,例如['tos://bucket/path/to/pyfile']

示例:

def execute_pyspark_task():
    from serverless.client import ServerlessClient
    from serverless.auth import StaticCredentials
    from serverless.task import PySparkTask

    ak = 'your ak'
    sk = 'your sk'
    region = 'cn-beijing'
    service = 'emr_serverless'
    endpoint = 'open.volcengineapi.com'

    client = ServerlessClient(credentials=StaticCredentials(ak, sk), service=service, region=region, endpoint=endpoint)

    pyscript_resource = 'tos://bucket/path/to/pyfile'
    main_args = ['arg_xxx', 'arg_yyy']
    files = ['tos://bucket/path/to/dependency_pyfile']
    task = PySparkTask(name='pyspark task test',
        # queue='${queue_name}',
        script=pyscript_resource,
        args=main_args,
        conf={'serverless.spark.access.key': '${tos_ak}',
            'serverless.spark.secret.key': '${tos_sk}'},
        files=files
    )
        
    job = client.execute(task=task, is_sync=True)

    print('The task executed successfully.')
    print('Tracking ui: %s' % job.get_tracking_url())

if __name__ == "__main__":
    execute_pyspark_task()

4.4 同步/异步执行

通过is_sync 参数进行控制:

参数

类型

是否必须

描述

task

Task

Y

需要执行的任务

is_sync

bool

Y

是否同步执行

timeout

int

N

同步执行的超时时间,单位 s

# 异步
client.execute(task=SQLTask(name='', sql=sql, conf={}), is_sync=False)

# 同步执行,3 分钟超时
client.execute(task=SQLTask(name='', sql=sql, conf={}), is_sync=True, timeout=180)

4.5 取消任务

# 取消任务可以通过 job 实例;也可以通过 ServerlessClient 进行取消
job.cancel()
client.cancel_job(job)

5 查看任务实例相关信息

5.1 获取任务实例

可以根据任务 ID 进行任务实例的获取:

job = client.get_job(jobId)

5.2 获取引擎侧任务执行 UI

从拿到的任务实例获取任务对应的 Spark UI 页面:

job.get_tracking_url()

5.3 等待任务

异步调用后,如果想重新同步阻塞等待任务到达某种状态,可以尝试调用 wait_for() 函数:

# 等待任务结束
job.wait_for_finished()

# 自定义结束状态
def when_to_exit() -> bool:
    return job.get_tracking_url() is not None

job.wait_for(when_to_exit=_when_to_exit, timeout=180)

6 获取查询结果

对job实例调用 get_result() 获取任务的查询结果:

result = job.get_result()

# 或者由 serverless client 侧获取
result = client.get_result(job)

for record in result:
    print("row: (%s, %s, %s)" % (record[0], record[1], record[2]))

7 执行异常

任务异常将会以 QuerySdkError 的形式进行抛出,exception message 内携带具体的执行错误信息。