以下提供了流式计算 Flink版的 SDK 使用说明,如需查看全部的 API 详细信息,请查看 API文档。您可以通过 API Explorer 快速发起 API 调用,获取响应结果和代码示例。
https://pypi.org/project/volcengine-python-sdk/1.1.3/
https://github.com/volcengine/volcengine-python-sdk/tree/master/volcenginesdkflink20250101
Python 版本不低于2.7。
说明
由于 Windows 系统有最长路径限制,可能会导致安装失败,请按照以下方式设置:
步骤一: 集成服务仓库
Install via pip:
pip install volcengine-python-sdk
Install via Setuptools:
python setup.py install --user
或者 sudo python setup.py install 为所有用户安装依赖。
步骤二: 启动时初始化,配置 Configuration 全局默认参数
configuration = volcenginesdkcore.Configuration() configuration.client_side_validation = True # 客户端是否进行参数校验 configuration.schema = "http" # https or http configuration.debug = False # 是否开启调试 configuration.logger_file = "sdk.log" volcenginesdkcore.Configuration.set_default(configuration)
步骤三: 获取 Client
def get_client(ak, sk, region): # 包含默认属性 configuration = volcenginesdkcore.Configuration() configuration.ak = ak # ak泄露存在风险,建议从配置文件中获取。 configuration.sk = sk # sk泄露存在风险,建议从配置文件中获取。 configuration.region = region client = volcenginesdkautoscaling.AUTOSCALINGApi(volcenginesdkcore.ApiClient(configuration)) return client
如果您要自定义SDK的Endpoint,可以按照以下示例代码设置:
configuration = volcenginesdkcore.Configuration() configuration.host = 'ecs.cn-beijing-autodriving.volcengineapi.com'
火山引擎标准的Endpoint规则说明:
Regional 服务 | Global 服务 |
|---|---|
|
|
注:Service中存在_符号时,Endpoint时需转为-符号。存在大写字母时需转成小写。
以下提供调用【GetApplicationInstance】API的SDK示例代码,实际项目中您可以替换成SDK中已支持的任意API。
# Example Code generated by Beijing Volcanoengine Technology. from __future__ import print_function import volcenginesdkcore import volcenginesdkflink20250101 from pprint import pprint from volcenginesdkcore.rest import ApiException if __name__ == '__main__': configuration = volcenginesdkcore.Configuration() configuration.ak = "Your AK" configuration.sk = "Your SK" configuration.region = "cn-beijing" # set default configuration volcenginesdkcore.Configuration.set_default(configuration) # use global default configuration api_instance = volcenginesdkflink20250101.FLINK20250101Api() get_application_instance_request = volcenginesdkflink20250101.GetApplicationInstanceRequest( ) try: resp = api_instance.get_application_instance(get_application_instance_request) pprint(resp) except ApiException as e: print("Exception when calling api: %s\n" % e)