You need to enable JavaScript to run this app.
导航

代码示例

最近更新时间:2024.03.22 14:28:51

首次发布时间:2023.07.18 16:13:48

本文介绍如何使用数据库传输服务 DTS Python SDK 快速调用 API,执行创建、查询、暂停、启动、重试、恢复、终止、删除数据传输任务,以及创建预检查任务和查询预检查结果等操作。

前提条件

  1. 下载安装 SDK

  2. 初始化客户端

创建数据传输任务

import json

from volcengine.dts.dts_service import DtsService

if __name__ == '__main__':
    # Sample script: submit a CreateTransmissionTask request through the DTS
    # SDK and print the response as formatted JSON.
    #
    # The credentials and region below are placeholders; replace them with
    # real values before running.
    access_key = 'your_ak_here'
    secret_key = 'your_sk_here'
    region_str = 'region_str_here'

    dts_service = DtsService(region=region_str)

    dts_service.set_ak(access_key)
    dts_service.set_sk(secret_key)

    # `params` is passed empty; the whole task definition travels in `body`.
    params = {}
    body = {
        "TaskName": "zaizaitest****1",
        "TaskType": "DataMigration",
        # Source endpoint: endpoint type "Public_MySQL".
        "SrcConfig": {
            "EndpointType": "Public_MySQL",
            "PublicMySQLSettings": {
                "Host": "192.168.***.***",
                # NOTE(review): 4 looks like a masked/truncated sample value
                # (MySQL's default port is 3306) - confirm before reuse.
                "Port": 4,
                "Username": "test****",
                "Password": "Admin@****",
                "SSLSettings": {
                    "EnableSSL": True,
                    "Cert": "-----BEGIN CERTIFICATE----- ...... -----END CERTIFICATE-----"
                },
                "RegionSettings": {
                    "Region": "cn-shanghai"
                }
            }
        },
        # Destination endpoint: endpoint type "Volc_MySQL".
        "DestConfig": {
            "EndpointType": "Volc_MySQL",
            "VolcMySQLSettings": {
                "DBInstanceId": "mysql-7cd1dc4ab511",
                "DBInstanceName": "",
                "Password": "Admin@****",
                "RegionSettings": {
                    "Region": "cn-guangzhou"
                },
                "Username": "test****"
            }
        },
        "TrafficSpec": "Compact",
        "ProjectName": "default",
        # Solution settings for the "MySQL2MySQL" solution type.
        "SolutionSettings": {
            "MySQL2MySQLSettings": {
                "AccountTransmissionSettings": {
                    "EnableAccount": True
                },
                "ETLSettings": {
                    "Script": ""
                },
                "ErrorBehaviorSettings": {
                    "MaxRetrySeconds": 7200
                },
                "FullTransmissionSettings": {
                    "EnableFull": True,
                    "Snapshot": False
                },
                "IncrTransmissionSettings": {
                    "EnableIncr": True,
                    "Statements": [
                        "StmtDDLAll",
                        "StmtDMLDelete",
                        "StmtDMLUpdate",
                        "StmtDMLInsert"
                    ]
                },
                "MetaTransmissionSettings": {
                    "EnableMeta": True
                },
                # Maps source database "huahua" to destination "huahua",
                # with a "*" -> "*" table mapping inside it.
                "ObjectMappings": [
                    {
                        "DestObjName": "huahua",
                        "MappingList": [
                            {
                                "DestObjName": "*",
                                "ObjectType": "Table",
                                "SrcObjName": "*"
                            }
                        ],
                        "ObjectType": "Database",
                        "SrcObjName": "huahua"
                    }
                ],
                "PolicyForPrimaryKeyConflict": "Override"
            },
            "SolutionType": "MySQL2MySQL"
        },
        "Tags": [
            {
                "Key": "key1",
                "Value": "value1"
            }
        ],
        "ChargeConfig": {
            "ChargeType": "PostPaid"
        }
    }
    resp = dts_service.create_transmission_task(params, body)
    # Key-sorted, 4-space-indented JSON; non-ASCII characters are left unescaped.
    print(json.dumps(resp, ensure_ascii=False, sort_keys=True, indent=4, separators=(',', ':')))
    

查询数据传输任务信息

import json

from volcengine.dts.dts_service import DtsService

if __name__ == '__main__':
    # Sample script: look up one transmission task by its ID and print the
    # response. All four values below are placeholders to be replaced.
    access_key, secret_key = 'your_ak_here', 'your_sk_here'
    region_str = 'region_str_here'
    task_id_str = 'task_id_str'

    # Create the client for the chosen region, then attach the AK/SK pair.
    client = DtsService(region=region_str)
    client.set_ak(access_key)
    client.set_sk(secret_key)

    # First argument (params) is empty; the task ID goes in the body.
    request_body = {'TaskId': task_id_str}
    response = client.describe_transmission_task_info({}, request_body)

    # Key-sorted JSON, 4-space indent, non-ASCII characters left unescaped.
    print(
        json.dumps(
            response,
            ensure_ascii=False,
            sort_keys=True,
            indent=4,
            separators=(',', ':'),
        )
    )

暂停数据传输任务

import json

from volcengine.dts.dts_service import DtsService

if __name__ == '__main__':
    # Sample script: call suspend_transmission_task for one task ID and print
    # the response. Replace the placeholder values before running.
    region_str = 'region_str_here'
    access_key = 'your_ak_here'
    secret_key = 'your_sk_here'
    task_id_str = 'task_id_str'

    # Region-bound client with credentials attached.
    client = DtsService(region=region_str)
    client.set_ak(access_key)
    client.set_sk(secret_key)

    # Empty params; the body identifies the task to act on.
    response = client.suspend_transmission_task({}, {'TaskId': task_id_str})

    # Shared formatting options for the JSON dump of the response.
    dump_options = dict(ensure_ascii=False, sort_keys=True, indent=4, separators=(',', ':'))
    print(json.dumps(response, **dump_options))

启动数据传输任务

import json

from volcengine.dts.dts_service import DtsService

if __name__ == '__main__':
    # Sample script: call start_transmission_task for one task ID and print
    # the response. The values below are placeholders.
    access_key, secret_key = 'your_ak_here', 'your_sk_here'
    region_str, task_id_str = 'region_str_here', 'task_id_str'

    # Build the client, then set the access key and secret key on it.
    client = DtsService(region=region_str)
    client.set_ak(access_key)
    client.set_sk(secret_key)

    # Request pieces: no params, and a body carrying only the task ID.
    request_params = {}
    request_body = {'TaskId': task_id_str}
    response = client.start_transmission_task(request_params, request_body)

    # Serialize first, then print: sorted keys, 4-space indent, raw Unicode.
    rendered = json.dumps(
        response,
        ensure_ascii=False,
        sort_keys=True,
        indent=4,
        separators=(',', ':'),
    )
    print(rendered)

重试数据传输任务

import json

from volcengine.dts.dts_service import DtsService

if __name__ == '__main__':
    # Sample script: call retry_transmission_task for one task ID and print
    # the response. Replace the placeholders with real values.
    task_id_str = 'task_id_str'
    region_str = 'region_str_here'
    access_key = 'your_ak_here'
    secret_key = 'your_sk_here'

    # Client is tied to the region; AK/SK are attached after construction.
    client = DtsService(region=region_str)
    client.set_ak(access_key)
    client.set_sk(secret_key)

    # Params stay empty; the body names the task.
    response = client.retry_transmission_task({}, {'TaskId': task_id_str})

    # Print the response as key-sorted JSON with a 4-space indent.
    print(
        json.dumps(
            response,
            ensure_ascii=False,
            sort_keys=True,
            indent=4,
            separators=(',', ':'),
        )
    )

恢复数据传输任务

import json

from volcengine.dts.dts_service import DtsService

if __name__ == '__main__':
    # Sample script: call resume_transmission_task for one task ID and print
    # the response. All literal values here are placeholders.
    access_key, secret_key = 'your_ak_here', 'your_sk_here'
    region_str = 'region_str_here'
    task_id_str = 'task_id_str'

    # Construct the client and hand it the credentials.
    client = DtsService(region=region_str)
    client.set_ak(access_key)
    client.set_sk(secret_key)

    # The body carries the task ID; params is an empty dict.
    request_body = {'TaskId': task_id_str}
    response = client.resume_transmission_task({}, request_body)

    # Formatting options: raw Unicode, sorted keys, 4-space indent.
    dump_options = dict(ensure_ascii=False, sort_keys=True, indent=4, separators=(',', ':'))
    print(json.dumps(response, **dump_options))

终止数据传输任务

import json

from volcengine.dts.dts_service import DtsService

if __name__ == '__main__':
    # Sample script: call stop_transmission_task for one task ID and print
    # the response. Replace every placeholder before running.
    region_str, task_id_str = 'region_str_here', 'task_id_str'
    access_key = 'your_ak_here'
    secret_key = 'your_sk_here'

    # Region-scoped client; credentials are set right after creation.
    client = DtsService(region=region_str)
    client.set_ak(access_key)
    client.set_sk(secret_key)

    # Empty params plus a body holding the task ID.
    request_params = {}
    request_body = {'TaskId': task_id_str}
    response = client.stop_transmission_task(request_params, request_body)

    # Render the response as JSON text, then print it.
    rendered = json.dumps(
        response,
        ensure_ascii=False,
        sort_keys=True,
        indent=4,
        separators=(',', ':'),
    )
    print(rendered)

删除数据传输任务

import json

from volcengine.dts.dts_service import DtsService

if __name__ == '__main__':
    # Sample script: call delete_transmission_task for one task ID and print
    # the response. The string values below are placeholders.
    access_key, secret_key = 'your_ak_here', 'your_sk_here'
    region_str = 'region_str_here'
    task_id_str = 'task_id_str'

    # Create the client for the region and attach AK/SK.
    client = DtsService(region=region_str)
    client.set_ak(access_key)
    client.set_sk(secret_key)

    # No params; the body identifies which task the call targets.
    response = client.delete_transmission_task({}, {'TaskId': task_id_str})

    # Key-sorted JSON, 4-space indent, non-ASCII characters left unescaped.
    print(
        json.dumps(
            response,
            ensure_ascii=False,
            sort_keys=True,
            indent=4,
            separators=(',', ':'),
        )
    )

创建预检查任务

import json

from volcengine.dts.dts_service import DtsService

if __name__ == '__main__':
    # Sample script: call pre_check_async for one task ID and print the
    # response. Replace the placeholder values before running.
    task_id_str = 'task_id_str'
    region_str = 'region_str_here'
    access_key, secret_key = 'your_ak_here', 'your_sk_here'

    # Build the client, then set its access key and secret key.
    client = DtsService(region=region_str)
    client.set_ak(access_key)
    client.set_sk(secret_key)

    # Params is empty; the body carries the task ID to pre-check.
    request_body = {'TaskId': task_id_str}
    response = client.pre_check_async({}, request_body)

    # Formatting options: raw Unicode, sorted keys, 4-space indent.
    dump_options = dict(ensure_ascii=False, sort_keys=True, indent=4, separators=(',', ':'))
    print(json.dumps(response, **dump_options))

查询预检查结果

import json

from volcengine.dts.dts_service import DtsService

if __name__ == '__main__':
    # Sample script: call get_async_pre_check_result for one pre-check ID and
    # print the response. The literal values below are placeholders.
    access_key, secret_key = 'your_ak_here', 'your_sk_here'
    region_str = 'region_str_here'
    # Per the original sample's note, this ID comes from the async pre-check
    # API (the SDK's pre_check_async call).
    id_str = 'id_str'

    # Region-bound client with credentials attached.
    client = DtsService(region=region_str)
    client.set_ak(access_key)
    client.set_sk(secret_key)

    # Empty params; the body is keyed by 'ID' rather than 'TaskId'.
    request_params = {}
    request_body = {'ID': id_str}
    response = client.get_async_pre_check_result(request_params, request_body)

    # Render the response as JSON text, then print it.
    rendered = json.dumps(
        response,
        ensure_ascii=False,
        sort_keys=True,
        indent=4,
        separators=(',', ':'),
    )
    print(rendered)

常见问题

在使用 Python SDK 发送请求时遇到 Can't connect to HTTPS URL because the SSL module is not available. 报错怎么办?

可能原因:当前使用的 Python 环境在安装(或编译)时没有正确包含 SSL 模块,导致 SDK 无法发起 HTTPS 请求。

解决方法:重新安装 Python,并确保在安装过程中包含了 SSL 模块。