Using Workflows via the SDK

Last updated: 2023.07.07 15:22:25

First published: 2023.07.07 15:22:25

Installation

Python 3.6 or later must be installed beforehand.

wget https://ml-platform-public-examples-cn-beijing.tos-cn-beijing.volces.com/python_sdk_installer/volcengine_ml_platform-1.1.0b2-py3-none-any.whl

pip3 install volcengine_ml_platform-1.1.0b2-py3-none-any.whl

Configure AK/SK

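Before using the SDK, configure the AK / SK of your Volcengine account locally. The SDK uses them to authenticate when it accesses the machine learning platform.

  1. Log in to the Volcengine console and go to [Key Management] to view the AK / SK of the current account.
    1. If the current account is a sub-account, it needs the AccessKeyFullAccess IAM policy.

Replace <your access key> and <your secret access key> in the methods below with your real AK/SK.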

Method 1 (via configuration files):

mkdir -p $HOME/.volc

cat <<EOF > $HOME/.volc/credentials
[default]
access_key_id     = <your access key>
secret_access_key = <your secret access key>
EOF

cat <<EOF > $HOME/.volc/config
[default]
# Region; currently only cn-beijing is supported
region       = cn-beijing
EOF

Method 2 (via code):

import volcengine_ml_platform as vemlp
vemlp.init(
    ak='<your access key>',
    sk='<your secret access key>',
    region='cn-beijing',
)

Method 3 (via environment variables):

export VOLC_ACCESSKEY='<your access key>'
export VOLC_SECRETKEY='<your secret access key>'
export VOLC_REGION=cn-beijing
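
Before calling the SDK, it can help to confirm that at least one of the configuration methods above is actually in place. The following is a minimal sketch of such a check. `find_credentials` is a hypothetical helper, not part of the SDK, and the order in which it looks (environment variables first, then the credentials file) is an assumption made for illustration; this page does not state which source the SDK itself prefers.

```python
import configparser
import os
from pathlib import Path


def find_credentials(home=None, environ=None):
    """Return (source, access_key, secret_key), or None if nothing is configured.

    Hypothetical helper for illustration only; it is not part of the SDK.
    """
    environ = os.environ if environ is None else environ
    home = Path.home() if home is None else Path(home)

    # Method 3: environment variables VOLC_ACCESSKEY / VOLC_SECRETKEY
    ak = environ.get("VOLC_ACCESSKEY")
    sk = environ.get("VOLC_SECRETKEY")
    if ak and sk:
        return ("env", ak, sk)

    # Method 1: the [default] section of $HOME/.volc/credentials
    parser = configparser.ConfigParser()
    if parser.read(home / ".volc" / "credentials") and parser.has_section("default"):
        section = parser["default"]
        ak = section.get("access_key_id")
        sk = section.get("secret_access_key")
        if ak and sk:
            return ("file", ak, sk)

    return None
```

Calling `find_credentials()` with no arguments inspects the real environment and home directory. Avoid printing the returned keys; checking the first element of the tuple is enough to see which method is active.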

SDK Usage

Pipeline

class volcengine_ml_platform.pipeline.Pipeline(
    *, 
    id: Optional[str] = None, 
    name: Optional[str] = None, 
    version_id: Optional[str] = None, 
    version_number: Optional[int] = None, 
    description: Optional[str] = None, 
    pipeline_template: Optional[PipelineTemplate] = None, 
    user_id: Optional[str] = None, 
    account_id: Optional[str] = None, 
    create_time: Optional[str] = None, 
    update_time: Optional[str] = None, 
    extra: Optional[Dict[str, str]] = None,
)

Functions

classmethod get(id: Optional[str] = None, name: Optional[str] = None) -> Pipeline

Get the latest configuration of a pipeline by id or name.

  • Parameters

    • id - Pipeline id

    • name - Pipeline name

  • Example

    >>> from volcengine_ml_platform.pipeline import Pipeline
    >>> p = Pipeline.get(id='pipeline_id')
    >>> print(p)
    >>> p = Pipeline.get(name='pipeline_name')
    >>> print(p)
    

classmethod load(filename: str, *, type: str = 'yaml', name: Optional[str] = None, description: Optional[str] = None) -> Pipeline

Load a Pipeline from a configuration file.

  • Parameters

    • filename - Path of the file to load

    • type - Type of the file to load; currently only yaml is supported

    • name - Pipeline name

    • description - Pipeline description

  • Example

    >>> from volcengine_ml_platform.pipeline import Pipeline
    >>> Pipeline.load(...)
    >>> Pipeline.load(..., name='pipeline_name', description='pipeline_description')
    

create()

Create the Pipeline.

  • Example

    >>> from volcengine_ml_platform.pipeline import Pipeline
    >>> Pipeline(name='pipeline_name', description='pipeline_description').create()
    >>> Pipeline.load(...).create()
    

exists() -> bool

Check whether the Pipeline already exists.

  • Example

    >>> from volcengine_ml_platform.pipeline import Pipeline
    >>> Pipeline(id='pipeline_id').exists()
    >>> Pipeline(name='pipeline_name').exists()
    >>> Pipeline(id='pipeline_id', name='pipeline_name').exists()
    >>> Pipeline.load(...).exists()
    

update(upload_code_progress_bar: bool = False, upload_code_timeout: int = 3600, upload_code_copy_links: bool = False, upload_code_retain_links: bool = False)

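Update the Pipeline configuration.

  • Parameters

    • upload_code_progress_bar - Whether to show a progress bar while uploading code; hidden by default

    • upload_code_timeout - Timeout for uploading code; default 1h (3600s)

    • upload_code_copy_links - When a link file is encountered, upload a copy of the file the link points to (the upload fails if the link is broken)

    • upload_code_retain_links - When a link file is encountered, upload the link itself unchanged (if the link target does not exist in the image environment, an error may occur at runtime)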

  • Example

    >>> from volcengine_ml_platform.pipeline import Pipeline
    >>> Pipeline(id='pipeline_id').update()
    >>> Pipeline(name='pipeline_name').update()
    >>> Pipeline.load(...).update()
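
Because upload_code_copy_links fails on broken links, it can be useful to look for symbolic links in the local code directory before calling update(). The sketch below is one way to do that with the standard library only. `find_symlinks` is a hypothetical helper and not part of the SDK; how the SDK itself walks the directory is not described on this page.

```python
import os


def find_symlinks(code_dir):
    """Return (valid, broken) sorted lists of symlink paths under code_dir.

    Hypothetical pre-upload check; not part of the SDK.
    """
    valid, broken = [], []
    # os.walk does not descend into symlinked directories by default,
    # but it still lists them in `dirs`, so they are checked here too.
    for root, dirs, files in os.walk(code_dir):
        for name in dirs + files:
            path = os.path.join(root, name)
            if os.path.islink(path):
                # os.path.exists follows the link: False means the target is missing
                if os.path.exists(path):
                    valid.append(path)
                else:
                    broken.append(path)
    return sorted(valid), sorted(broken)
```

If the second list is non-empty, fix or remove those links before uploading with upload_code_copy_links=True.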
    

launch(launch_name: Optional[str] = None, inputs: Optional[Union[Dict[str, str], List[InputInstance]]] = None, fail_fast: bool = False) -> str

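Run the Pipeline. If Pipeline.pipeline_template is None, the latest configuration stored in the cloud is run; otherwise the local configuration is run.

  • Parameters

    • launch_name - Name of the run instance; if omitted, it is generated automatically in the format {Pipeline.name}_{random 5 chars}

    • inputs - Input parameters required by the Pipeline definition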

  • Example

    >>> from volcengine_ml_platform.pipeline import Pipeline
    >>> Pipeline.get(id='pipeline_id').launch()
    >>> Pipeline.get(name='pipeline_name').launch()
    >>> Pipeline.load(...).launch()
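
To make the default launch_name format {Pipeline.name}_{random 5 chars} concrete, here is a small sketch that builds a name of the same shape. `default_launch_name` is a hypothetical function, and the choice of lowercase ASCII letters for the suffix is an assumption; the SDK's actual character set is not documented on this page.

```python
import random
import string


def default_launch_name(pipeline_name, rng=None):
    """Build '<pipeline_name>_<5 random chars>' (illustrative, not the SDK's code)."""
    if rng is None:
        rng = random.Random()
    # Assumption: the suffix is drawn from lowercase ASCII letters
    suffix = "".join(rng.choice(string.ascii_lowercase) for _ in range(5))
    return f"{pipeline_name}_{suffix}"
```

Passing an explicit launch_name to launch() is still the simplest way to get predictable instance names.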
    

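update_and_launch(launch_name: Optional[str] = None, inputs: Optional[Union[Dict[str, str], List[InputInstance]]] = None, fail_fast: bool = False, upload_code_progress_bar: bool = False, upload_code_timeout: int = 3600, upload_code_copy_links: bool = False, upload_code_retain_links: bool = False) -> str

Update and run the Pipeline.

  • Parameters

    • launch_name - Name of the run instance; if omitted, it is generated automatically in the format {Pipeline.name}_{random 5 chars}

    • inputs - Input parameters required by the Pipeline definition

    • upload_code_progress_bar - Whether to show a progress bar while uploading code; hidden by default

    • upload_code_timeout - Timeout for uploading code; default 1h (3600s)

    • upload_code_copy_links - When a link file is encountered, upload a copy of the file the link points to (the upload fails if the link is broken)

    • upload_code_retain_links - When a link file is encountered, upload the link itself unchanged (if the link target does not exist in the image environment, an error may occur at runtime)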

  • Example

    >>> from volcengine_ml_platform.pipeline import Pipeline
    >>> Pipeline.get(id='pipeline_id').update_and_launch()
    >>> Pipeline.get(name='pipeline_name').update_and_launch()
    >>> Pipeline.load(...).update_and_launch()
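
The exists(), create(), and update() methods described above can be combined into a "create or update" step. The sketch below shows one possible composition. `create_or_update` is a hypothetical helper that works with any object exposing those three methods; whether update() is the right call for an already existing pipeline in your workflow is an assumption to verify against your own setup.

```python
def create_or_update(pipeline):
    """Create the pipeline if it is missing, otherwise update its configuration.

    `pipeline` is expected to expose exists(), create() and update(),
    as the Pipeline class on this page does. Hypothetical helper.
    """
    if pipeline.exists():
        pipeline.update()
        return "updated"
    pipeline.create()
    return "created"
```

With the SDK installed, this would typically be called as create_or_update(Pipeline.load(...)) before launch().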
    

show(axis: str = 'y')

Print the Pipeline DAG.

  • Parameters

    • axis - Print direction: x prints horizontally, y prints vertically

  • Example

    >>> Pipeline.get(id='pipeline_id').show()
    """
    + - - - - - - - - - - - - - - - - -+
    '  linear_pipeline_with_two_tasks  '
    '                                  '
    ' +------------------------------+ '
    ' |            task_a            | '
    ' +------------------------------+ '
    '   |                              '
    '   |                              '
    '   v                              '
    ' +------------------------------+ '
    ' |            task_b            | '
    ' +------------------------------+ '
    '                                  '
    + - - - - - - - - - - - - - - - - -+
    """
    >>> Pipeline.get(id='pipeline_id').show(axis='x')
    """
    + - - - - - - - - - - - - - - - - - - - - - - - - - - -+
    '            linear_pipeline_with_two_tasks            '
    '                                                      '
    ' +--------+                                +--------+ '
    ' | task_a | -----------------------------> | task_b | '
    ' +--------+                                +--------+ '
    '                                                      '
    + - - - - - - - - - - - - - - - - - - - - - - - - - - -+
    """
    

dump(output_path: str = '.', *, type: str = 'yaml')

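Export the Pipeline to a file.

  • Parameters

    • output_path - Export file path; if the path is a directory, a file named {pipeline.name}_{yyyyMMdd_hhmmss}.{type} is generated automatically

    • type - Export file format; currently only yaml is supported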

  • Example

    >>> from volcengine_ml_platform.pipeline import Pipeline
    >>> Pipeline.get(id='pipeline_id').dump()
    >>> Pipeline.get(name='pipeline_name').dump()
    >>> Pipeline.load(...).dump()
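
The naming rule for output_path can be illustrated with a short sketch that computes where a dump would land. `resolve_dump_path` is a hypothetical function, not the SDK's implementation; it assumes the timestamp uses local time and a 24-hour clock, neither of which is stated on this page.

```python
import os
from datetime import datetime


def resolve_dump_path(output_path, pipeline_name, file_type="yaml", now=None):
    """Return the file path a dump would use under the documented naming rule.

    Illustrative only; timezone and 24-hour format are assumptions.
    """
    if now is None:
        now = datetime.now()
    if os.path.isdir(output_path):
        # Directory given: build {pipeline.name}_{yyyyMMdd_hhmmss}.{type} inside it
        filename = f"{pipeline_name}_{now:%Y%m%d_%H%M%S}.{file_type}"
        return os.path.join(output_path, filename)
    # Otherwise the path is treated as the target file itself
    return output_path
```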
    

PipelineInstance

class volcengine_ml_platform.pipeline.PipelineInstance(
    *, 
    id: Optional[str], 
    name: Optional[str], 
    pipeline_id: Optional[str], 
    pipeline_version_id: Optional[str], 
    pipeline_template: Optional[PipelineTemplate] = None, 
    global_inputs: Optional[List[InputInstance]] = None, 
    global_outputs: Optional[List[OutputInstance]] = None, 
    state: Optional[str] = None, 
    launch_time: Optional[str] = None, 
    finish_time: Optional[str] = None, 
    diag_info: Optional[str] = None, 
    trigger: Optional[str] = None, 
    user_id: Optional[str] = None, 
    account_id: Optional[str] = None, 
    create_time: Optional[str] = None, 
    update_time: Optional[str] = None, 
    extra: Optional[Dict[str, str]] = None,
)

Functions

classmethod get(id: str) -> PipelineInstance

Get the information of a pipeline instance by id.

  • Parameters

    • id - Pipeline instance id

  • Example

    >>> from volcengine_ml_platform.pipeline import PipelineInstance
    >>> PipelineInstance.get('pipeline_instance_id')
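
Since launch() returns immediately with an instance id, a common follow-up is to poll the instance until it finishes. The sketch below is a generic polling loop. `wait_for_state` is a hypothetical helper; the only state names taken from this page are Running and Finished (they appear in the show() output), so any other terminal states, such as failure states, are unknown here and must be supplied by the caller.

```python
import time


def wait_for_state(fetch_state, terminal_states=("Finished",),
                   interval=10.0, timeout=3600.0, sleep=time.sleep):
    """Poll fetch_state() until it returns one of terminal_states.

    fetch_state is any zero-argument callable returning the current state
    string. Raises TimeoutError if no terminal state is seen in time.
    """
    deadline = time.monotonic() + timeout
    while True:
        state = fetch_state()
        if state in terminal_states:
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(f"still in state {state!r} after {timeout}s")
        sleep(interval)
```

With the SDK, fetch_state could be something like lambda: PipelineInstance.get(instance_id).state, assuming the state constructor field is exposed as an attribute of the same name.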
    

dump(output_path: str = '.', *, type: str = 'yaml')

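Export the pipeline instance to a file.

  • Parameters

    • output_path - Export file path; if the path is a directory, a file named {pipeline.name}_{yyyyMMdd_hhmmss}.{type} is generated automatically

    • type - Export file format; currently only yaml is supported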

  • Example

    >>> from volcengine_ml_platform.pipeline import PipelineInstance
    >>> PipelineInstance.get('pipeline_instance_id').dump()
    >>> PipelineInstance.get('pipeline_instance_id').dump('pipeline.yaml')
    

show(axis: str = 'y')

Print the DAG of the pipeline instance, including the state of each task.

  • Parameters

    • axis - Print direction: x prints horizontally, y prints vertically

  • Example

    >>> from volcengine_ml_platform.pipeline import PipelineInstance
    >>> PipelineInstance.get('pipeline_instance_id').show()
    + - - - - - - - - - - - - - - - - - - - -+
    '  linear_pipeline_with_two_tasks_sasnf  '
    '                                        '
    ' +------------------------------------+ '
    ' |          task_a(Finished)          | '
    ' +------------------------------------+ '
    '   |                                    '
    '   |                                    '
    '   v                                    '
    ' +------------------------------------+ '
    ' |          task_b(Running)           | '
    ' +------------------------------------+ '
    '                                        '
    + - - - - - - - - - - - - - - - - - - - -+
    

Sample Code

Replace <Your Resource Queue Name> below with your real queue name.

Run a workflow from YAML

Create and run a simple workflow

from volcengine_ml_platform.pipeline import Pipeline

yaml_file_path = "<your yaml file path>"
pipeline_name = "<your pipeline name>"

def main():
    pipeline = Pipeline.load(yaml_file_path, name=pipeline_name)
    
    # Create the pipeline if it does not exist yet.
    # Note: load the YAML before creating; if you load after create, the pipeline is empty.
    if not pipeline.exists():
        pipeline.create()
        
    pipeline_instance_id = pipeline.launch()
    print(f"Pipeline Id: {pipeline.id}")
    print(f"Pipeline Instance Id: {pipeline_instance_id}")
    
if __name__ == "__main__":
    main()

Create a simple workflow and upload local code at the same time

#!/usr/bin/env bash
set -x
pwd
ls
env

Configure workflow input parameters; subtasks can reference the workflow's inputs

from volcengine_ml_platform.pipeline import Pipeline

yaml_file_path = "<your yaml file path>"
pipeline_name = "<your pipeline name>"

def main():
    pipeline = Pipeline.load(yaml_file_path, name=pipeline_name)
    
    # Create the pipeline if it does not exist yet.
    # Note: load the YAML before creating; if you load after create, the pipeline is empty.
    if not pipeline.exists():
        pipeline.create()
        
    inputs = {"command": "echo 'hello pipeline'"}
        
    pipeline_instance_id = pipeline.launch(inputs=inputs)
    print(f"Pipeline Id: {pipeline.id}")
    print(f"Pipeline Instance Id: {pipeline_instance_id}")
    
if __name__ == "__main__":
    main()
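
The inputs argument is typed as Dict[str, str], so numeric or boolean parameters need to be converted to strings before they are passed to launch(). The sketch below shows one way to do that. `as_str_inputs` is a hypothetical helper; whether the SDK would coerce non-string values on its own is not documented on this page, so the conversion is done explicitly here.

```python
def as_str_inputs(params):
    """Convert a parameter mapping into the Dict[str, str] shape used by `inputs`.

    Hypothetical helper. Rejects None values rather than sending the string 'None'.
    """
    inputs = {}
    for key, value in params.items():
        if value is None:
            raise ValueError(f"input {key!r} has no value")
        inputs[str(key)] = str(value)
    return inputs
```

For example, as_str_inputs({"command": "echo 'hello pipeline'", "epochs": 3}) yields a dictionary whose values are all strings, which can then be passed as pipeline.launch(inputs=...).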

Export YAML

Export the latest configuration of a workflow

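import os

from volcengine_ml_platform.pipeline import Pipeline

pipeline_name = "<your pipeline name>"
export_file_name = "<your export file name>"

def main():
    pipeline = Pipeline.get(name=pipeline_name)
    if export_file_name:
        # Create the parent directory only when the path has one;
        # os.makedirs('') raises FileNotFoundError for a bare file name.
        export_dir = os.path.dirname(export_file_name)
        if export_dir:
            os.makedirs(export_dir, exist_ok=True)
        pipeline.dump(export_file_name, type='yaml')
    else:
        # No file name given: dump() uses its default output_path ('.')
        pipeline.dump(type='yaml')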

if __name__ == "__main__":
    main()

Export the configuration of a workflow instance

import os

from volcengine_ml_platform.pipeline import PipelineInstance

pipeline_instance_id = "<your pipeline instance id>"
export_file_name = "<your export file name>"

def main():
    pipeline_instance = PipelineInstance.get(pipeline_instance_id)

    if export_file_name:
        # Create the parent directory only when the path has one;
        # os.makedirs('') raises FileNotFoundError for a bare file name.
        export_dir = os.path.dirname(export_file_name)
        if export_dir:
            os.makedirs(export_dir, exist_ok=True)
        pipeline_instance.dump(export_file_name, type='yaml')
    else:
        # No file name given: dump() uses its default output_path ('.')
        pipeline_instance.dump(type='yaml')

if __name__ == "__main__":
    main()