You need to enable JavaScript to run this app.
导航

分片上传(Go SDK)

最近更新时间2024.03.21 15:13:15

首次发布时间2021.12.31 17:38:36

上传大对象时可以分成多个数据块(part)来分别上传,最后调用合并分片将上传的数据块合并为一个对象。

注意事项

  • 分片上传前,您必须具有 tos:PutObject 权限,具体操作,请参见权限配置指南
  • 取消分片上传任务前,您必须具有 tos:AbortMultipartUpload 权限,具体操作,请参见权限配置指南
  • 分片编号从 1 开始,最大为 10000。除最后一个分片以外,其他分片大小最小为 4MiB。
  • 上传对象时,对象名必须满足一定规范,详细信息,请参见对象命名规范
  • TOS 是面向海量存储设计的分布式对象存储产品,内部分区存储了对象索引数据,为横向扩展您上传对象和下载对象时的最大吞吐量,和减小热点分区的概率,请您避免使用字典序递增的对象命名方式,详细信息,请参见性能优化
  • 如果桶中已经存在同名对象,则新对象会覆盖已有的对象。如果您的桶开启了版本控制,则会保留原有对象,并生成一个新版本号用于标识新上传的对象。

分片上传步骤

分片上传一般包含以下三个步骤:

  1. 初始化分片上传任务:调用 CreateMultipartUploadV2 方法返回 TOS 创建的全局唯一 UploadID。
  2. 上传分片:调用 UploadPartV2 方法上传分片数据。

    说明

    • 对于同一个分片上传任务(通过 UploadID 标识),分片编号(PartNumber)标识了该分片在整个对象中的相对位置。若通过同一分片编号多次上传数据,TOS 中会覆盖原始数据,并以最后一次上传数据为准。
    • 响应头中包含了数据的 MD5 值,可通过 Etag 获取。合并分片时,您需指定当前分片上传任务中的所有分片信息(分片编号、ETag 值)。
  3. 完成分片上传:所有分片上传完成后,调用 CompleteMultipartUploadV2 方法将所有分片合并成一个完整的对象。

示例代码

分片上传完整过程

下面代码展示将本地文件通过分片的方式上传完整过程,并在上传时指定 ACL 为 Private,存储类型为低频存储以及添加自定义元数据。

package main

import (
   "context"
   "fmt"
   "io"
   "os"

   "github.com/volcengine/ve-tos-golang-sdk/v2/tos"
   "github.com/volcengine/ve-tos-golang-sdk/v2/tos/enum"
)

func checkErr(err error) {
   if err != nil {
      if serverErr, ok := err.(*tos.TosServerError); ok {
         fmt.Println("Error:", serverErr.Error())
         fmt.Println("Request ID:", serverErr.RequestID)
         fmt.Println("Response Status Code:", serverErr.StatusCode)
         fmt.Println("Response Header:", serverErr.Header)
         fmt.Println("Response Err Code:", serverErr.Code)
         fmt.Println("Response Err Msg:", serverErr.Message)
      } else if clientErr, ok := err.(*tos.TosClientError); ok {
         fmt.Println("Error:", clientErr.Error())
         fmt.Println("Client Cause Err:", clientErr.Cause.Error())
      } else {
         fmt.Println("Error:", err)
      }
      panic(err)
   }
}

func main() {
   var (
      accessKey = os.Getenv("TOS_ACCESS_KEY")
      secretKey = os.Getenv("TOS_SECRET_KEY")
      // Bucket 对应的 Endpoint,以华北2(北京)为例:https://tos-cn-beijing.volces.com
      endpoint = "https://tos-cn-beijing.volces.com"
      region   = "cn-beijing"
      // 填写 BucketName
      bucketName = "*** Provide your bucket name ***"

      // 指定的 ObjectKey
      objectKey = "*** Provide your object name ***"
      ctx       = context.Background()
   )
   // 初始化客户端
   client, err := tos.NewClientV2(endpoint, tos.WithRegion(region), tos.WithCredentials(tos.NewStaticCredentials(accessKey, secretKey)))
   checkErr(err)
   // 初始化分片,指定对象权限为私有,存储类型为低频并设置元数据信息

   createMultipartOutput, err := client.CreateMultipartUploadV2(ctx, &tos.CreateMultipartUploadV2Input{
      Bucket:       bucketName,
      Key:          objectKey,
      ACL:          enum.ACLPrivate,
      StorageClass: enum.StorageClassIa,
      Meta:         map[string]string{"key": "value"},
   })
   checkErr(err)
   fmt.Println("CreateMultipartUploadV2 Request ID:", createMultipartOutput.RequestID)
   // 获取到上传的 UploadID
   fmt.Println("CreateMultipartUploadV2 Upload ID:", createMultipartOutput.UploadID)
   // 需要上传的文件路径
   localFile := "/root/example.txt"
   fd, err := os.Open(localFile)
   checkErr(err)
   defer fd.Close()
   stat, err := os.Stat(localFile)
   checkErr(err)
   fileSize := stat.Size()
   // partNumber 编号从 1 开始
   partNumber := 1
   // part size 大小设置为 20M
   partSize := int64(20 * 1024 * 1024)
   offset := int64(0)
   var parts []tos.UploadedPartV2
   for offset < fileSize {
      uploadSize := partSize
      // 最后一个分片
      if fileSize-offset < partSize {
         uploadSize = fileSize - offset
      }
      fd.Seek(offset, io.SeekStart)
      partOutput, err := client.UploadPartV2(ctx, &tos.UploadPartV2Input{
         UploadPartBasicInput: tos.UploadPartBasicInput{
            Bucket:     bucketName,
            Key:        objectKey,
            UploadID:   createMultipartOutput.UploadID,
            PartNumber: partNumber,
         },
         Content:       io.LimitReader(fd, uploadSize),
         ContentLength: uploadSize,
      })
      checkErr(err)
      fmt.Println("upload Request ID:", partOutput.RequestID)
      parts = append(parts, tos.UploadedPartV2{PartNumber: partNumber, ETag: partOutput.ETag})
      offset += uploadSize
      partNumber++
   }

   completeOutput, err := client.CompleteMultipartUploadV2(ctx, &tos.CompleteMultipartUploadV2Input{
      Bucket:   bucketName,
      Key:      objectKey,
      UploadID: createMultipartOutput.UploadID,
      Parts:    parts,
   })
   checkErr(err)
   fmt.Println("CompleteMultipartUploadV2 Request ID:", completeOutput.RequestID)

}

列举已上传分片

以下代码用于列举指定存储桶中指定对象已上传的分片信息。

package main

import (
   "context"
   "fmt"

   "github.com/volcengine/ve-tos-golang-sdk/v2/tos"
)

func checkErr(err error) {
   if err != nil {
      if serverErr, ok := err.(*tos.TosServerError); ok {
         fmt.Println("Error:", serverErr.Error())
         fmt.Println("Request ID:", serverErr.RequestID)
         fmt.Println("Response Status Code:", serverErr.StatusCode)
         fmt.Println("Response Header:", serverErr.Header)
         fmt.Println("Response Err Code:", serverErr.Code)
         fmt.Println("Response Err Msg:", serverErr.Message)
      } else if clientErr, ok := err.(*tos.TosClientError); ok {
         fmt.Println("Error:", clientErr.Error())
         fmt.Println("Client Cause Err:", clientErr.Cause.Error())
      } else {
         fmt.Println("Error:", err)
      }
      panic(err)
   }
}

func main() {
   var (
      accessKey = os.Getenv("TOS_ACCESS_KEY")
      secretKey = os.Getenv("TOS_SECRET_KEY")
      // Bucket 对应的 Endpoint,以华北2(北京)为例:https://tos-cn-beijing.volces.com
      endpoint = "https://tos-cn-beijing.volces.com"
      region   = "cn-beijing"
      // 填写 BucketName
      bucketName = "*** Provide your bucket name ***"

      // 指定的 ObjectKey
      objectKey = "*** Provide your object name ***"
      uploadID  = "*** Provide upload ID ***"
      ctx       = context.Background()
   )
   // 初始化客户端
   client, err := tos.NewClientV2(endpoint, tos.WithRegion(region), tos.WithCredentials(tos.NewStaticCredentials(accessKey, secretKey)))
   checkErr(err)
   // 列举 uploadID 已上传分片信息
   truncated := true
   marker := 0
   for truncated {
      output, err := client.ListParts(ctx, &tos.ListPartsInput{
         Bucket:           bucketName,
         Key:              objectKey,
         UploadID:         uploadID,
         PartNumberMarker: marker,
      })
      checkErr(err)
      truncated = output.IsTruncated
      marker = output.NextPartNumberMarker
      for _, part := range output.Parts {
         fmt.Println("Part Number:", part.PartNumber)
         fmt.Println("ETag:", part.ETag)
         fmt.Println("Size:", part.Size)
      }
   }

}

取消分片上传任务

您可以通过 AbortMultipartUpload 方法来取消分片上传任务。当一个分片任务被取消后, TOS 会将已上传的分片数据删除,同时您无法再对此分片任务进行任何操作。

package main

import (
   "context"
   "fmt"

   "github.com/volcengine/ve-tos-golang-sdk/v2/tos"
)

func checkErr(err error) {
   if err != nil {
      if serverErr, ok := err.(*tos.TosServerError); ok {
         fmt.Println("Error:", serverErr.Error())
         fmt.Println("Request ID:", serverErr.RequestID)
         fmt.Println("Response Status Code:", serverErr.StatusCode)
         fmt.Println("Response Header:", serverErr.Header)
         fmt.Println("Response Err Code:", serverErr.Code)
         fmt.Println("Response Err Msg:", serverErr.Message)
      } else if clientErr, ok := err.(*tos.TosClientError); ok {
         fmt.Println("Error:", clientErr.Error())
         fmt.Println("Client Cause Err:", clientErr.Cause.Error())
      } else {
         fmt.Println("Error:", err)
      }
      panic(err)
   }
}

func main() {
   var (
      accessKey = os.Getenv("TOS_ACCESS_KEY")
      secretKey = os.Getenv("TOS_SECRET_KEY")
      // Bucket 对应的 Endpoint,以华北2(北京)为例:https://tos-cn-beijing.volces.com
      endpoint = "https://tos-cn-beijing.volces.com"
      region   = "cn-beijing"
      // 填写 BucketName
      bucketName = "*** Provide your bucket name ***"

      // 指定的 ObjectKey
      objectKey = "*** Provide your object name ***"
      uploadID  = "*** Provide upload ID ***"
      ctx       = context.Background()
   )
   // 初始化客户端
   client, err := tos.NewClientV2(endpoint, tos.WithRegion(region), tos.WithCredentials(tos.NewStaticCredentials(accessKey, secretKey)))
   checkErr(err)
   // 取消分片上传
   output, err := client.AbortMultipartUpload(ctx, &tos.AbortMultipartUploadInput{
      Bucket:   bucketName,
      Key:      objectKey,
      UploadID: uploadID,
   })
   checkErr(err)
   fmt.Println("AbortMultipartUpload Request ID:", output.RequestID)
}

相关文档