Python Query SDK 帮助 Serverless Spark 用户更加轻松地通过 Python 语言使用 Serverless Spark 查询服务,目前主要功能包括 任务提交/取消、任务信息获取、结果获取、上传资源等。
本文提供了上述功能的示例代码,方便参考使用。
系统概念
目前 Serverless Spark 支持的地域和 API 域名如下表所示:
Region(中文名称) | Region | Endpoint |
---|---|---|
华北-北京 | cn-beijing | open.volcengineapi.com |
华东-上海 | cn-shanghai | open.volcengineapi.com |
华北-广州 | cn-guangzhou | open.volcengineapi.com |
亚太东南-柔佛 | ap-southeast-1 | open.volcengineapi.com |
内部概念
安装 SDK
要求:Python 3.6+
直接使用 wheel 安装:
$ unzip python_serverless-1.0.0-py3-none-any.whl.zip $ pip3 install python_serverless-1.0.0-py3-none-any.whl
Python Query SDK 目前仅提供一种静态初始化客户端的方式,通过配置 endpoint,region,Access Key,Secret Access Key 进行初始化:
from serverless.auth import StaticCredentials from serverless.client import ServerlessClient ak = 'your ak' sk = 'your sk' region = 'cn-beijing' endpoint = 'open.volcengineapi.com' service = 'emr_serverless' connection_timeout = 30 socket_timeout = 30 client = ServerlessClient(credentials=StaticCredentials(ak, sk), region=region, endpoint=endpoint, service=service, connection_timeout=connection_timeout, socket_timeout=socket_timeout)
ServerlessClient
客户端是后续调用各种 Serverless Spark 功能的入口,当前版本 ServerlessClient 提供如下 API 接口:
API | 功能 |
---|---|
execute | 执行作业 |
cancel_job | 取消任务 |
get_job | 获取任务实例状态 |
get_result | 获取作业结果 |
初始化 Client 完成后,可通过执行相关 Task(目前支持 SQL,SparkJar 两种任务类型)来进行任务执行。
如下为一个进行简单 SQL 查询的例子:
sql = """ SELECT * FROM `${your_schema}`.`${your_table}` LIMIT 100 """ # 同步执行查询 job = client.execute(task=SQLTask(name="first query task", query=sql, conf={}), is_sync=True) # 获取查询结果 if job.is_success(): result = job.get_result() for record in result: print(', '.join([col for col in record]))
本节将以代码示例的形式展示更多 Serverless Spark 功能的使用方式。
SQLTask 是用于执行 SQL 查询任务的接口。主要提供如下三个参数:
参数 | 类型 | 是否必须 | 描述 |
---|---|---|---|
query | str | Y | sql 语句 |
name | str | N | 任务名;如果不指定会以 SQLTask_${current_timestamp} 的方式生成 |
conf | dict | N | 用于指定任务参数;默认为空 |
queue | str | N | 指定运行队列名,不填则将选用公共队列 |
示例:
def execute_sql_task(): from serverless.client import ServerlessClient from serverless.auth import StaticCredentials from serverless.task import SQLTask from serverless.exceptions import QuerySdkError ak = 'your ak' sk = 'your sk' region = 'cn-beijing' service = 'emr_serverless' endpoint = 'open.volcengineapi.com' sql = '${CUSTOM_SQL_STATEMEMT}' client = ServerlessClient(credentials=StaticCredentials(ak, sk), service=service, region=region,endpoint=endpoint) try: job = client.execute(task=SQLTask(name='sql task', query=sql, conf={}), is_sync=True) if job.is_success(): result = job.get_result() for record in result: print(', '.join([col for col in record])) except QuerySdkError as e: print( "Error in executing sql task. code = %s, error = %s" % ( e.error_code, e.info)) if __name__ == "__main__": execute_sql_task()
SparkJar 任务为用户提供了通过编写 Spark 应用进行定制化数据分析需求的支持。详见 Spark Jar 作业开发指南 文档。 JarTask 是 SDK 提供 SparkJar 任务执行的接口:
参数 | 类型 | 是否必须 | 描述 |
---|---|---|---|
jar | str | Y | 任务执行时使用的 SparkJar 资源,需传入tos路径,例如['tos://bucket/path/to/jar'] |
main_class | str | Y | Spark application 的 main class |
main_args | list | N | spark application 的 main function 参数;不传默认为 empty list |
name | str | N | 任务名;如果不指定会以 SparkJarTask_${current_time} 的方式生成 |
conf | dict | N | 用于指定任务参数;默认为空 |
queue | str | N | 指定运行队列名,不填则将选用公共队列 |
depend_jars | []str | N | 依赖的jar文件,对应spark-submit的--jars选项,例如['tos://bucket/path/to/jar'] |
files | []str | N | 依赖的文件,对应spark-submit的--files选项, ,例如['tos://bucket/path/to/file'] |
archives | []str | N | 依赖的archive文件,对应spark-submit的--archieves选项, ,例如['tos://bucket/path/to/archive'] |
示例:
def execute_spark_task(): from serverless.client import ServerlessClient from serverless.auth import StaticCredentials from serverless.task import JarTask ak = 'your ak' sk = 'your sk' region = 'cn-beijing' service = 'emr_serverless' endpoint = 'open.volcengineapi.com' client = ServerlessClient(credentials=StaticCredentials(ak, sk), service=service, region=region, endpoint=endpoint) jar_resource = 'tos://bucket/path/to/jar' main_class = 'com.xxx.xxx' main_args = ['arg_xxx', 'arg_yyy'] job = client.execute(task=JarTask(name="spark task test", jar=jar_resource, main_class=main_class, main_args=main_args, conf={'serverless.spark.access.key': '${ak}', 'serverless.spark.secret.key': '${sk}'} ), is_sync=True) print('The task executed successfully.') print('Tracking ui: %s' % job.get_tracking_url()) if __name__ == "__main__": execute_spark_task()
Pyspark 任务为用户提供了通过编写 Python Spark 应用进行定制化数据分析需求的支持。详见 Py Spark作业开发指南。 PySparkTask 是 SDK 提供 PySpark 任务执行的接口:
参数 | 类型 | 是否必须 | 描述 |
---|---|---|---|
script | str | Y | 任务执行时使用的 SparkJar 资源,需传入tos路径,例如['tos://bucket/path/to/pyfile'] |
args | list | N | spark application 的 main function 参数;不传默认为 empty list |
name | str | N | 任务名;如果不指定会以 SparkJarTask_${current_time} 的方式生成 |
conf | dict | N | 用于指定任务参数;默认为空 |
queue | str | N | 指定运行队列名,不填则将选用公共队列 |
depend_jars | []str | N | 依赖的jar文件,对应spark-submit的--jars选项,例如['tos://bucket/path/to/jar'] |
files | []str | N | 依赖的文件,对应spark-submit的--files选项, ,例如['tos://bucket/path/to/file'] |
archives | []str | N | 依赖的archive文件,对应spark-submit的--archieves选项, ,例如['tos://bucket/path/to/archive'] |
pyfiles | []str | N | 依赖的pyfile文件,对应spark-submit的--pyfiles选项,,例如['tos://bucket/path/to/pyfile'] |
示例:
def execute_pyspark_task(): from serverless.client import ServerlessClient from serverless.auth import StaticCredentials from serverless.task import PySparkTask ak = 'your ak' sk = 'your sk' region = 'cn-beijing' service = 'emr_serverless' endpoint = 'open.volcengineapi.com' client = ServerlessClient(credentials=StaticCredentials(ak, sk), service=service, region=region, endpoint=endpoint) pyscript_resource = 'tos://bucket/path/to/pyfile' main_args = ['arg_xxx', 'arg_yyy'] files = ['tos://bucket/path/to/dependency_pyfile'] task = PySparkTask(name='pyspark task test', # queue='${queue_name}', script=pyscript_resource, args=main_args, conf={'serverless.spark.access.key': '${tos_ak}', 'serverless.spark.secret.key': '${tos_sk}'}, files=files ) job = client.execute(task=task, is_sync=True) print('The task executed successfully.') print('Tracking ui: %s' % job.get_tracking_url()) if __name__ == "__main__": execute_pyspark_task()
通过is_sync 参数进行控制:
参数 | 类型 | 是否必须 | 描述 |
---|---|---|---|
task | Task | Y | 需要执行的任务 |
is_sync | bool | Y | 是否同步执行 |
timeout | int | N | 同步执行的超时时间,单位 s |
# 异步 client.execute(task=SQLTask(name='', sql=sql, conf={}), is_sync=False) # 同步执行,3 分钟超时 client.execute(task=SQLTask(name='', sql=sql, conf={}), is_sync=True, timeout=180)
# 取消任务可以通过 job 实例;也可以通过 ServerlessClient 进行取消 job.cancel() client.cancel_job(job)
可以根据任务 ID 进行任务实例的获取:
job = client.get_job(jobId)
从拿到的任务实例获取任务对应的 Spark UI 页面:
job.get_tracking_url()
异步调用后,如果想重新同步阻塞等待任务到达某种状态,可以尝试调用 wait_for() 函数:
# 等待任务结束 job.wait_for_finished() # 自定义结束状态 def when_to_exit() -> bool: return job.get_tracking_url() is not None job.wait_for(when_to_exit=_when_to_exit, timeout=180)
对job实例调用 get_result()
获取任务的查询结果:
result = job.get_result() # 或者由 serverless client 侧获取 result = client.get_result(job) for record in result: print("row: (%s, %s, %s)" % (record[0], record[1], record[2]))
任务异常将会以 QuerySdkError 的形式进行抛出,exception message 内携带具体的执行错误信息。