You need to enable JavaScript to run this app.
数据库传输服务

数据库传输服务

复制全文
Python SDK
代码示例
复制全文
代码示例

本文介绍如何使用数据库传输服务 DTS Python SDK 快速调用 API,完成数据传输任务的创建、查询、暂停、启动、重试、恢复、终止和删除,以及预检查任务的创建与结果查询。

前提条件

  1. 下载安装 SDK

  2. 初始化客户端

创建数据传输任务

import json

from volcengine.dts.dts_service import DtsService

if __name__ == '__main__':
    # Sample script: create a data transmission task through the DTS SDK.
    # The credential and region strings are placeholders; replace them
    # with real values before running.
    access_key = 'your_ak_here'
    secret_key = 'your_sk_here'
    region_str = 'region_str_here'

    dts_service = DtsService(region=region_str)

    dts_service.set_ak(access_key)
    dts_service.set_sk(secret_key)

    params = {}
    # Request body. All values are masked sample data taken from the
    # documentation; adjust them to the real source/destination endpoints.
    body = {
        "TaskName": "zaizaitest****1",
        "TaskType": "DataMigration",
        "SrcConfig": {
            "EndpointType": "Public_MySQL",
            "PublicMySQLSettings": {
                "Host": "192.168.***.***",
                "Port": 4,
                "Username": "test****",
                "Password": "Admin@****",
                "SSLSettings": {
                    "EnableSSL": True,
                    "Cert": "-----BEGIN CERTIFICATE----- ...... -----END CERTIFICATE-----"
                },
                "RegionSettings": {
                    "Region": "cn-shanghai"
                }
            }
        },
        "DestConfig": {
            "EndpointType": "Volc_MySQL",
            "VolcMySQLSettings": {
                "DBInstanceId": "mysql-7cd1dc4ab511",
                "DBInstanceName": "",
                "Password": "Admin@****",
                "RegionSettings": {
                    "Region": "cn-guangzhou"
                },
                "Username": "test****"
            }
        },
        "TrafficSpec": "Compact",
        "ProjectName": "default",
        "SolutionSettings": {
            "MySQL2MySQLSettings": {
                "AccountTransmissionSettings": {
                    "EnableAccount": True
                },
                "ETLSettings": {
                    "Script": ""
                },
                "ErrorBehaviorSettings": {
                    "MaxRetrySeconds": 7200
                },
                "FullTransmissionSettings": {
                    "EnableFull": True,
                    "Snapshot": False
                },
                "IncrTransmissionSettings": {
                    "EnableIncr": True,
                    "Statements": [
                        "StmtDDLAll",
                        "StmtDMLDelete",
                        "StmtDMLUpdate",
                        "StmtDMLInsert"
                    ]
                },
                "MetaTransmissionSettings": {
                    "EnableMeta": True
                },
                "ObjectMappings": [
                    {
                        "DestObjName": "huahua",
                        "MappingList": [
                            {
                                "DestObjName": "*",
                                "ObjectType": "Table",
                                "SrcObjName": "*"
                            }
                        ],
                        "ObjectType": "Database",
                        "SrcObjName": "huahua"
                    }
                ],
                "PolicyForPrimaryKeyConflict": "Override",
                "AccountMapping": {
                    "Account": "test****",
                    # NOTE(review): this is the string "true", not a boolean;
                    # confirm the expected type against the API reference.
                    "ResetPassword": "true",
                    "Password": "*Test****"
                }
            },
            # SolutionType is a sibling of MySQL2MySQLSettings inside
            # SolutionSettings (the original sample had a stray closing
            # brace here that made the literal a syntax error).
            "SolutionType": "MySQL2MySQL"
        },
        "Tags": [
            {
                "Key": "key1",
                "Value": "value1"
            }
        ],
        "ChargeConfig": {
            "ChargeType": "PostPaid"
        }
    }
    resp = dts_service.create_transmission_task(params, body)
    # Pretty-print the response; ensure_ascii=False keeps non-ASCII text readable.
    print(json.dumps(resp, ensure_ascii=False, sort_keys=True, indent=4, separators=(',', ':')))
    

查询数据传输任务信息

import json

from volcengine.dts.dts_service import DtsService

if __name__ == '__main__':
    # Sample script: query the information of a data transmission task.
    # All four strings below are placeholders to be replaced with real values.
    ak = 'your_ak_here'
    sk = 'your_sk_here'
    region = 'region_str_here'
    task_id = 'task_id_str'

    client = DtsService(region=region)
    client.set_ak(ak)
    client.set_sk(sk)

    # Empty query parameters; the task is identified in the request body.
    response = client.describe_transmission_task_info({}, {'TaskId': task_id})

    # Render the response as sorted, indented JSON without ASCII escaping.
    rendered = json.dumps(
        response,
        ensure_ascii=False,
        sort_keys=True,
        indent=4,
        separators=(',', ':'),
    )
    print(rendered)

暂停数据传输任务

import json

from volcengine.dts.dts_service import DtsService

if __name__ == '__main__':
    # Sample script: suspend a data transmission task.
    # All four strings below are placeholders to be replaced with real values.
    ak = 'your_ak_here'
    sk = 'your_sk_here'
    region = 'region_str_here'
    task_id = 'task_id_str'

    client = DtsService(region=region)
    client.set_ak(ak)
    client.set_sk(sk)

    # Empty query parameters; the task is identified in the request body.
    response = client.suspend_transmission_task({}, {'TaskId': task_id})

    # Render the response as sorted, indented JSON without ASCII escaping.
    rendered = json.dumps(
        response,
        ensure_ascii=False,
        sort_keys=True,
        indent=4,
        separators=(',', ':'),
    )
    print(rendered)

启动数据传输任务

import json

from volcengine.dts.dts_service import DtsService

if __name__ == '__main__':
    # Sample script: start a data transmission task.
    # All four strings below are placeholders to be replaced with real values.
    ak = 'your_ak_here'
    sk = 'your_sk_here'
    region = 'region_str_here'
    task_id = 'task_id_str'

    client = DtsService(region=region)
    client.set_ak(ak)
    client.set_sk(sk)

    # Empty query parameters; the task is identified in the request body.
    response = client.start_transmission_task({}, {'TaskId': task_id})

    # Render the response as sorted, indented JSON without ASCII escaping.
    rendered = json.dumps(
        response,
        ensure_ascii=False,
        sort_keys=True,
        indent=4,
        separators=(',', ':'),
    )
    print(rendered)

重试数据传输任务

import json

from volcengine.dts.dts_service import DtsService

if __name__ == '__main__':
    # Sample script: retry a data transmission task.
    # All four strings below are placeholders to be replaced with real values.
    ak = 'your_ak_here'
    sk = 'your_sk_here'
    region = 'region_str_here'
    task_id = 'task_id_str'

    client = DtsService(region=region)
    client.set_ak(ak)
    client.set_sk(sk)

    # Empty query parameters; the task is identified in the request body.
    response = client.retry_transmission_task({}, {'TaskId': task_id})

    # Render the response as sorted, indented JSON without ASCII escaping.
    rendered = json.dumps(
        response,
        ensure_ascii=False,
        sort_keys=True,
        indent=4,
        separators=(',', ':'),
    )
    print(rendered)

恢复数据传输任务

import json

from volcengine.dts.dts_service import DtsService

if __name__ == '__main__':
    # Sample script: resume a data transmission task.
    # All four strings below are placeholders to be replaced with real values.
    ak = 'your_ak_here'
    sk = 'your_sk_here'
    region = 'region_str_here'
    task_id = 'task_id_str'

    client = DtsService(region=region)
    client.set_ak(ak)
    client.set_sk(sk)

    # Empty query parameters; the task is identified in the request body.
    response = client.resume_transmission_task({}, {'TaskId': task_id})

    # Render the response as sorted, indented JSON without ASCII escaping.
    rendered = json.dumps(
        response,
        ensure_ascii=False,
        sort_keys=True,
        indent=4,
        separators=(',', ':'),
    )
    print(rendered)

终止数据传输任务

import json

from volcengine.dts.dts_service import DtsService

if __name__ == '__main__':
    # Sample script: stop (terminate) a data transmission task.
    # All four strings below are placeholders to be replaced with real values.
    ak = 'your_ak_here'
    sk = 'your_sk_here'
    region = 'region_str_here'
    task_id = 'task_id_str'

    client = DtsService(region=region)
    client.set_ak(ak)
    client.set_sk(sk)

    # Empty query parameters; the task is identified in the request body.
    response = client.stop_transmission_task({}, {'TaskId': task_id})

    # Render the response as sorted, indented JSON without ASCII escaping.
    rendered = json.dumps(
        response,
        ensure_ascii=False,
        sort_keys=True,
        indent=4,
        separators=(',', ':'),
    )
    print(rendered)

删除数据传输任务

import json

from volcengine.dts.dts_service import DtsService

if __name__ == '__main__':
    # Sample script: delete a data transmission task.
    # All four strings below are placeholders to be replaced with real values.
    ak = 'your_ak_here'
    sk = 'your_sk_here'
    region = 'region_str_here'
    task_id = 'task_id_str'

    client = DtsService(region=region)
    client.set_ak(ak)
    client.set_sk(sk)

    # Empty query parameters; the task is identified in the request body.
    response = client.delete_transmission_task({}, {'TaskId': task_id})

    # Render the response as sorted, indented JSON without ASCII escaping.
    rendered = json.dumps(
        response,
        ensure_ascii=False,
        sort_keys=True,
        indent=4,
        separators=(',', ':'),
    )
    print(rendered)

创建预检查任务

import json

from volcengine.dts.dts_service import DtsService

if __name__ == '__main__':
    # Sample script: create a pre-check job for a data transmission task.
    # All four strings below are placeholders to be replaced with real values.
    ak = 'your_ak_here'
    sk = 'your_sk_here'
    region = 'region_str_here'
    task_id = 'task_id_str'

    client = DtsService(region=region)
    client.set_ak(ak)
    client.set_sk(sk)

    # Empty query parameters; the task is identified in the request body.
    response = client.pre_check_async({}, {'TaskId': task_id})

    # Render the response as sorted, indented JSON without ASCII escaping.
    rendered = json.dumps(
        response,
        ensure_ascii=False,
        sort_keys=True,
        indent=4,
        separators=(',', ':'),
    )
    print(rendered)

查询预检查结果

import json

from volcengine.dts.dts_service import DtsService

if __name__ == '__main__':
    # Sample script: query the result of an asynchronous pre-check.
    # The strings below are placeholders to be replaced with real values.
    ak = 'your_ak_here'
    sk = 'your_sk_here'
    region = 'region_str_here'
    # Per the original sample's note, this ID is obtained from the
    # pre-check creation call (pre_check_async).
    check_id = 'id_str'

    client = DtsService(region=region)
    client.set_ak(ak)
    client.set_sk(sk)

    # Empty query parameters; the pre-check is identified in the request body.
    response = client.get_async_pre_check_result({}, {'ID': check_id})

    # Render the response as sorted, indented JSON without ASCII escaping.
    rendered = json.dumps(
        response,
        ensure_ascii=False,
        sort_keys=True,
        indent=4,
        separators=(',', ':'),
    )
    print(rendered)

常见问题

在使用 Python SDK 发送请求时遇到 Can't connect to HTTPS URL because the SSL module is not available. 报错怎么办?

可能原因:当前使用的 Python 环境在安装(或编译)时没有正确包含 SSL 模块。

解决方法:重新安装 Python,并确保在安装过程中包含了 SSL 模块。

最近更新时间:2024.06.07 17:45:45
这个页面对您有帮助吗?
有用
有用
无用
无用