You need to enable JavaScript to run this app.
导航

Query Python SDK

最近更新时间2023.09.01 17:49:10

首次发布时间2022.05.05 17:36:29

1. 简介

LAS Query Python SDK 帮助 LAS 用户更加轻松地通过 Python 语言使用 LAS 查询服务,目前主要功能包括 任务提交/取消、任务信息获取、结果获取、上传资源等。
本文提供了上述功能的示例代码,方便参考使用。

2. 概念说明

系统概念

  • Endpoint:表示 LAS 对外服务的 API 域名

  • Region:表示 LAS 的数据中心所在的物理区域

目前 LAS 支持的地域和 API 域名如下表所示:

Region(中文名称)RegionEndpoint
华北-北京cn-beijinglas.volcengineapi.com
  • Access Key / Secret Access Key:访问火山引擎 API 的密钥;用户可以通过火山引擎的“密钥管理”页面获取到 Access Key 和 Secret Access Key。

内部概念

  • Schema:一个可以包含 数据表、资源、UDF 等的集合空间概念

  • Resource:表示资源,目前分为 Jar、File、ZIP、PyFile 四种类型

  • Task:定义某次任务的执行信息,包括 查询 SQL、执行方式(同步/异步)、任务名、参数等信息

  • Job:表示某次 Task 执行生成的任务实例

  • Result:表示某次 Job 的运行结果

  • ResultSchema:运行结果的 Schema 信息

  • Record:表示运行结果的结果集中的一行记录

3. 安装 SDK

要求:

  • Python 3.6+

直接使用 wheel 安装:

python_las-1.0.0.1-py3-none-any.whl.zip
31.30KB
$ unzip python_las-1.0.0.1-py3-none-any.whl.zip
$ pip3 install python_las-1.0.0.1-py3-none-any.whl

4. 快速入门

4.1 初始化客户端

LAS SDK 目前仅提供一种静态初始化客户端的方式,通过配置 endpoint,region,Access Key,Secret Access Key 进行初始化:

from las.client import LASClient
from las.auth import StaticCredentials

ak = "your ak"
sk = "your sk"
region = "cn-beijing"
endpoint = "las.volcengineapi.com"
connection_timeout = 30
socket_timeout = 30

client = LASClient(credentials=StaticCredentials(ak, sk), region=region, endpoint=endpoint, connection_timeout=connection_timeout, socket_timeout=socket_timeout)

LASClient 客户端是后续调用各种 LAS 功能的入口,当前版本 LASClient 提供如下 API 接口:

4.2 第一个查询

初始化 Client 完成后,可通过执行相关 Task(目前支持 SQL,SparkJar 两种任务类型)来进行任务执行。
如下为一个进行简单 SQL 查询的例子:

sql = """
    SELECT * FROM `${your_schema}`.`${your_table}` LIMIT 100
"""

# 同步执行查询
job = client.execute(task=SQLTask(name="first query task", 
                            query=sql,
                            conf={}),
                    is_sync=True)

# 获取查询结果
if job.is_success():
    result = job.get_result()
    for record in result:
        print(', '.join([col for col in record]))
5. 更多示例

本节将以代码示例的形式展示更多 LAS 功能的使用方式。

5.1 提交/取消任务

5.1.1 提交 SQL 任务

SQLTask 是用于执行 SQL 查询任务的接口。主要提供如下三个参数:

参数类型是否必须描述
querystrYsql 语句
namestrN任务名;如果不指定会以 SQLTask_${current_timestamp} 的方式生成
confdictN用于指定任务参数;默认为空

示例:

def execute_sql_task():
		from las.client	import LASClient
		from las.auth	import StaticCredentials
    from las.task import SQLTask

    ak = 'your ak'
    sk = 'your sk'
    region = 'cn-beijing'
    endpoint = 'las.volcengineapi.com'
    sql = 'CUSTOM SQL STATEMEMT'

    client = LASClient(credentials=StaticCredentials(ak, sk), region=region,
                       endpoint=endpoint)

    try:
        job = client.execute(task=SQLTask(name='sql task', query=sql, conf={}), is_sync=True)
        if job.is_success():
            result = job.get_result()
            for record in result:
                print(', '.join([col for col in record]))

    except LASError as e:
        print(
            "Error in executing sql task. code = %s, error = %s" % (
            e.code, e.info))


if __name__ == "__main__":
    execute_sql_task()

支持的 SQL 语法请参照 LAS SQL语法

5.1.2 提交 SparkJar 任务

SparkJar 任务为用户提供了通过编写 Spark 应用进行定制化数据分析需求的支持。详见 Spark Jar 作业 文档。
SparkTask 是 SDK 提供 SparkJar 任务执行的接口:

参数类型是否必须描述
jarJarResourceInfoY任务执行时使用的 SparkJar 资源
main_classstrYSpark application 的 main class
main_argslistNspark application 的 main function 参数;不传默认为 empty list
namestrN任务名;如果不指定会以 SparkJarTask_${current_time} 的方式生成
confdictN用于指定任务参数;默认为空

示例:

def execute_spark_task():
		from las.client	import LASClient
		from las.auth	import StaticCredentials
    from las.resource import JarResourceInfo
    from las.task import SparkJarTask

    ak = 'your ak'
    sk = 'your sk'
    region = 'cn-beijing'
    endpoint = 'las.volcengineapi.com'

    client = LASClient(credentials=StaticCredentials(ak, sk), region=region,
                       endpoint=endpoint)

    schema = 'schema_xxx'
    resource_name = 'jar_resource_xxx'
    main_class = 'com.xxx.xxx'
    job = client.execute(task=SparkJarTask(name="spark task",
                                           jar=JarResourceInfo.of(
                                               schema, resource_name),
                                           main_class=main_class,
                                           main_args=['arg_xxx', 'arg_yyy']
                                           ),
                         is_sync=False)

    def when_to_exit() -> bool:
        return job.get_tracking_url() is not None

    job.wait_for(when_to_exit=when_to_exit, timeout=180)
    print('Tracking Url: %s' % job.get_tracking_url())

    job.wait_for_finished()
    print('The task executed successfully.')


if __name__ == "__main__":
    execute_spark_task()

5.1.3 同步/异步执行

通过is_sync 参数进行控制:

参数类型是否必须描述
taskTaskY需要执行的任务
is_syncboolY是否同步执行
timeoutintN同步执行的超时时间,单位 s
# 异步
client.execute(task=SQLTask(name='', sql=sql, conf={}), is_sync=False)

# 同步执行,3 分钟超时
client.execute(task=SQLTask(name='', sql=sql, conf={}), is_sync=True, timeout=180)

5.1.4 取消任务

# 取消任务可以通过 job 实例;也可以通过 LasClient 进行取消
job.cancel()
client.cancel(job)

5.2 查看任务实例相关信息

5.2.1 获取任务实例

可以根据任务 ID 进行任务实例的获取:

job = client.get_job(jobId)

5.2.2 获取引擎侧任务执行 UI

从拿到的任务实例获取任务对应的 Spark UI / presto UI 页面:

job.get_tracking_url()

5.2.3 等待任务

异步调用后,如果想重新同步阻塞等待任务到达某种状态,可以尝试调用 wait_for() 函数:

# 等待任务结束
job.wait_for_finished()

# 自定义结束状态
def when_to_exit() -> bool:
    return job.get_tracking_url() is not None

job.wait_for(when_to_exit=_when_to_exit, timeout=180)

5.3 获取查询结果

对job实例调用 get_result() 获取任务的查询结果:

result = job.get_result()

# 或者由 las client 侧获取
result = client.get_result(job)

for record in result:
    print("row: (%s, %s, %s)" % (record[0], record[1], record[2]))

5.4 上传资源/查看资源列表

上传资源通过 XXXResourceInfo.of()进行指定:

def upload_and_list_resource():
		from las.client	import LASClient
		from las.auth	import StaticCredentials
    from las.exceptions import LASError
    from las.resource import FileResourceInfo

    ak = 'your ak'
    sk = 'your sk'
    region = 'cn-beijing'
    endpoint = 'las.volcengineapi.com'

    client = LASClient(credentials=StaticCredentials(ak, sk), region=region,
                       endpoint=endpoint)

    file_path = '/path/to/file'
    schema = 'schema_xxx'
    resource_name = 'resource_name_xxx'
    description = '测试文件 xxx'

    try:
        client.upload_resource(
            FileResourceInfo(schema, resource_name, description), file_path)
        client.list_resources(schema)

    except LASError as e:
        print(
            "Error in uploading or listing resource. code = %s, error = %s" % (
            e.code, e.info))


if __name__ == "__main__":
    upload_and_list_resource()

目前 LAS 支持四种资源类型:

  • File -> 调用 FileResourceInfo.of()

  • Zip -> 调用 ZipResourceInfo.of()

  • PyFlie -> 调用 PyFileResourceInfo.of()

  • Jar -> 调用 JarResourceInfo.of()

5.5 UDF 的创建和使用

UDF 的创建目前 SDK 侧只支持通过 SQLTask 进行执行。详见语法文档( LAS SQL 语法 )的 Create Function 部分。

def create_udf():
    from las.auth import StaticCredentials
    from las.client import LASClient
    from las.task import SQLTask
    from las.exceptions import LASError

    ak = 'your ak'
    sk = 'your sk'
    region = 'cn-beijing'
    endpoint = 'las.volcengineapi.com'

    client = LASClient(credentials=StaticCredentials(ak, sk), region=region,
                       endpoint=endpoint)

    try:
        create_udf_job = client.execute(task=SQLTask(name='create udf',
                                                     query="CREATE FUNCTION minreng_test_db_2.sortstring2 AS 'com.bytedance.hive.udf.dp.SortString' using jar 'testywudf.new'",
                                                     conf={}), is_sync=True)

        if create_udf_job.is_success():
            job = client.execute(task=SQLTask(name='apply udf',
                                        query="select minreng_test_db_2.sortstring2(name) from testywudf.test1 where date='20210928' limit 10",
                                        conf={}), is_sync=True)
            if job.is_success():
                result = job.get_result()
                for record in result:
                    print(', '.join([col for col in record]))

    except LASError as e:
        print(
            "Error in creating or apply udf. code = %s, error = %s" % (
                e.code, e.info))


if __name__ == "__main__":
    create_udf()

5.6 执行异常

任务异常将会以 LAS Error 的形式进行抛出,exception message 内携带具体的执行错误信息。