最近更新时间:2023.09.01 17:49:10
首次发布时间:2022.05.05 17:36:29
LAS Query Python SDK 帮助 LAS 用户更加轻松地通过 Python 语言使用 LAS 查询服务,目前主要功能包括 任务提交/取消、任务信息获取、结果获取、上传资源等。
本文提供了上述功能的示例代码,方便参考使用。
系统概念
Endpoint:表示 LAS 对外服务的 API 域名
Region:表示 LAS 的数据中心所在的物理区域
目前 LAS 支持的地域和 API 域名如下表所示:
Region(中文名称) | Region | Endpoint |
---|---|---|
华北-北京 | cn-beijing | las.volcengineapi.com |
内部概念
Schema:一个可以包含 数据表、资源、UDF 等的集合空间概念
Resource:表示资源,目前分为 Jar、File、ZIP、PyFile 四种类型
Task:定义某次任务的执行信息,包括 查询 SQL、执行方式(同步/异步)、任务名、参数等信息
Job:表示某次 Task 执行生成的任务实例
Result:表示某次 Job 的运行结果
ResultSchema:运行结果的 Schema 信息
Record:表示运行结果的结果集中的一行记录
要求:
直接使用 wheel 安装:
$ unzip python_las-1.0.0.1-py3-none-any.whl.zip $ pip3 install python_las-1.0.0.1-py3-none-any.whl
LAS SDK 目前仅提供一种静态初始化客户端的方式,通过配置 endpoint,region,Access Key,Secret Access Key 进行初始化:
from las.client import LASClient from las.auth import StaticCredentials ak = "your ak" sk = "your sk" region = "cn-beijing" endpoint = "las.volcengineapi.com" connection_timeout = 30 socket_timeout = 30 client = LASClient(credentials=StaticCredentials(ak, sk), region=region, endpoint=endpoint, connection_timeout=connection_timeout, socket_timeout=socket_timeout)
LASClient
客户端是后续调用各种 LAS 功能的入口,当前版本 LASClient 提供如下 API 接口:
初始化 Client 完成后,可通过执行相关 Task(目前支持 SQL,SparkJar 两种任务类型)来进行任务执行。
如下为一个进行简单 SQL 查询的例子:
sql = """ SELECT * FROM `${your_schema}`.`${your_table}` LIMIT 100 """ # 同步执行查询 job = client.execute(task=SQLTask(name="first query task", query=sql, conf={}), is_sync=True) # 获取查询结果 if job.is_success(): result = job.get_result() for record in result: print(', '.join([col for col in record]))
本节将以代码示例的形式展示更多 LAS 功能的使用方式。
SQLTask 是用于执行 SQL 查询任务的接口。主要提供如下三个参数:
参数 | 类型 | 是否必须 | 描述 |
---|---|---|---|
query | str | Y | sql 语句 |
name | str | N | 任务名;如果不指定会以 SQLTask_${current_timestamp} 的方式生成 |
conf | dict | N | 用于指定任务参数;默认为空 |
示例:
def execute_sql_task(): from las.client import LASClient from las.auth import StaticCredentials from las.task import SQLTask ak = 'your ak' sk = 'your sk' region = 'cn-beijing' endpoint = 'las.volcengineapi.com' sql = 'CUSTOM SQL STATEMEMT' client = LASClient(credentials=StaticCredentials(ak, sk), region=region, endpoint=endpoint) try: job = client.execute(task=SQLTask(name='sql task', query=sql, conf={}), is_sync=True) if job.is_success(): result = job.get_result() for record in result: print(', '.join([col for col in record])) except LASError as e: print( "Error in executing sql task. code = %s, error = %s" % ( e.code, e.info)) if __name__ == "__main__": execute_sql_task()
支持的 SQL 语法请参照 LAS SQL语法 。
SparkJar 任务为用户提供了通过编写 Spark 应用进行定制化数据分析需求的支持。详见 Spark Jar 作业 文档。
SparkTask 是 SDK 提供 SparkJar 任务执行的接口:
参数 | 类型 | 是否必须 | 描述 |
---|---|---|---|
jar | JarResourceInfo | Y | 任务执行时使用的 SparkJar 资源 |
main_class | str | Y | Spark application 的 main class |
main_args | list | N | spark application 的 main function 参数;不传默认为 empty list |
name | str | N | 任务名;如果不指定会以 SparkJarTask_${current_time} 的方式生成 |
conf | dict | N | 用于指定任务参数;默认为空 |
示例:
def execute_spark_task(): from las.client import LASClient from las.auth import StaticCredentials from las.resource import JarResourceInfo from las.task import SparkJarTask ak = 'your ak' sk = 'your sk' region = 'cn-beijing' endpoint = 'las.volcengineapi.com' client = LASClient(credentials=StaticCredentials(ak, sk), region=region, endpoint=endpoint) schema = 'schema_xxx' resource_name = 'jar_resource_xxx' main_class = 'com.xxx.xxx' job = client.execute(task=SparkJarTask(name="spark task", jar=JarResourceInfo.of( schema, resource_name), main_class=main_class, main_args=['arg_xxx', 'arg_yyy'] ), is_sync=False) def when_to_exit() -> bool: return job.get_tracking_url() is not None job.wait_for(when_to_exit=when_to_exit, timeout=180) print('Tracking Url: %s' % job.get_tracking_url()) job.wait_for_finished() print('The task executed successfully.') if __name__ == "__main__": execute_spark_task()
通过is_sync 参数进行控制:
参数 | 类型 | 是否必须 | 描述 |
---|---|---|---|
task | Task | Y | 需要执行的任务 |
is_sync | bool | Y | 是否同步执行 |
timeout | int | N | 同步执行的超时时间,单位 s |
# 异步 client.execute(task=SQLTask(name='', sql=sql, conf={}), is_sync=False) # 同步执行,3 分钟超时 client.execute(task=SQLTask(name='', sql=sql, conf={}), is_sync=True, timeout=180)
# 取消任务可以通过 job 实例;也可以通过 LasClient 进行取消 job.cancel() client.cancel(job)
可以根据任务 ID 进行任务实例的获取:
job = client.get_job(jobId)
从拿到的任务实例获取任务对应的 Spark UI / presto UI 页面:
job.get_tracking_url()
异步调用后,如果想重新同步阻塞等待任务到达某种状态,可以尝试调用 wait_for() 函数:
# 等待任务结束 job.wait_for_finished() # 自定义结束状态 def when_to_exit() -> bool: return job.get_tracking_url() is not None job.wait_for(when_to_exit=_when_to_exit, timeout=180)
对job实例调用 get_result()
获取任务的查询结果:
result = job.get_result() # 或者由 las client 侧获取 result = client.get_result(job) for record in result: print("row: (%s, %s, %s)" % (record[0], record[1], record[2]))
上传资源通过 XXXResourceInfo.of()
进行指定:
def upload_and_list_resource(): from las.client import LASClient from las.auth import StaticCredentials from las.exceptions import LASError from las.resource import FileResourceInfo ak = 'your ak' sk = 'your sk' region = 'cn-beijing' endpoint = 'las.volcengineapi.com' client = LASClient(credentials=StaticCredentials(ak, sk), region=region, endpoint=endpoint) file_path = '/path/to/file' schema = 'schema_xxx' resource_name = 'resource_name_xxx' description = '测试文件 xxx' try: client.upload_resource( FileResourceInfo(schema, resource_name, description), file_path) client.list_resources(schema) except LASError as e: print( "Error in uploading or listing resource. code = %s, error = %s" % ( e.code, e.info)) if __name__ == "__main__": upload_and_list_resource()
目前 LAS 支持四种资源类型:
File -> 调用 FileResourceInfo.of()
Zip -> 调用 ZipResourceInfo.of()
PyFlie -> 调用 PyFileResourceInfo.of()
Jar -> 调用 JarResourceInfo.of()
UDF 的创建目前 SDK 侧只支持通过 SQLTask 进行执行。详见语法文档( LAS SQL 语法 )的 Create Function 部分。
def create_udf(): from las.auth import StaticCredentials from las.client import LASClient from las.task import SQLTask from las.exceptions import LASError ak = 'your ak' sk = 'your sk' region = 'cn-beijing' endpoint = 'las.volcengineapi.com' client = LASClient(credentials=StaticCredentials(ak, sk), region=region, endpoint=endpoint) try: create_udf_job = client.execute(task=SQLTask(name='create udf', query="CREATE FUNCTION minreng_test_db_2.sortstring2 AS 'com.bytedance.hive.udf.dp.SortString' using jar 'testywudf.new'", conf={}), is_sync=True) if create_udf_job.is_success(): job = client.execute(task=SQLTask(name='apply udf', query="select minreng_test_db_2.sortstring2(name) from testywudf.test1 where date='20210928' limit 10", conf={}), is_sync=True) if job.is_success(): result = job.get_result() for record in result: print(', '.join([col for col in record])) except LASError as e: print( "Error in creating or apply udf. code = %s, error = %s" % ( e.code, e.info)) if __name__ == "__main__": create_udf()
任务异常将会以 LAS Error 的形式进行抛出,exception message 内携带具体的执行错误信息。