You need to enable JavaScript to run this app.
导航

分片上传(Python SDK)

最近更新时间2024.02.04 18:30:54

首次发布时间2021.12.31 17:38:35

对于较大的对象,可以分成多个数据块(part)来分别上传,最后调用合并分片将上传的数据块合并为一个对象。

注意事项

  • 分片上传前,您必须具有 tos:PutObject 权限,具体操作,请参见权限配置指南
  • 取消分片上传任务前,您必须具有 tos:AbortMultipartUpload 权限,具体操作,请参见权限配置指南
  • 分片编号从 1 开始,最大为 10000。除最后一个分片以外,其他分片大小最小为 5MiB。
  • 上传对象时,对象名必须满足一定规范,详细信息,请参见对象命名规范
  • TOS 是面向海量存储设计的分布式对象存储产品,内部分区存储了对象索引数据,为横向扩展您上传对象和下载对象时的最大吞吐量,和减小热点分区的概率,请您避免使用字典序递增的对象命名方式,详细信息,请参见性能优化
  • 如果桶中已经存在同名对象,则新对象会覆盖已有的对象。如果您的桶开启了版本控制,则会保留原有对象,并生成一个新版本号用于标识新上传的对象。

分片上传步骤

分片上传一般包含以下三个步骤。

  1. 初始化分片上传任务:调用 create_multipart_upload 方法返回 TOS 创建的全局唯一 upload_id。
  2. 上传分片:调用 upload_part 方法上传分片数据。

    说明

    • 对于同一个分片上传任务(通过 upload_id 标识),分片编号(part_number)标识了该分片在整个对象中的相对位置。若通过同一分片编号上多次上传数据,TOS中会覆盖原始数据,并以最后一次上传数据为准。
    • 合并分片时,您需指定当前分片上传任务中的所有分片信息(分片编号、ETag 值)。
  3. 完成分片上传:所有分片上传完成后,调用 complete_multipart_upload 方法将所有分片合并成一个完整的对象。

示例代码

分片上传完整过程

以下代码通过分片上传将本地文件上传到目标桶 bucket-test 中的 object-test 对象。

import os

import tos

from tos.utils import SizeAdapter

# 从环境变量获取 AK 和 SK 信息。
ak = os.getenv('TOS_ACCESS_KEY')
sk = os.getenv('TOS_SECRET_KEY')
# your endpoint 和 your region 填写Bucket 所在区域对应的Endpoint。# 以华北2(北京)为例,your endpoint 填写 tos-cn-beijing.volces.com,your region 填写 cn-beijing。
endpoint = "your endpoint"
region = "your region"
bucket_name = "bucket-test"
object_key = "object-test"

# 本地文件完整路径,例如usr/local/testfile.txt
file_name = '/usr/local/testfile.txt'
total_size = os.path.getsize(file_name)
part_size = 5 * 1024 * 1024
try:
    # 创建 TosClientV2 对象,对桶和对象的操作都通过 TosClientV2 实现
    client = tos.TosClientV2(ak, sk, endpoint, region)

    # 初始化上传任务
    # 若需在初始化分片时设置对象的存储类型,可通过storage_class字段设置
    # 若需在初始化分片时设置对象ACL,可通过acl、grant_full_control等字段设置
    multi_result = client.create_multipart_upload(bucket_name, object_key, acl=tos.ACLType.ACL_Public_Read,
                                                  storage_class=tos.StorageClassType.Storage_Class_Standard)

    upload_id = multi_result.upload_id
    parts = []

    # 上传分片数据
    with open(file_name, 'rb') as f:
        part_number = 1
        offset = 0
        while offset < total_size:
            num_to_upload = min(part_size, total_size - offset)
            out = client.upload_part(bucket_name, object_key, upload_id, part_number,
                                     content=SizeAdapter(f, num_to_upload, init_offset=offset))
            parts.append(out)
            offset += num_to_upload
            part_number += 1

    # 完成分片上传任务
    client.complete_multipart_upload(bucket_name, object_key, upload_id, parts)
except tos.exceptions.TosClientError as e:
    # 操作失败,捕获客户端异常,一般情况为非法请求参数或网络异常
    print('fail with client error, message:{}, cause: {}'.format(e.message, e.cause))
except tos.exceptions.TosServerError as e:
    # 操作失败,捕获服务端异常,可从返回信息中获取详细错误信息
    print('fail with server error, code: {}'.format(e.code))
    # request id 可定位具体问题,强烈建议日志中保存
    print('error with request id: {}'.format(e.request_id))
    print('error with message: {}'.format(e.message))
    print('error with http code: {}'.format(e.status_code))
    print('error with ec: {}'.format(e.ec))
    print('error with request url: {}'.format(e.request_url))
except Exception as e:
    print('fail with unknown error: {}'.format(e))

进度条处理

说明

对于字符串、Bytes 和本地文件四种形式的数据,支持进度条功能。网络流等类型数据无法获取上传内容的大小,因此回调时不会返回对象总体大小。

以下代码用于向分片上传过程中添加进度条功能。

import os

import tos
from tos import DataTransferType
from tos.utils import SizeAdapter, MergeProcess

# 从环境变量获取 AK 和 SK 信息。
ak = os.getenv('TOS_ACCESS_KEY')
sk = os.getenv('TOS_SECRET_KEY')
# your endpoint 和 your region 填写Bucket 所在区域对应的Endpoint。# 以华北2(北京)为例,your endpoint 填写 tos-cn-beijing.volces.com,your region 填写 cn-beijing。
endpoint = "your endpoint"
region = "your region"
bucket_name = "bucket-test"
object_key = "object-test"

# 本地文件完整路径,例如usr/local/testfile.txt
file_name = '/usr/local/testfile.txt'
total_size = os.path.getsize(file_name)
part_size = 5 * 1024 * 1024
try:
    # 创建 TosClientV2 对象,对桶和对象的操作都通过 TosClientV2 实现
    client = tos.TosClientV2(ak, sk, endpoint, region)


    def percentage(consumed_bytes: int, total_bytes: int, rw_once_bytes: int, type: DataTransferType):
        if total_bytes:
            rate = int(100 * float(consumed_bytes) / float(total_bytes))
            print("rate:{}, consumed_bytes:{},total_bytes{}, rw_once_bytes:{}, type:{}".format(rate, consumed_bytes,
                                                                                               total_bytes,
                                                                                               rw_once_bytes, type))


    # 配置进度条,与普通上传不同的是需将分片上传的进度聚合
    data_transfer_listener = MergeProcess(percentage, total_size, (total_size + part_size - 1) // part_size, 0)
    # 初始化上传任务
    # 若需在初始化分片时设置对象的存储类型,可通过storage_class字段设置
    # 若需在初始化分片时设置对象ACL,可通过acl、grant_full_control等字段设置
    multi_result = client.create_multipart_upload(bucket_name, object_key, acl=tos.ACLType.ACL_Public_Read,
                                                  storage_class=tos.StorageClassType.Storage_Class_Standard)

    upload_id = multi_result.upload_id
    parts = []

    # 上传分片数据
    with open(file_name, 'rb') as f:
        part_number = 1
        offset = 0
        while offset < total_size:
            num_to_upload = min(part_size, total_size - offset)
            out = client.upload_part(bucket_name, object_key, upload_id, part_number,
                                     content=SizeAdapter(f, num_to_upload, init_offset=offset),
                                     data_transfer_listener=data_transfer_listener)
            parts.append(out)
            offset += num_to_upload
            part_number += 1

    # 完成分片上传任务
    client.complete_multipart_upload(bucket_name, object_key, upload_id, parts)
except tos.exceptions.TosClientError as e:
    # 操作失败,捕获客户端异常,一般情况为非法请求参数或网络异常
    print('fail with client error, message:{}, cause: {}'.format(e.message, e.cause))
except tos.exceptions.TosServerError as e:
    # 操作失败,捕获服务端异常,可从返回信息中获取详细错误信息
    print('fail with server error, code: {}'.format(e.code))
    # request id 可定位具体问题,强烈建议日志中保存
    print('error with request id: {}'.format(e.request_id))
    print('error with message: {}'.format(e.message))
    print('error with http code: {}'.format(e.status_code))
    print('error with ec: {}'.format(e.ec))
    print('error with request url: {}'.format(e.request_url))
except Exception as e:
    print('fail with unknown error: {}'.format(e))

取消分片上传任务

您可以通过 abort_multipart_upload 方法来取消分片上传任务。当一个分片任务被取消后, TOS 会将已上传的分片数据删除,同时您无法再对此分片任务进行任何操作。
以下代码用于取消桶 bucket-test 中对象 object-test 的分片上传任务。

import os
import tos

# 从环境变量获取 AK 和 SK 信息。
ak = os.getenv('TOS_ACCESS_KEY')
sk = os.getenv('TOS_SECRET_KEY')
# your endpoint 和 your region 填写Bucket 所在区域对应的Endpoint。# 以华北2(北京)为例,your endpoint 填写 tos-cn-beijing.volces.com,your region 填写 cn-beijing。
endpoint = "your endpoint"
region = "your region"
bucket_name = "bucket-test"
object_key = "object-test"
# 由create_multipart_upload接口返回的upload_id
upload_id = "your upload id"
try:
    client = tos.TosClientV2(ak, sk, endpoint, region)
    # 取消分片指定upload_id的分片上传事件,已上传的分片会被删除。
    client.abort_multipart_upload(bucket_name, object_key, upload_id)
except tos.exceptions.TosClientError as e:
    # 操作失败,捕获客户端异常,一般情况为非法请求参数或网络异常
    print('fail with client error, message:{}, cause: {}'.format(e.message, e.cause))
except tos.exceptions.TosServerError as e:
    # 操作失败,捕获服务端异常,可从返回信息中获取详细错误信息
    print('fail with server error, code: {}'.format(e.code))
    # request id 可定位具体问题,强烈建议日志中保存
    print('error with request id: {}'.format(e.request_id))
    print('error with message: {}'.format(e.message))
    print('error with http code: {}'.format(e.status_code))
    print('error with ec: {}'.format(e.ec))
    print('error with request url: {}'.format(e.request_url))
except Exception as e:
    print('fail with unknown error: {}'.format(e))

列举已上传的分片信息

以下代码用于列举桶桶 bucket-test 中对象 object-test 已上传的分片信息。

import os
import tos

# 从环境变量获取 AK 和 SK 信息。
ak = os.getenv('TOS_ACCESS_KEY')
sk = os.getenv('TOS_SECRET_KEY')
# your endpoint 和 your region 填写Bucket 所在区域对应的Endpoint。# 以华北2(北京)为例,your endpoint 填写 tos-cn-beijing.volces.com,your region 填写 cn-beijing。
endpoint = "your endpoint"
region = "your region"
bucket_name = "bucket-test"
object_key = "object-test"
# 由create_multipart_upload接口返回的upload_id
upload_id = "upload_id"
try:
    # 创建 TosClientV2 对象,对桶和对象的操作都通过 TosClientV2 实现
    client = tos.TosClientV2(ak, sk, endpoint, region)

    # 列举指定upload_id对应已上传的分片信息。
    truncated = True
    marker = 0
    while truncated:
        # part_number_marker 指定分片上传的起始位
        # is_truncated 为 True 时,表明还存在未列举完成的分片
        out = client.list_parts(bucket_name, object_key, upload_id, part_number_marker=marker)
        for part in out.parts:
            print('part_number:', part.part_number)
            print('etag:', part.etag)
            print('size', part.size)
        marker = out.next_part_number_marker
        truncated = out.is_truncated
except tos.exceptions.TosClientError as e:
    # 操作失败,捕获客户端异常,一般情况为非法请求参数或网络异常
    print('fail with client error, message:{}, cause: {}'.format(e.message, e.cause))
except tos.exceptions.TosServerError as e:
    # 操作失败,捕获服务端异常,可从返回信息中获取详细错误信息
    print('fail with server error, code: {}'.format(e.code))
    # request id 可定位具体问题,强烈建议日志中保存
    print('error with request id: {}'.format(e.request_id))
    print('error with message: {}'.format(e.message))
    print('error with http code: {}'.format(e.status_code))
    print('error with ec: {}'.format(e.ec))
    print('error with request url: {}'.format(e.request_url))
except Exception as e:
    print('fail with unknown error: {}'.format(e))

相关文档