You need to enable JavaScript to run this app.
流式计算 Flink版

流式计算 Flink版

复制全文
SDK参考
Python
复制全文
Python

描述

以下提供了流式计算 Flink版的 SDK 使用说明,如需查看全部的 API 详细信息,请查看 API文档。您可以通过 API Explorer 快速发起 API 调用,获取响应结果和代码示例。

下载地址

发布地址

https://pypi.org/project/volcengine-python-sdk/1.1.3/

源码地址

https://github.com/volcengine/volcengine-python-sdk/tree/master/volcenginesdkflink20250101

安装说明

前置条件

环境要求

Python 版本不低于2.7。
说明
由于 Windows 系统有最长路径限制,可能会导致安装失败,请按照以下方式设置:

  1. 按下 Win+R ,输入 regedit 打开注册表编辑器。
  2. 设置 \HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\Control\FileSystem 路径下的变量 LongPathsEnabled 为 1 即可。

安装步骤

步骤一: 集成服务仓库
Install via pip:

pip install volcengine-python-sdk

Install via Setuptools:

python setup.py install --user

或者 sudo python setup.py install 为所有用户安装依赖。

步骤二: 启动时初始化,配置 Configuration 全局默认参数

configuration = volcenginesdkcore.Configuration()
configuration.client_side_validation = True  # 客户端是否进行参数校验
configuration.schema = "http"  # https or http
configuration.debug = False  # 是否开启调试
configuration.logger_file = "sdk.log"

volcenginesdkcore.Configuration.set_default(configuration)

步骤三: 获取 Client

def get_client(ak, sk, region):
    # 包含默认属性
    configuration = volcenginesdkcore.Configuration()
    configuration.ak = ak # ak泄露存在风险,建议从配置文件中获取。
    configuration.sk = sk # sk泄露存在风险,建议从配置文件中获取。
    configuration.region = region
    client = volcenginesdkautoscaling.AUTOSCALINGApi(volcenginesdkcore.ApiClient(configuration))
    return client

使用说明

Endpoint 设置

如果您要自定义SDK的Endpoint,可以按照以下示例代码设置:

configuration = volcenginesdkcore.Configuration()
configuration.host = 'ecs.cn-beijing-autodriving.volcengineapi.com'

火山引擎标准的Endpoint规则说明:

Regional 服务

Global 服务

{service}.{region}.volcengineapi.com
例如:云服务ecs在cn-beijing-autodriving Region域名为: ecs.cn-beijing-autodriving.volcengineapi.com

{service}.volcengineapi.com
例如:访问控制iam为Global服务,域名为:iam.volcengineapi.com

注:Service中存在_符号时,Endpoint时需转为-符号。存在大写字母时需转成小写。

SDK 示例

以下提供调用【GetApplicationInstance】API的SDK示例代码,实际项目中您可以替换成SDK中已支持的任意API。

# Example Code generated by Beijing Volcanoengine Technology.
from __future__ import print_function
import volcenginesdkcore
import volcenginesdkflink20250101
from pprint import pprint
from volcenginesdkcore.rest import ApiException

if __name__ == '__main__':
    configuration = volcenginesdkcore.Configuration()
    configuration.ak = "Your AK"
    configuration.sk = "Your SK"
    configuration.region = "cn-beijing"
    # set default configuration
    volcenginesdkcore.Configuration.set_default(configuration)

    # use global default configuration
    api_instance = volcenginesdkflink20250101.FLINK20250101Api()
    get_application_instance_request = volcenginesdkflink20250101.GetApplicationInstanceRequest(
    )

    try:
        resp = api_instance.get_application_instance(get_application_instance_request)
        pprint(resp)
    except ApiException as e:
        print("Exception when calling api: %s\n" % e)
最近更新时间:2025.12.29 11:42:32
这个页面对您有帮助吗?
有用
有用
无用
无用