You need to enable JavaScript to run this app.
导航

断点续传(Java SDK)

最近更新时间2024.02.04 18:31:05

首次发布时间2023.03.15 15:27:55

断点续传拷贝适用于通过 TOS Go SDK 在单个桶内或同区域的两个桶之间拷贝大对象的场景。TOS Go SDK 提供了断点续传下载的功能,借助本地 CheckPoint 的机制记录已成功拷贝的分段,当出现网络异常或机器故障等问题导致分段拷贝中断,可再次调用该接口以实现续传的效果。
断点续传拷贝将待拷贝的对象分割为多个分段,并支持并发拷贝,待所有分段拷贝完成后,合并成完整的文件。您可以设置断点续传拷贝的分段大小、拷贝分段的线程数、事件回调函数等。同时也能在断点续传拷贝任务执行过程中,取消该任务。

注意事项

  • 拷贝文件不支持跨区域的桶间拷贝。
  • 拷贝对象时,账号必须具备源对象的读取权限和目标桶的写入权限。
  • 拷贝对象时,可以保留所有元数据(默认值)或指定新的元数据。但ACL并未被保留,而是设置为私有。

断点续传拷贝

以下代码用于断点续传拷贝 srcBucketName 桶中 srcObjectKey 对象到 dstBucketName桶中,并设置对象对象名为 dstObjectKey 以及失败后重入下载。若拷贝过程中返回网络超时的报错,则以相同参数调用 ResumableCopyObject 后实现断点续传下载重入。

import com.volcengine.tos.TOSV2;
import com.volcengine.tos.TOSV2ClientBuilder;
import com.volcengine.tos.TosClientException;
import com.volcengine.tos.TosServerException;
import com.volcengine.tos.model.object.ResumableCopyObjectInput;
import com.volcengine.tos.model.object.ResumableCopyObjectOutput;

public class ResumableCopyObjectExample {
    public static void main(String[] args) {
        String endpoint = "your endpoint";
        String region = "your region";
        String accessKey = System.getenv("TOS_ACCESS_KEY");
        String secretKey = System.getenv("TOS_SECRET_KEY");

        String bucketName = "bucket-example";
        String objectKey = "example_dir/example_object.txt";

        String sourceBucketName = "source-bucket";
        String sourceObjectKey = "src_example_dir/src_example_object.txt";
        // taskNum 设置并发上传的并发数,范围为 1-1000
        int taskNum = 5;
        // partSize 设置文件分片大小,范围为 5MB - 5GB,默认为 20MB
        long partSize = 10 * 1024 * 1024;
        // enableCheckpoint 设置是否开启断点续传功能,开启则会在本地记录上传进度
        boolean enableCheckpoint = true;
        // checkpointFilePath 设置断点续传记录文件存放位置
        // 其格式为 {checkpointFilePath}+{bucket+objectKey+versionID 的 Base64Md5 值}.download
        String checkpointFilePath = "the checkpoint file path";

        TOSV2 tos = new TOSV2ClientBuilder().build(region, endpoint, accessKey, secretKey);

        try{
            ResumableCopyObjectInput input = new ResumableCopyObjectInput().setBucket(bucketName).setKey(objectKey)
                    .setSrcBucket(sourceBucketName).setSrcKey(sourceObjectKey).setTaskNum(taskNum).setPartSize(partSize)
                    .setEnableCheckpoint(enableCheckpoint).setCheckpointFile(checkpointFilePath);
            ResumableCopyObjectOutput output = tos.resumableCopyObject(input);
            System.out.println("resumableCopyObject succeed, object's etag is " + output.getEtag());
            System.out.println("resumableCopyObject succeed, object's crc64 is " + output.getHashCrc64ecma());
        } catch (TosClientException e) {
            // 操作失败,捕获客户端异常,一般情况是请求参数错误,此时请求并未发送
            System.out.println("resumableCopyObject failed");
            System.out.println("Message: " + e.getMessage());
            if (e.getCause() != null) {
                e.getCause().printStackTrace();
            }
        } catch (TosServerException e) {
            // 操作失败,捕获服务端异常,可以获取到从服务端返回的详细错误信息
            System.out.println("resumableCopyObject failed");
            System.out.println("StatusCode: " + e.getStatusCode());
            System.out.println("Code: " + e.getCode());
            System.out.println("Message: " + e.getMessage());
            System.out.println("RequestID: " + e.getRequestID());
        } catch (Throwable t) {
            // 作为兜底捕获其他异常,一般不会执行到这里
            System.out.println("resumableCopyObject failed");
            System.out.println("unexpected exception, message: " + t.getMessage());
        }
    }
}

处理事件回调

以下代码用于自定义断点续传拷贝回调函数。

import com.volcengine.tos.TOSV2;
import com.volcengine.tos.TOSV2ClientBuilder;
import com.volcengine.tos.TosClientException;
import com.volcengine.tos.TosServerException;
import com.volcengine.tos.comm.event.CopyEventType;
import com.volcengine.tos.model.object.CopyEvent;
import com.volcengine.tos.model.object.CopyEventListener;
import com.volcengine.tos.model.object.ResumableCopyObjectInput;
import com.volcengine.tos.model.object.ResumableCopyObjectOutput;

public class ResumableCopyObjectWithEventListenerExample {
    public static void main(String[] args) {
        String endpoint = "your endpoint";
        String region = "your region";
        String accessKey = System.getenv("TOS_ACCESS_KEY");
        String secretKey = System.getenv("TOS_SECRET_KEY");

        String bucketName = "bucket-example";
        String objectKey = "example_dir/example_object.txt";

        String sourceBucketName = "source-bucket";
        String sourceObjectKey = "src_example_dir/src_example_object.txt";
        // taskNum 设置并发上传的并发数,范围为 1-1000
        int taskNum = 5;
        // partSize 设置文件分片大小,范围为 5MB - 5GB,默认为 20MB
        long partSize = 10 * 1024 * 1024;
        // enableCheckpoint 设置是否开启断点续传功能,开启则会在本地记录上传进度
        boolean enableCheckpoint = true;
        // checkpointFilePath 设置断点续传记录文件存放位置
        // 其格式为 {checkpointFilePath}+{bucket+objectKey+versionID 的 Base64Md5 值}.download
        String checkpointFilePath = "the checkpoint file path";

        TOSV2 tos = new TOSV2ClientBuilder().build(region, endpoint, accessKey, secretKey);

        try{
            CopyEventListener listener = new CopyEventListener() {
                @Override
                public void eventChange(CopyEvent copyEvent) {
                    if (copyEvent.getType() == CopyEventType.CopyEventCreateMultipartUploadSucceed) {
                        System.out.println("event change, createMultipartUpload succeed");
                    }
                    if (copyEvent.getType() == CopyEventType.CopyEventCreateMultipartUploadFailed) {
                        System.out.println("event change, createMultipartUpload failed");
                    }
                    if (copyEvent.getType() == CopyEventType.CopyEventUploadPartCopySucceed) {
                        System.out.println("event change, uploadPartCopy succeed");
                    }
                    if (copyEvent.getType() == CopyEventType.CopyEventUploadPartCopyFailed) {
                        System.out.println("event change, uploadPartCopy failed");
                    }
                    if (copyEvent.getType() == CopyEventType.CopyEventUploadPartCopyAborted) {
                        System.out.println("event change, uploadPartCopy aborted");
                    }
                    if (copyEvent.getType() == CopyEventType.CopyEventCompleteMultipartUploadSucceed) {
                        System.out.println("event change, completeMultipartUpload succeed");
                    }
                    if (copyEvent.getType() == CopyEventType.CopyEventCompleteMultipartUploadFailed) {
                        System.out.println("event change, completeMultipartUpload failed");
                    }
                }
            };
            ResumableCopyObjectInput input = new ResumableCopyObjectInput().setBucket(bucketName).setKey(objectKey)
                    .setSrcBucket(sourceBucketName).setSrcKey(sourceObjectKey).setTaskNum(taskNum).setPartSize(partSize)
                    .setEnableCheckpoint(enableCheckpoint).setCheckpointFile(checkpointFilePath).setCopyEventListener(listener);
            ResumableCopyObjectOutput output = tos.resumableCopyObject(input);
            System.out.println("resumableCopyObject succeed, object's etag is " + output.getEtag());
            System.out.println("resumableCopyObject succeed, object's crc64 is " + output.getHashCrc64ecma());
        } catch (TosClientException e) {
            // 操作失败,捕获客户端异常,一般情况是请求参数错误,此时请求并未发送
            System.out.println("resumableCopyObject failed");
            System.out.println("Message: " + e.getMessage());
            if (e.getCause() != null) {
                e.getCause().printStackTrace();
            }
        } catch (TosServerException e) {
            // 操作失败,捕获服务端异常,可以获取到从服务端返回的详细错误信息
            System.out.println("resumableCopyObject failed");
            System.out.println("StatusCode: " + e.getStatusCode());
            System.out.println("Code: " + e.getCode());
            System.out.println("Message: " + e.getMessage());
            System.out.println("RequestID: " + e.getRequestID());
        } catch (Throwable t) {
            // 作为兜底捕获其他异常,一般不会执行到这里
            System.out.println("resumableCopyObject failed");
            System.out.println("unexpected exception, message: " + t.getMessage());
        }
    }
}

取消机制

以下代码用于暂停或取消断点续传拷贝任务。

import com.volcengine.tos.TOSV2;
import com.volcengine.tos.TOSV2ClientBuilder;
import com.volcengine.tos.TosClientException;
import com.volcengine.tos.TosServerException;
import com.volcengine.tos.comm.event.CopyEventType;
import com.volcengine.tos.model.object.CopyEvent;
import com.volcengine.tos.model.object.CopyEventListener;
import com.volcengine.tos.model.object.ResumableCopyObjectInput;
import com.volcengine.tos.model.object.ResumableCopyObjectOutput;

public class ResumableCopyObjectWithCancelExample {
    public static void main(String[] args) {
        String endpoint = "your endpoint";
        String region = "your region";
        String accessKey = System.getenv("TOS_ACCESS_KEY");
        String secretKey = System.getenv("TOS_SECRET_KEY"); 

        String bucketName = "bucket-example";
        String objectKey = "example_dir/example_object.txt";

        String sourceBucketName = "source-bucket";
        String sourceObjectKey = "src_example_dir/src_example_object.txt";
        // taskNum 设置并发上传的并发数,范围为 1-1000
        int taskNum = 5;
        // partSize 设置文件分片大小,范围为 5MB - 5GB,默认为 20MB
        long partSize = 10 * 1024 * 1024;
        // enableCheckpoint 设置是否开启断点续传功能,开启则会在本地记录上传进度
        boolean enableCheckpoint = true;
        // checkpointFilePath 设置断点续传记录文件存放位置
        // 其格式为 {checkpointFilePath}+{bucket+objectKey+versionID 的 Base64Md5 值}.download
        String checkpointFilePath = "the checkpoint file path";

        TOSV2 tos = new TOSV2ClientBuilder().build(region, endpoint, accessKey, secretKey);

        try{
            ResumableCopyObjectInput input = new ResumableCopyObjectInput().setBucket(bucketName).setKey(objectKey)
                    .setSrcBucket(sourceBucketName).setSrcKey(sourceObjectKey).setTaskNum(taskNum).setPartSize(partSize)
                    .setEnableCheckpoint(enableCheckpoint).setCheckpointFile(checkpointFilePath);
            // 以下代码通过 UploadEventListener 监听上传事件。
            // 如果出现 UploadEventUploadPartFailed 事件,即有上传失败的分片时就终止上传任务。
            // 以下代码仅作为示例,用户可根据业务需要进行使用。
            boolean isAbort = true;
            CopyEventListener listener = new CopyEventListener() {
                @Override
                public void eventChange(CopyEvent uploadEvent) {
                    if (uploadEvent.getType() == CopyEventType.CopyEventUploadPartCopyFailed) {
                        System.out.println("event change, uploadPartCopy failed");
                        if (input.getCancelHook() != null) {
                            // 调用 cancel 时,如果 isAbort 为 true,会终止断点续传,删除本地 checkpoint 文件,
                            // 并调用 abortMultipartUpload 取消分片上传。
                            // 如果 isAbort 为 false,只会暂停当前上传任务,再次调用 uploadFile 可从断点处续传。
                            input.getCancelHook().cancel(isAbort);
                        }
                    }
                }
            };
            input.setCancelHook(true).setCopyEventListener(listener);
            ResumableCopyObjectOutput output = tos.resumableCopyObject(input);
            System.out.println("resumableCopyObject succeed, object's etag is " + output.getEtag());
            System.out.println("resumableCopyObject succeed, object's crc64 is " + output.getHashCrc64ecma());
        } catch (TosClientException e) {
            // 操作失败,捕获客户端异常,一般情况是请求参数错误,此时请求并未发送
            System.out.println("resumableCopyObject failed");
            System.out.println("Message: " + e.getMessage());
            if (e.getCause() != null) {
                e.getCause().printStackTrace();
            }
        } catch (TosServerException e) {
            // 操作失败,捕获服务端异常,可以获取到从服务端返回的详细错误信息
            System.out.println("resumableCopyObject failed");
            System.out.println("StatusCode: " + e.getStatusCode());
            System.out.println("Code: " + e.getCode());
            System.out.println("Message: " + e.getMessage());
            System.out.println("RequestID: " + e.getRequestID());
        } catch (Throwable t) {
            // 作为兜底捕获其他异常,一般不会执行到这里
            System.out.println("resumableCopyObject failed");
            System.out.println("unexpected exception, message: " + t.getMessage());
        }
    }
}