You need to enable JavaScript to run this app.
导航

断点续传拷贝(Go SDK)

最近更新时间2024.02.04 18:31:05

首次发布时间2023.03.15 14:39:19

断点续传复制适用于通过 TOS Go SDK 在单个桶内或同区域的两个桶之间复制大对象的场景。TOS Go SDK 提供了断点续传拷贝的功能,借助本地 CheckPoint 的机制记录已成功复制的分段,当出现网络异常或机器故障等问题导致分段复制中断,可再次调用该接口以实现续传的效果。断点续传复制将待复制的对象分割为多个分段,并支持并发复制,待所有分段复制完成后,合并成完整的文件。您可以设置断点续传复制的分段大小、复制分段的线程数、事件回调函数等。同时也能在断点续传复制任务执行过程中,取消该任务。

注意事项

  • 拷贝文件不支持跨区域的桶间拷贝。
  • 拷贝对象时,账号必须具备源对象的读取权限和目标桶的写入权限。
  • 拷贝对象时,可以保留所有元数据(默认)或指定新的元数据。但 ACL 不会被保留,目标对象的 ACL 将被设置为私有。

断点续传拷贝

以下代码用于将 srcBucketName 桶中的 srcObjectKey 对象断点续传拷贝到 dstBucketName 桶中,并将目标对象名设置为 dstObjectKey,同时支持失败后重入拷贝。若复制过程中返回网络超时的报错,以相同参数再次调用 ResumableCopyObject 即可实现断点续传拷贝重入。

package main

import (
   "context"
   "fmt"
   "os"

   "github.com/volcengine/ve-tos-golang-sdk/v2/tos"
)

// checkErr is a no-op when err is nil. Otherwise it prints the error and
// panics with it. When err is a *tos.TosServerError, the request ID, status
// code, response header, error code and error message are printed as well.
func checkErr(err error) {
   if err == nil {
      return
   }
   switch e := err.(type) {
   case *tos.TosServerError:
      // Server-side failure: dump everything the service returned.
      fmt.Println("Error:", e.Error())
      fmt.Println("Request ID:", e.RequestID)
      fmt.Println("Response Status Code:", e.StatusCode)
      fmt.Println("Response Header:", e.Header)
      fmt.Println("Response Err Code:", e.Code)
      fmt.Println("Response Err Msg:", e.Message)
   default:
      fmt.Println("Error:", err)
   }
   panic(err)
}

// main copies srcObjectKey from the source bucket to dstObjectKey in the
// destination bucket with ResumableCopyObject, using a checkpoint file.
// Any error is reported through checkErr, which panics.
func main() {
   // Credentials come from the environment.
   ak := os.Getenv("TOS_ACCESS_KEY")
   sk := os.Getenv("TOS_SECRET_KEY")

   const (
      // Endpoint that corresponds to the bucket; this example uses
      // North China 2 (Beijing): https://tos-cn-beijing.volces.com
      endpoint = "https://tos-cn-beijing.volces.com"
      region   = "cn-beijing"

      // Fill in the bucket names.
      srcBucketName = "*** Provide your src bucket name ***"
      dstBucketName = "*** Provide your dst bucket name ***"

      srcObjectKey       = "srcObjectKey"
      dstObjectKey       = "dstObjectKey"
      checkPointFilePath = "./example.checkpoint"

      // Size of each copied part: 20 MiB.
      partSize = 20 * 1024 * 1024
   )
   ctx := context.Background()

   // Initialize the client.
   credentials := tos.NewStaticCredentials(ak, sk)
   client, err := tos.NewClientV2(endpoint, tos.WithRegion(region), tos.WithCredentials(credentials))
   checkErr(err)

   // Destination of the copy.
   destination := tos.CreateMultipartUploadV2Input{
      Bucket: dstBucketName,
      Key:    dstObjectKey,
   }
   input := &tos.ResumableCopyObjectInput{
      CreateMultipartUploadV2Input: destination,
      SrcBucket:                    srcBucketName,
      SrcKey:                       srcObjectKey,
      PartSize:                     partSize,
      TaskNum:                      3,
      EnableCheckpoint:             true,
      CheckpointFile:               checkPointFilePath,
   }

   output, err := client.ResumableCopyObject(ctx, input)
   checkErr(err)

   // Information about the copy result.
   fmt.Println("ResumableCopyObject Request Bucket:", output.Bucket)
   fmt.Println("ResumableCopyObject Request Key:", output.Key)
   fmt.Println("ResumableCopyObject Request UploadID:", output.UploadID)
   fmt.Println("ResumableCopyObject Request Etag:", output.Etag)
   fmt.Println("ResumableCopyObject Request Location:", output.Location)
   fmt.Println("ResumableCopyObject Request VersionID:", output.VersionID)
   fmt.Println("ResumableCopyObject Request HashCrc64ecma:", output.HashCrc64ecma)
}

处理事件回调

以下代码用于自定义断点续传拷贝回调函数。

package main

import (
   "context"
   "fmt"
   "os"

   "github.com/volcengine/ve-tos-golang-sdk/v2/tos"
   "github.com/volcengine/ve-tos-golang-sdk/v2/tos/enum"
)

// eventChange is a custom event listener for resumable copy. It is meant to
// implement the tos.CopyEventListener interface.
type eventChange struct {
}

// EventChange prints one line describing the copy event it receives,
// selected by event.Type. Event types without a case are ignored.
func (e eventChange) EventChange(event *tos.CopyEvent) {
   switch event.Type {
   case enum.CopyEventCreateMultipartUploadSucceed:
      fmt.Println("Resume Create Multipart Upload Success.\n ")
   case enum.CopyEventCreateMultipartUploadFailed:
      fmt.Println("Resume Create Multipart Upload Fail.\n ")
   case enum.CopyEventUploadPartCopySuccess:
      fmt.Printf("Copy %s %s part success, CopyPartInfo:%v\n", event.Bucket, event.Key, event.CopyPartInfo)
   case enum.CopyEventUploadPartCopyFailed:
      // A failed part reports the failure message together with its error.
      fmt.Printf("Copy %s %s part fail, err:%v\n", event.Bucket, event.Key, event.Err)
   case enum.CopyEventUploadPartCopyAborted:
      fmt.Printf("Copy %s %s part aborted.\n", event.Bucket, event.Key)
   case enum.CopyEventCompleteMultipartUploadSucceed:
      fmt.Printf("Copy %s %s success.\n", event.Bucket, event.Key)
   case enum.CopyEventCompleteMultipartUploadFailed:
      fmt.Printf("Copy %s %s fail,err:%v\n", event.Bucket, event.Key, event.Err)
   }
}

// checkErr is a no-op when err is nil. Otherwise it prints the error and
// panics with it. When err is a *tos.TosServerError, the request ID, status
// code, response header, error code and error message are printed as well.
func checkErr(err error) {
   if err == nil {
      return
   }
   switch e := err.(type) {
   case *tos.TosServerError:
      // Server-side failure: dump everything the service returned.
      fmt.Println("Error:", e.Error())
      fmt.Println("Request ID:", e.RequestID)
      fmt.Println("Response Status Code:", e.StatusCode)
      fmt.Println("Response Header:", e.Header)
      fmt.Println("Response Err Code:", e.Code)
      fmt.Println("Response Err Msg:", e.Message)
   default:
      fmt.Println("Error:", err)
   }
   panic(err)
}

// main runs a checkpointed ResumableCopyObject from the source bucket to the
// destination bucket and registers eventChange{} as the copy event listener.
// Any error is reported through checkErr, which panics.
func main() {
   // Credentials come from the environment.
   ak := os.Getenv("TOS_ACCESS_KEY")
   sk := os.Getenv("TOS_SECRET_KEY")

   const (
      // Endpoint that corresponds to the bucket; this example uses
      // North China 2 (Beijing): https://tos-cn-beijing.volces.com
      endpoint = "https://tos-cn-beijing.volces.com"
      region   = "cn-beijing"

      // Fill in the bucket names.
      srcBucketName = "*** Provide your src bucket name ***"
      dstBucketName = "*** Provide your dst bucket name ***"

      srcObjectKey       = "srcObjectKey"
      dstObjectKey       = "dstObjectKey"
      checkPointFilePath = "./example.checkpoint"

      // Size of each copied part: 20 MiB.
      partSize = 20 * 1024 * 1024
   )
   ctx := context.Background()

   // Initialize the client.
   credentials := tos.NewStaticCredentials(ak, sk)
   client, err := tos.NewClientV2(endpoint, tos.WithRegion(region), tos.WithCredentials(credentials))
   checkErr(err)

   // Destination of the copy.
   destination := tos.CreateMultipartUploadV2Input{
      Bucket: dstBucketName,
      Key:    dstObjectKey,
   }
   input := &tos.ResumableCopyObjectInput{
      CreateMultipartUploadV2Input: destination,
      SrcBucket:                    srcBucketName,
      SrcKey:                       srcObjectKey,
      PartSize:                     partSize,
      TaskNum:                      3,
      EnableCheckpoint:             true,
      CheckpointFile:               checkPointFilePath,
      // Copy events are delivered to the custom listener.
      CopyEventListener: eventChange{},
   }

   output, err := client.ResumableCopyObject(ctx, input)
   checkErr(err)

   // Information about the copy result.
   fmt.Println("ResumableCopyObject Request Bucket:", output.Bucket)
   fmt.Println("ResumableCopyObject Request Key:", output.Key)
   fmt.Println("ResumableCopyObject Request UploadID:", output.UploadID)
   fmt.Println("ResumableCopyObject Request Etag:", output.Etag)
   fmt.Println("ResumableCopyObject Request Location:", output.Location)
   fmt.Println("ResumableCopyObject Request VersionID:", output.VersionID)
   fmt.Println("ResumableCopyObject Request HashCrc64ecma:", output.HashCrc64ecma)
}

取消机制

以下代码用于暂停或取消断点续传拷贝任务。

package main

import (
   "context"
   "fmt"
   "os"
   "os/signal"

   "github.com/volcengine/ve-tos-golang-sdk/v2/tos"
)

// checkErr is a no-op when err is nil. Otherwise it prints the error and
// panics with it. When err is a *tos.TosServerError, the request ID, status
// code, response header, error code and error message are printed as well.
func checkErr(err error) {
   if err == nil {
      return
   }
   switch e := err.(type) {
   case *tos.TosServerError:
      // Server-side failure: dump everything the service returned.
      fmt.Println("Error:", e.Error())
      fmt.Println("Request ID:", e.RequestID)
      fmt.Println("Response Status Code:", e.StatusCode)
      fmt.Println("Response Header:", e.Header)
      fmt.Println("Response Err Code:", e.Code)
      fmt.Println("Response Err Msg:", e.Message)
   default:
      fmt.Println("Error:", err)
   }
   panic(err)
}

// main runs a checkpointed ResumableCopyObject that can be cancelled while it
// is in progress: pressing Ctrl+C (os.Interrupt) invokes the cancel hook that
// is attached to the request. Without an interrupt the copy runs to
// completion and its result is printed. Any error is reported through
// checkErr, which panics.
func main() {
   var (
      accessKey = os.Getenv("TOS_ACCESS_KEY")
      secretKey = os.Getenv("TOS_SECRET_KEY")
      // Endpoint that corresponds to the bucket; this example uses
      // North China 2 (Beijing): https://tos-cn-beijing.volces.com
      endpoint = "https://tos-cn-beijing.volces.com"
      region   = "cn-beijing"
      // Fill in the bucket names.
      srcBucketName = "*** Provide your src bucket name ***"
      dstBucketName = "*** Provide your dst bucket name ***"

      srcObjectKey       = "srcObjectKey"
      dstObjectKey       = "dstObjectKey"
      checkPointFilePath = "./example.checkpoint"
      ctx                = context.Background()
   )
   // Initialize the client.
   client, err := tos.NewClientV2(endpoint, tos.WithRegion(region), tos.WithCredentials(tos.NewStaticCredentials(accessKey, secretKey)))
   checkErr(err)

   // The cancel hook is what stops a running task. Creating it is not
   // enough: it has to be invoked from another goroutine while
   // ResumableCopyObject is executing.
   cancel := tos.NewCancelHook()

   // Invoke the hook when the process receives an interrupt (Ctrl+C).
   // signal.Notify never blocks when sending, so the channel is buffered.
   interrupts := make(chan os.Signal, 1)
   signal.Notify(interrupts, os.Interrupt)
   defer signal.Stop(interrupts)

   // finished lets the watcher goroutine exit when main returns.
   finished := make(chan struct{})
   defer close(finished)
   go func() {
      select {
      case <-interrupts:
         // Per the SDK's CancelHook documentation, Cancel(false) only
         // interrupts the task so that it can be resumed later from the
         // checkpoint file, while Cancel(true) aborts the task and
         // discards the checkpoint. Confirm against the SDK version in use.
         cancel.Cancel(false)
      case <-finished:
      }
   }()

   input := &tos.ResumableCopyObjectInput{
      CreateMultipartUploadV2Input: tos.CreateMultipartUploadV2Input{
         Bucket: dstBucketName,
         Key:    dstObjectKey,
      },
      SrcBucket:        srcBucketName,
      SrcKey:           srcObjectKey,
      PartSize:         20 * 1024 * 1024,
      TaskNum:          3,
      EnableCheckpoint: true,
      CheckpointFile:   checkPointFilePath,
      CancelHook:       cancel,
   }
   // If the task is cancelled, the error returned here is reported by
   // checkErr like any other failure.
   output, err := client.ResumableCopyObject(ctx, input)
   checkErr(err)
   // Information about the copy result.
   fmt.Println("ResumableCopyObject Request Bucket:", output.Bucket)
   fmt.Println("ResumableCopyObject Request Key:", output.Key)
   fmt.Println("ResumableCopyObject Request UploadID:", output.UploadID)
   fmt.Println("ResumableCopyObject Request Etag:", output.Etag)
   fmt.Println("ResumableCopyObject Request Location:", output.Location)
   fmt.Println("ResumableCopyObject Request VersionID:", output.VersionID)
   fmt.Println("ResumableCopyObject Request HashCrc64ecma:", output.HashCrc64ecma)
}