You need to enable JavaScript to run this app.
导航

断点续传(Go SDK)

最近更新时间2024.02.04 18:30:55

首次发布时间2022.02.28 16:43:41

使用断点续传上传的方式将文件上传到 TOS 时,您可以设置分片大小、上传分片的线程数、上传时客户端限速、事件回调函数等。上传过程中,如果出现网络异常或程序崩溃导致文件上传失败时,将从断点记录处继续上传未上传完成的部分。在上传的过程中可以通过调用传入的 CancelHook 中的 Cancel 方法取消对象上传。

注意事项

  • 上传对象前,您必须具有 tos:PutObject 权限,具体操作,请参见权限配置指南
  • 上传对象时,对象名必须满足一定规范,详细信息,请参见对象命名规范
  • TOS 是面向海量存储设计的分布式对象存储产品,内部分区存储了对象索引数据,为横向扩展您上传对象和下载对象时的最大吞吐量,和减小热点分区的概率,请您避免使用字典序递增的对象命名方式,详细信息,请参见性能优化
  • 如果桶中已经存在同名对象,则新对象会覆盖已有的对象。如果您的桶开启了版本控制,则会保留原有对象,并生成一个新版本号用于标识新上传的对象。
  • SDK 会将上传的状态信息记录在 Checkpoint 文件中,所以程序需要对 Checkpoint 文件有写权限。
  • 使用断点续传上传时,文件上传的进度信息会记录在 Checkpoint 文件中,如果上传过程中某一分片上传失败,再次上传时会 Checkpoint 文件中记录的点继续上传。上传完成后, Checkpoint 文件会被删除。
  • 如果上传过程中本地文件发生了改变,则会重新上传所有分片。

示例代码

上传文件

package main

import (
   "context"
   "fmt"

   "github.com/volcengine/ve-tos-golang-sdk/v2/tos"
)
func checkErr(err error) {
   if err != nil {
      if serverErr, ok := err.(*tos.TosServerError); ok {
         fmt.Println("Error:", serverErr.Error())
         fmt.Println("Request ID:", serverErr.RequestID)
         fmt.Println("Response Status Code:", serverErr.StatusCode)
         fmt.Println("Response Header:", serverErr.Header)
         fmt.Println("Response Err Code:", serverErr.Code)
         fmt.Println("Response Err Msg:", serverErr.Message)
      } else if clientErr, ok := err.(*tos.TosClientError); ok {
         fmt.Println("Error:", clientErr.Error())
         fmt.Println("Client Cause Err:", clientErr.Cause.Error())
      } else {
         fmt.Println("Error:", err)
      }
      panic(err)
   }
}

func main() {
   var (
      accessKey = os.Getenv("TOS_ACCESS_KEY")
      secretKey = os.Getenv("TOS_SECRET_KEY")
      // Bucket 对应的 Endpoint,以华北2(北京)为例:https://tos-cn-beijing.volces.com
      endpoint = "https://tos-cn-beijing.volces.com"
      region   = "cn-beijing"
      // 填写 BucketName
      bucketName = "*** Provide your bucket name ***"

      // 将文件上传到 example_dir 目录下的 example.txt 文件
      objectKey = "example_dir/example.txt"
      ctx       = context.Background()
      // 本地文件完整路径,例如usr/local/testfile.txt
      fileName = "/usr/local/testfile.txt"
   )
   // 初始化客户端
   client, err := tos.NewClientV2(endpoint, tos.WithRegion(region), tos.WithCredentials(tos.NewStaticCredentials(accessKey, secretKey)))
   checkErr(err)
   // 直接使用文件路径上传文件
   output, err := client.UploadFile(ctx, &tos.UploadFileInput{
      CreateMultipartUploadV2Input: tos.CreateMultipartUploadV2Input{
         Bucket: bucketName,
         Key:    objectKey,
      },
      // 上传的文件路径
      FilePath: fileName,
      // 上传时指定分片大小
      PartSize: tos.DefaultPartSize,
      // 分片上传任务并发数量
      TaskNum: 5,
      // 开启断点续传
      EnableCheckpoint: true,
   })
   checkErr(err)
   fmt.Println("PutObjectV2 Request ID:", output.RequestID)
}

遍历本地文件夹并上传到桶中

package main

import (
   "context"
   "errors"
   "fmt"
   "io/ioutil"
   "os"

   "github.com/volcengine/ve-tos-golang-sdk/v2/tos"
)

var (
   accessKey = os.Getenv("TOS_ACCESS_KEY")
   secretKey = os.Getenv("TOS_SECRET_KEY")
   // Bucket 对应的 Endpoint,以华北2(北京)为例:https://tos-cn-beijing.volces.com
   endpoint = "https://tos-cn-beijing.volces.com"
   region   = "cn-beijing"
   // 填写 BucketName
   bucketName = "*** Provide your bucket name ***"
   // 本地文件夹路径
   dirPath = "/usr/local/"
)

func checkErr(err error) {
   if err != nil {
      if serverErr, ok := err.(*tos.TosServerError); ok {
         fmt.Println("Error:", serverErr.Error())
         fmt.Println("Request ID:", serverErr.RequestID)
         fmt.Println("Response Status Code:", serverErr.StatusCode)
         fmt.Println("Response Header:", serverErr.Header)
         fmt.Println("Response Err Code:", serverErr.Code)
         fmt.Println("Response Err Msg:", serverErr.Message)
      } else if clientErr, ok := err.(*tos.TosClientError); ok {
         fmt.Println("Error:", clientErr.Error())
         fmt.Println("Client Cause Err:", clientErr.Cause.Error())
      } else {
         fmt.Println("Error:", err)
      }
      panic(err)
   }
}

var cli *tos.ClientV2

func uploadDir(ctx context.Context, dirPath string) error {
   file, err := os.Stat(dirPath)
   if err != nil {
      return err
   }

   if !file.IsDir() {
      return errors.New("please input file path. ")
   }
   files, err := ioutil.ReadDir(dirPath)
   for _, f := range files {
      if f.IsDir() {
         err = uploadDir(ctx, dirPath+f.Name())
         if err != nil {
            return nil
         }
         continue
      }
      output, err := cli.UploadFile(ctx, &tos.UploadFileInput{
         CreateMultipartUploadV2Input: tos.CreateMultipartUploadV2Input{
            Bucket: bucketName,
            Key:    dirPath + f.Name(),
         },
         // 上传的文件路径
         FilePath: dirPath + f.Name(),
         // 上传时指定分片大小
         PartSize: tos.DefaultPartSize,
         // 分片上传任务并发数量
         TaskNum: 5,
         // 开启断点续传
         EnableCheckpoint: true,
      })
      checkErr(err)
      fmt.Println("PutObjectV2 Request ID:", output.RequestID)
   }
}

func main() {

   // 初始化客户端
   var err error
   cli, err = tos.NewClientV2(endpoint, tos.WithRegion(region), tos.WithCredentials(tos.NewStaticCredentials(accessKey, secretKey)))
   checkErr(err)
   ctx := context.Background()
   err = uploadDir(ctx, dirPath)
   if err != nil {
      panic(err)
   }

}

配置进度条

断点续传上传时可通过实现 tos.DataTransferStatusChange 接口接收上传进度,代码示例如下。

package main

import (
   "context"
   "fmt"

   "github.com/volcengine/ve-tos-golang-sdk/v2/tos"
   "github.com/volcengine/ve-tos-golang-sdk/v2/tos/enum"
)

// 自定义进度回调,需要实现 tos.DataTransferStatusChange 接口
type listener struct {
}

func (l *listener) DataTransferStatusChange(event *tos.DataTransferStatus) {
   switch event.Type {
   case enum.DataTransferStarted:
      fmt.Println("Data transfer started")
   case enum.DataTransferRW:
      // Chunk 模式下 TotalBytes 值为 -1
      if event.TotalBytes != -1 {
         fmt.Printf("Once Read:%d,ConsumerBytes/TotalBytes: %d/%d,%d%%\n", event.RWOnceBytes, event.ConsumedBytes, event.TotalBytes, event.ConsumedBytes*100/event.TotalBytes)
      } else {
         fmt.Printf("Once Read:%d,ConsumerBytes:%d\n", event.RWOnceBytes, event.ConsumedBytes)
      }
   case enum.DataTransferSucceed:
      fmt.Printf("Data Transfer Succeed, ConsumerBytes/TotalBytes: %d/%d,%d%%\n", event.ConsumedBytes, event.TotalBytes, event.ConsumedBytes*100/event.TotalBytes)
   case enum.DataTransferFailed:
      fmt.Printf("Data Transfer Failed\n")
   }
}

func checkErr(err error) {
   if err != nil {
      if serverErr, ok := err.(*tos.TosServerError); ok {
         fmt.Println("Error:", serverErr.Error())
         fmt.Println("Request ID:", serverErr.RequestID)
         fmt.Println("Response Status Code:", serverErr.StatusCode)
         fmt.Println("Response Header:", serverErr.Header)
         fmt.Println("Response Err Code:", serverErr.Code)
         fmt.Println("Response Err Msg:", serverErr.Message)
      } else if clientErr, ok := err.(*tos.TosClientError); ok {
         fmt.Println("Error:", clientErr.Error())
         fmt.Println("Client Cause Err:", clientErr.Cause.Error())
      } else {
         fmt.Println("Error:", err)
      }
      panic(err)
   }
}

func main() {
   var (
      accessKey = os.Getenv("TOS_ACCESS_KEY")
      secretKey = os.Getenv("TOS_SECRET_KEY")
      // Bucket 对应的 Endpoint,以华北2(北京)为例:https://tos-cn-beijing.volces.com
      endpoint = "https://tos-cn-beijing.volces.com"
      region   = "cn-beijing"
      // 填写 BucketName
      bucketName = "*** Provide your bucket name ***"

      // 将文件上传到 example_dir 目录下的 example.txt 文件
      objectKey = "example_dir/example.txt"
      ctx       = context.Background()
      // 本地文件完整路径,例如usr/local/testfile.txt
      fileName = "/usr/local/testfile.txt"
   )
   // 初始化客户端
   client, err := tos.NewClientV2(endpoint, tos.WithRegion(region), tos.WithCredentials(tos.NewStaticCredentials(accessKey, secretKey)))
   checkErr(err)
   // 直接使用文件路径上传文件
   output, err := client.UploadFile(ctx, &tos.UploadFileInput{
      CreateMultipartUploadV2Input: tos.CreateMultipartUploadV2Input{
         Bucket: bucketName,
         Key:    objectKey,
      },
      // 上传的文件路径
      FilePath: fileName,
      // 上传时指定分片大小
      PartSize: tos.DefaultPartSize,
      // 分片上传任务并发数量
      TaskNum: 5,
      // 开启断点续传
      EnableCheckpoint: true,
      // 数据传输回调
      DataTransferListener: &listener{},
   })
   checkErr(err)
   fmt.Println("PutObjectV2 Request ID:", output.RequestID)
}

处理事件回调

以下代码用于自定义断点续传上传回调函数。

package main

import (
   "context"
   "fmt"

   "github.com/volcengine/ve-tos-golang-sdk/v2/tos"
   "github.com/volcengine/ve-tos-golang-sdk/v2/tos/enum"
)

// 自定义事件监听,需要实现 tos.UploadEventListener 接口
type eventChange struct {
}

func (e eventChange) EventChange(event *tos.UploadEvent) {
   switch event.Type {
   case enum.UploadEventCreateMultipartUploadSucceed:
      fmt.Printf("Upload to %s %s create multipart upload success, upload id:%s\n", event.Bucket, event.Key, event.UploadID)
   case enum.UploadEventCreateMultipartUploadFailed:
      fmt.Printf("Upload to %s %s create multipart upload fail, err:%v\n", event.Bucket, event.Key, event.Err)
   case enum.UploadEventUploadPartSucceed:
      fmt.Printf("Upload to %s %s part success, UploadPartInfo:%v\n", event.Bucket, event.Key, event.UploadPartInfo)
   case enum.UploadEventUploadPartAborted:
      fmt.Printf("Upload to %s %s part aborted,  upload id:%s\n", event.Bucket, event.Key, event.UploadID)
   case enum.UploadEventUploadPartFailed:
      fmt.Printf("Upload to %s %s part fail,  upload id:%s, err:%v\n", event.Bucket, event.Key, event.UploadID, event.Err)
   case enum.UploadEventCompleteMultipartUploadSucceed:
      fmt.Printf("Upload to %s %s success,  upload id:%s\n", event.Bucket, event.Key, event.UploadID)
   case enum.UploadEventCompleteMultipartUploadFailed:
      fmt.Printf("Upload to %s %s fail,  upload id:%s, err:%v\n", event.Bucket, event.Key, event.UploadID, event.Err)

   }
}

func checkErr(err error) {
   if err != nil {
      if serverErr, ok := err.(*tos.TosServerError); ok {
         fmt.Println("Error:", serverErr.Error())
         fmt.Println("Request ID:", serverErr.RequestID)
         fmt.Println("Response Status Code:", serverErr.StatusCode)
         fmt.Println("Response Header:", serverErr.Header)
         fmt.Println("Response Err Code:", serverErr.Code)
         fmt.Println("Response Err Msg:", serverErr.Message)
      } else if clientErr, ok := err.(*tos.TosClientError); ok {
         fmt.Println("Error:", clientErr.Error())
         fmt.Println("Client Cause Err:", clientErr.Cause.Error())
      } else {
         fmt.Println("Error:", err)
      }
      panic(err)
   }
}

func main() {
   var (
      accessKey = os.Getenv("TOS_ACCESS_KEY")
      secretKey = os.Getenv("TOS_SECRET_KEY")
      // Bucket 对应的 Endpoint,以华北2(北京)为例:https://tos-cn-beijing.volces.com
      endpoint = "https://tos-cn-beijing.volces.com"
      region   = "cn-beijing"
      // 填写 BucketName
      bucketName = "*** Provide your bucket name ***"

      // 将文件上传到 example_dir 目录下的 example.txt 文件
      objectKey = "example_dir/example.txt"
      ctx       = context.Background()
      // 本地文件完整路径,例如usr/local/testfile.txt
      fileName = "/usr/local/testfile.txt"
   )
   // 初始化客户端
   client, err := tos.NewClientV2(endpoint, tos.WithRegion(region), tos.WithCredentials(tos.NewStaticCredentials(accessKey, secretKey)))
   checkErr(err)
   // 直接使用文件路径上传文件
   output, err := client.UploadFile(ctx, &tos.UploadFileInput{
      CreateMultipartUploadV2Input: tos.CreateMultipartUploadV2Input{
         Bucket: bucketName,
         Key:    objectKey,
      },
      // 上传的文件路径
      FilePath: fileName,
      // 上传时指定分片大小
      PartSize: tos.DefaultPartSize,
      // 分片上传任务并发数量
      TaskNum: 5,
      // 开启断点续传
      EnableCheckpoint: true,
      // 事件监听回调
      UploadEventListener: eventChange{},
      
   })
   checkErr(err)
   fmt.Println("PutObjectV2 Request ID:", output.RequestID)
}

配置客户端限速

断点续传上传时可以通过客户端使用 tos.RateLimiter 接口对所占用的带宽进行限制,代码如下所示。

package main

import (
   "context"
   "fmt"
   "sync"
   "time"

   "github.com/volcengine/ve-tos-golang-sdk/v2/tos"
)

func checkErr(err error) {
   if err != nil {
      if serverErr, ok := err.(*tos.TosServerError); ok {
         fmt.Println("Error:", serverErr.Error())
         fmt.Println("Request ID:", serverErr.RequestID)
         fmt.Println("Response Status Code:", serverErr.StatusCode)
         fmt.Println("Response Header:", serverErr.Header)
         fmt.Println("Response Err Code:", serverErr.Code)
         fmt.Println("Response Err Msg:", serverErr.Message)
      } else if clientErr, ok := err.(*tos.TosClientError); ok {
         fmt.Println("Error:", clientErr.Error())
         fmt.Println("Client Cause Err:", clientErr.Cause.Error())
      } else {
         fmt.Println("Error:", err)
      }
      panic(err)
   }
}

type rateLimit struct {
   rate     int64
   capacity int64

   currentAmount int64
   sync.Mutex
   lastConsumeTime time.Time
}

func NewDefaultRateLimit(rate int64, capacity int64) tos.RateLimiter {
   return &rateLimit{
      rate:            rate,
      capacity:        capacity,
      lastConsumeTime: time.Now(),
      currentAmount:   capacity,
      Mutex:           sync.Mutex{},
   }
}

func (d *rateLimit) Acquire(want int64) (ok bool, timeToWait time.Duration) {
   d.Lock()
   defer d.Unlock()
   if want > d.capacity {
      want = d.capacity
   }
   increment := int64(time.Now().Sub(d.lastConsumeTime).Seconds() * float64(d.rate))
   if increment+d.currentAmount > d.capacity {
      d.currentAmount = d.capacity
   } else {
      d.currentAmount += increment
   }
   if want > d.currentAmount {
      timeToWaitSec := float64(want-d.currentAmount) / float64(d.rate)
      return false, time.Duration(timeToWaitSec * float64(time.Second))
   }
   d.lastConsumeTime = time.Now()
   d.currentAmount -= want
   return true, 0
}

func main() {
   var (
      accessKey = os.Getenv("TOS_ACCESS_KEY")
      secretKey = os.Getenv("TOS_SECRET_KEY")
      // Bucket 对应的 Endpoint,以华北2(北京)为例:https://tos-cn-beijing.volces.com
      endpoint = "https://tos-cn-beijing.volces.com"
      region   = "cn-beijing"
      // 填写 BucketName
      bucketName = "*** Provide your bucket name ***"

      // 将文件上传到 example_dir 目录下的 example.txt 文件
      objectKey = "example_dir/example.txt"
      ctx       = context.Background()
      // 本地文件完整路径,例如usr/local/testfile.txt
      fileName = "/usr/local/testfile.txt"
   )
   // 初始化客户端
   client, err := tos.NewClientV2(endpoint, tos.WithRegion(region), tos.WithCredentials(tos.NewStaticCredentials(accessKey, secretKey)))
   checkErr(err)
   // 直接使用文件路径上传文件
   rateLimit1m := int64(1024 * 1024)
   output, err := client.UploadFile(ctx, &tos.UploadFileInput{
      CreateMultipartUploadV2Input: tos.CreateMultipartUploadV2Input{
         Bucket: bucketName,
         Key:    objectKey,
      },
      // 上传的文件路径
      FilePath: fileName,
      // 上传时指定分片大小
      PartSize: tos.DefaultPartSize,
      // 分片上传任务并发数量
      TaskNum: 5,
      // 开启断点续传
      EnableCheckpoint: true,
      // 上传客户端限速
      RateLimiter: NewDefaultRateLimit(rateLimit1m, rateLimit1m),
   })
   checkErr(err)
   fmt.Println("PutObjectV2 Request ID:", output.RequestID)
}

取消机制

以下代码用于在运行时取消正在执行的断点续传上传任务。

package main

import (
   "context"
   "fmt"

   "github.com/volcengine/ve-tos-golang-sdk/v2/tos"
)

func checkErr(err error) {
   if err != nil {
      if serverErr, ok := err.(*tos.TosServerError); ok {
         fmt.Println("Error:", serverErr.Error())
         fmt.Println("Request ID:", serverErr.RequestID)
         fmt.Println("Response Status Code:", serverErr.StatusCode)
         fmt.Println("Response Header:", serverErr.Header)
         fmt.Println("Response Err Code:", serverErr.Code)
         fmt.Println("Response Err Msg:", serverErr.Message)
      } else if clientErr, ok := err.(*tos.TosClientError); ok {
         fmt.Println("Error:", clientErr.Error())
         fmt.Println("Client Cause Err:", clientErr.Cause.Error())
      } else {
         fmt.Println("Error:", err)
      }
      panic(err)
   }
}

func main() {
   var (
      accessKey = os.Getenv("TOS_ACCESS_KEY")
      secretKey = os.Getenv("TOS_SECRET_KEY")
      // Bucket 对应的 Endpoint,以华北2(北京)为例:https://tos-cn-beijing.volces.com
      endpoint = "https://tos-cn-beijing.volces.com"
      region   = "cn-beijing"
      // 填写 BucketName
      bucketName = "*** Provide your bucket name ***"

      // 将文件上传到 example_dir 目录下的 example.txt 文件
      objectKey = "example_dir/example.txt"
      ctx       = context.Background()
      // 本地文件完整路径,例如usr/local/testfile.txt
      fileName = "/usr/local/testfile.txt"
   )
   // 初始化客户端
   client, err := tos.NewClientV2(endpoint, tos.WithRegion(region), tos.WithCredentials(tos.NewStaticCredentials(accessKey, secretKey)))
   checkErr(err)
   // 直接使用文件路径上传文件
   cancel := tos.NewCancelHook()
   output, err := client.UploadFile(ctx, &tos.UploadFileInput{
      CreateMultipartUploadV2Input: tos.CreateMultipartUploadV2Input{
         Bucket: bucketName,
         Key:    objectKey,
      },
      // 上传的文件路径
      FilePath: fileName,
      // 上传时指定分片大小
      PartSize: tos.DefaultPartSize,
      // 分片上传任务并发数量
      TaskNum: 5,
      // 开启断点续传
      EnableCheckpoint: true,
      // 取消上传
      CancelHook: cancel,
   })
   checkErr(err)
   fmt.Println("PutObjectV2 Request ID:", output.RequestID)
}