You need to enable JavaScript to run this app.
导航

追加上传(Go SDK)

最近更新时间2024.02.04 18:30:55

首次发布时间2021.12.31 17:38:36

AppendObjectV2 接口用于追加写对象。追加写是指在已上传的对象末尾追加内容。只能对类型为 Appendable 的对象使用此接口。通过直接上传和分片上传创建的对象类型为 Normal。

注意事项

  • 追加上传对象前,您必须具有 tos:PutObject 权限,具体操作请参见权限配置指南
  • 上传对象时,对象名必须满足一定规范,详细信息请参见对象命名规范
  • TOS 是面向海量存储设计的分布式对象存储产品,内部分区存储了对象索引数据,为横向扩展您上传对象和下载对象时的最大吞吐量,和减小热点分区的概率,请您避免使用字典序递增的对象命名方式,详细信息,请参见性能优化
  • 如果桶中已经存在同名对象,则新对象会覆盖已有的对象。桶开启多版本的场景下,则会保留原有对象,生成一个新版本号用于标识新上传的对象。
  • 追加上传对象不支持 Chunk-Encoded 的请求方式,当您追加上传网络流时请迭代获取数据再追加上传。

限制说明

使用 AppendObject 接口时,对象的大小限制说明如下:

  • 每次追加的大小不能小于 128KiB,追加后的对象大小不能大于 5GiB。
  • 通过 AppendObject 创建的对象,进行 PutObject 操作,对象被覆盖且对象类型由 Appendable 变为 Normal。反之通过 PutObject 上传的对象不支持追加写操作。
  • AppendObject 创建的对象不支持拷贝。
  • 如果对象类型为低频存储或归档闪回存储,则无法使用 AppendObject 接口。
  • 如果您的存储桶处于开启或暂停多版本控制功能的状态下,则无法使用 AppendObject 接口。

示例代码

追加上传

以下代码用于将字符流追加上传到目标桶。

package main

import (
   "context"
   "fmt"
   "net/http"
   "strings"

   "github.com/volcengine/ve-tos-golang-sdk/v2/tos"
)

func checkErr(err error) {
   if err != nil {
      if serverErr, ok := err.(*tos.TosServerError); ok {
         fmt.Println("Error:", serverErr.Error())
         fmt.Println("Request ID:", serverErr.RequestID)
         fmt.Println("Response Status Code:", serverErr.StatusCode)
         fmt.Println("Response Header:", serverErr.Header)
         fmt.Println("Response Err Code:", serverErr.Code)
         fmt.Println("Response Err Msg:", serverErr.Message)
      } else if clientErr, ok := err.(*tos.TosClientError); ok {
         fmt.Println("Error:", clientErr.Error())
         fmt.Println("Client Cause Err:", clientErr.Cause.Error())
      } else {
         fmt.Println("Error:", err)
      }
      panic(err)
   }
}

func main() {
   var (
      accessKey = os.Getenv("TOS_ACCESS_KEY")
      secretKey = os.Getenv("TOS_SECRET_KEY")
      // Bucket 对应的 Endpoint,以华北2(北京)为例:https://tos-cn-beijing.volces.com
      endpoint = "https://tos-cn-beijing.volces.com"
      region   = "cn-beijing"
      // 填写 BucketName
      bucketName = "*** Provide your bucket name ***"

      // 将文件上传到 example_dir 目录下的 example.txt 文件
      objectKey = "example_dir/example.txt"
      ctx       = context.Background()
   )
   // 初始化客户端
   client, err := tos.NewClientV2(endpoint, tos.WithRegion(region), tos.WithCredentials(tos.NewStaticCredentials(accessKey, secretKey)))
   checkErr(err)
   body := strings.NewReader("your append object value")
   // 追加上传,监听自定义上传进度回调并在客户端限制上传速度
   // 上传字符流
   output, err := client.AppendObjectV2(ctx, &tos.AppendObjectV2Input{
      Bucket:  bucketName,
      Key:     objectKey,
      Content: body,
   })
   checkErr(err)
   fmt.Println("AppendObjectV2 Request ID:", output.RequestID)

   // 追加上传网络流
   res, err := http.Get("https://www.volcengine.com/")
   checkErr(err)
   defer res.Body.Close()
   output, err = client.AppendObjectV2(ctx, &tos.AppendObjectV2Input{
      Bucket: bucketName,
      Key:    objectKey,
      // 指定下次 CRC 计算初始值
      PreHashCrc64ecma: output.HashCrc64ecma,
      // 指定下次 append offset
      Offset:  output.NextAppendOffset,
      Content: body,
   })
   checkErr(err)
   fmt.Println("AppendObjectV2 Request ID:", output.RequestID)

}

配置进度条

追加上传时可通过实现 tos.DataTransferStatusChange 接口接收上传进度,代码示例如下。

package main

import (
   "context"
   "fmt"
   "strings"

   "github.com/volcengine/ve-tos-golang-sdk/v2/tos"
   "github.com/volcengine/ve-tos-golang-sdk/v2/tos/enum"
)

func checkErr(err error) {
   if err != nil {
      if serverErr, ok := err.(*tos.TosServerError); ok {
         fmt.Println("Error:", serverErr.Error())
         fmt.Println("Request ID:", serverErr.RequestID)
         fmt.Println("Response Status Code:", serverErr.StatusCode)
         fmt.Println("Response Header:", serverErr.Header)
         fmt.Println("Response Err Code:", serverErr.Code)
         fmt.Println("Response Err Msg:", serverErr.Message)
      } else if clientErr, ok := err.(*tos.TosClientError); ok {
         fmt.Println("Error:", clientErr.Error())
         fmt.Println("Client Cause Err:", clientErr.Cause.Error())
      } else {
         fmt.Println("Error:", err)
      }
      panic(err)
   }
}

// 自定义进度回调,需要实现 tos.DataTransferStatusChange 接口
type listener struct {
}

func (l *listener) DataTransferStatusChange(event *tos.DataTransferStatus) {
   switch event.Type {
   case enum.DataTransferStarted:
      fmt.Println("Data transfer started")
   case enum.DataTransferRW:
       // Chunk 模式下 TotalBytes 值为 -1
       if event.TotalBytes != -1 {
           fmt.Printf("Once Read:%d,ConsumerBytes/TotalBytes: %d/%d,%d%%\n", event.RWOnceBytes, event.ConsumedBytes, event.TotalBytes, event.ConsumedBytes*100/event.TotalBytes)
       } else {
           fmt.Printf("Once Read:%d,ConsumerBytes:%d\n", event.RWOnceBytes, event.ConsumedBytes)
       }
   case enum.DataTransferSucceed:
      fmt.Printf("Event success, ConsumerBytes/TotalBytes: %d/%d,%d%%\n", event.ConsumedBytes, event.TotalBytes, event.ConsumedBytes*100/event.TotalBytes)
   case enum.DataTransferFailed:
      fmt.Printf("Data Transfer Failed\n")
   }
}

func main() {
   var (
      accessKey = os.Getenv("TOS_ACCESS_KEY")
      secretKey = os.Getenv("TOS_SECRET_KEY")
      // Bucket 对应的 Endpoint,以华北2(北京)为例:https://tos-cn-beijing.volces.com
      endpoint = "https://tos-cn-beijing.volces.com"
      region   = "cn-beijing"
      // 填写 BucketName
      bucketName = "*** Provide your bucket name ***"

      // 将文件上传到 example_dir 目录下的 example.txt 文件
      objectKey = "example_dir/example.txt"
      ctx       = context.Background()
   )
   // 初始化客户端
   client, err := tos.NewClientV2(endpoint, tos.WithRegion(region), tos.WithCredentials(tos.NewStaticCredentials(accessKey, secretKey)))
   checkErr(err)
   body := strings.NewReader("your append object value")
   // 追加上传,监听自定义上传进度回调
   // 上传字符流
   output, err := client.AppendObjectV2(ctx, &tos.AppendObjectV2Input{
      Bucket:               bucketName,
      Key:                  objectKey,
      Content:              body,
      DataTransferListener: &listener{},
   })
   checkErr(err)
   fmt.Println("AppendObjectV2 Request ID:", output.RequestID)

}

配置客户端限速

追加上传时可以通过客户端使用 tos.RateLimiter 接口对上传数据所占用的带宽进行限制,代码如下所示。

package main

import (
   "context"
   "fmt"
   "strings"
   "sync"
   "time"

   "github.com/volcengine/ve-tos-golang-sdk/v2/tos"
)

func checkErr(err error) {
   if err != nil {
      if serverErr, ok := err.(*tos.TosServerError); ok {
         fmt.Println("Error:", serverErr.Error())
         fmt.Println("Request ID:", serverErr.RequestID)
         fmt.Println("Response Status Code:", serverErr.StatusCode)
         fmt.Println("Response Header:", serverErr.Header)
         fmt.Println("Response Err Code:", serverErr.Code)
         fmt.Println("Response Err Msg:", serverErr.Message)
      } else if clientErr, ok := err.(*tos.TosClientError); ok {
         fmt.Println("Error:", clientErr.Error())
         fmt.Println("Client Cause Err:", clientErr.Cause.Error())
      } else {
         fmt.Println("Error:", err)
      }
      panic(err)
   }
}

type rateLimit struct {
   rate     int64
   capacity int64

   currentAmount int64
   sync.Mutex
   lastConsumeTime time.Time
}

func NewDefaultRateLimit(rate int64, capacity int64) tos.RateLimiter {
   return &rateLimit{
      rate:            rate,
      capacity:        capacity,
      lastConsumeTime: time.Now(),
      currentAmount:   capacity,
      Mutex:           sync.Mutex{},
   }
}

func (d *rateLimit) Acquire(want int64) (ok bool, timeToWait time.Duration) {
   d.Lock()
   defer d.Unlock()
   if want > d.capacity {
      want = d.capacity
   }
   increment := int64(time.Now().Sub(d.lastConsumeTime).Seconds() * float64(d.rate))
   if increment+d.currentAmount > d.capacity {
      d.currentAmount = d.capacity
   } else {
      d.currentAmount += increment
   }
   if want > d.currentAmount {
      timeToWaitSec := float64(want-d.currentAmount) / float64(d.rate)
      return false, time.Duration(timeToWaitSec * float64(time.Second))
   }
   d.lastConsumeTime = time.Now()
   d.currentAmount -= want
   return true, 0
}

func main() {
   var (
      accessKey = os.Getenv("TOS_ACCESS_KEY")
      secretKey = os.Getenv("TOS_SECRET_KEY")
      // Bucket 对应的 Endpoint,以华北2(北京)为例:https://tos-cn-beijing.volces.com
      endpoint = "https://tos-cn-beijing.volces.com"
      region   = "cn-beijing"
      // 填写 BucketName
      bucketName = "*** Provide your bucket name ***"

      // 将文件上传到 example_dir 目录下的 example.txt 文件
      objectKey = "example_dir/example.txt"
      ctx       = context.Background()
   )
   // 初始化客户端
   client, err := tos.NewClientV2(endpoint, tos.WithRegion(region), tos.WithCredentials(tos.NewStaticCredentials(accessKey, secretKey)))
   checkErr(err)
   rateLimit1M := int64(1024 * 1024)
   body := strings.NewReader("your append object value")
   // 追加上传,在客户端限制上传速度
   // 上传字符流
   output, err := client.AppendObjectV2(ctx, &tos.AppendObjectV2Input{
      Bucket:               bucketName,
      Key:                  objectKey,
      Content:              body,
      RateLimiter:          NewDefaultRateLimit(rateLimit1M, rateLimit1M),
   })
   checkErr(err)
   fmt.Println("AppendObjectV2 Request ID:", output.RequestID)
   

}

相关文档

关于追加上传的 API 文档,请参见 AppendObject