You need to enable JavaScript to run this app.
导航

Tunnel SDK

最近更新时间2023.09.28 16:49:43

首次发布时间2022.02.24 10:16:51

1. 批量数据通道 SDK

1.1 概述

LAS Tunnel 是 LAS 的数据通道,可以通过 Tunnel 向 LAS 中上传或者下载数据。
以下将介绍 Tunnel SDK 的批量数据通道接口,不同版本的 SDK 在使用上有所差别。

主要接口描述
TableTunnel访问 LAS Tunnel 服务的入口类。可以通过公网对 LAS 及其 Tunnel 进行访问。
BatchUploadSession表示一个向 LAS 表中批量上传数据的会话。
BatchDownloadSession表示一个从 LAS 表中批量下载数据的会话。
BatchRecordWriter表示一个批量上传数据会话中,用于上传特定数据块的接口。
BatchRecordReader表示一个批量下载数据会话中,用于下载特定数据块的接口。

如果使用 Maven,可以通过以下方式来获取不同版本的 Java SDK,相关配置信息如下。

<dependency>
  <groupId>com.bytedance.las</groupId>
  <artifactId>las-sdk-tunnel</artifactId>
  <version>1.2.0.18-public</version>
</dependency>

<repositories>
  <repository>
    <id>bytedance-public</id>
    <name>bytedance maven</name>
    <url>https://artifact.bytedance.com/repository/releases</url>
  </repository>
</repositories>

1.2 TableTunnel

TableTunnel 是访问 LAS Tunnel 服务的入口类,支持表数据(非视图)的上传和下载。

1.2.1 TunnelConfig

构建 TableTunnel 时需要传入 TunnelConfig,如下表所示,建议全部使用默认值。

名称默认值备注
tunnel.write.chunk_size100 * 1024上传时,每个 chunk 的大小,单位 byte
tunnel.write.cache_size100上传时,client 缓存的未发送 chunk 数量
tunnel.read.cache_size100下载时,client 缓存的未读取 chunk 数量
tunnel.read.chunk_size100 * 1024下载时,每个 chunk 的大小,单位 byte
tunnel.transfer.data_timeout180RPC 请求超时时间(s)
tunnel.las.service_regioncn-beijingtunnel 集群所在 region,保持默认,不建议修改

1.2.2 TableTunnel 接口定义及说明

TableTunnel 与批量数据通道相关的接口定义如下

public class TableTunnel {
    public void setEndPoint(String endPoint);
    public BatchUploadSession createBatchUploadSession(String database, String table, PartitionSpec partition, ActionType action);
    public BatchUploadSession createBatchUploadSession(String database, String table, ActionType action);
    public BatchDownloadSession createBatchDownloadSession(String database, String table, PartitionSpec partition, List<String> columnList, Long blockSize);
    public BatchDownloadSession createBatchDownloadSession(String database, String table, List<String> columnList, Long blockSize);
}

TableTunnel 接口说明如下:

  • 生命周期:从 TableTunnel 实例被创建开始,一直到程序结束。

  • TableTunnel 提供创建批量上传下载 BatchUploadSession 对象和 BatchDownloadSession 对象的方法,对应着两个重载方法,分别用于分区表和非分区表

  • 对一张表或分区上传下载的过程,称为一个 Session

  • Endpoint 默认设置为 las-tunnel-cn-beijing.volces.com:80,建议不要修改

  • ActionType:INSERT 模式将在分区中插入数据;OVERWRITE_INSERT 模式则会覆盖之前已有数据

注意:LAS 主键表仅支持 UPSERT;LAS 非主键表或 Hive 表支持 INSERT 和 OVERWRITE_INSERT。

1.3 BatchUploadSession

本文介绍 BatchUploadSession 接口,此接口用于批量上传数据到数据表中。

1.3.1 BatchUploadSession 接口定义

public Schema getSchema();
public BatchRecordWriter openRecordWriter(long blockId);
public void commit(List<Long> expectedBlockIdList, List<String> expectedAttemptIdList);

1.3.2 BatchUploadSession 接口说明

  • 生命周期:从创建 BatchUploadSession 实例开始,一直到 session 提交,每个 session 在服务端的生命周期为 24 小时

  • 创建 BatchUploadSession 实例:可以通过 TableTunnel 进行创建 BatchUploadSession,Server 端会为该 session 生成唯一 session id 标识此 session

  • BatchUploadSession 共享:BatchUploadSession 含有唯一的 session id,可以通过序列化 BatchUploadSession 的方式共享 session。

  • 上传数据:调用 openRecordWriter 方法,生成 RecordWriter 实例,其中参数 blockId 用于标识此次上传的数据,取值范围为 [0,20000),当数据上传失败,可以根据 blockId 重新上传。

  • 提交:调用 commit 方法进行同步提交,参数 expectedBlockIdList 表示预期上传的 block 列表,expectedAttemptIdList 可以在上传成功的 BatchRecordWriter 对象上通过 getAttemptId 获得。Server 端会对该列表进行验证再提交。

1.4 BatchDownloadSession

本文介绍 BatchDownloadSession 接口,此接口用于批量下载表数据。

1.4.1 BatchDownloadSession 接口定义

public Schema getSchema();
public long getBlockCount();
public BatchRecordReader openRecordReader(Long blockId, BufferAllocator allocator);
public BatchRecordReader openRecordReader(Long start, Long count, BufferAllocator allocator);

1.4.2 BatchDownloadSession 接口说明

  • 生命周期:从创建 BatchDownloadSession 实例开始,一直到下载结束。

  • 创建 BatchDownloadSession 实例:可以通过 TableTunnel 进行同步创建。BatchDownloadSession,创建时需要指定期望切分每个 block 的大小。

  • BatchDownloadSession 共享:BatchDownloadSession 含有 block 数据, 可以通过序列化 BatchDownloadSession 的方式共享 session。

  • 下载数据:调用 openRecordReader 方法,同步生成 BatchRecordReader 实例,可以指定单个 block 或者多个连续 block。

1.5 BatchRecordWriter

本文介绍 BatchRecordWriter 接口,此接口用于批量上传单个 block,上传 Arrow 格式的数据。

1.5.1 BatchRecordWriter 接口定义

public String getAttemptId();
public void write(VectorSchemaRoot data);
public void close();

1.5.2 BatchRecordWriter 接口说明

  • 生命周期:从创建 BatchRecordWriter 实例开始,一直到调用 close 方法。

  • 上传数据:调用 write 方法,Arrow 数据会暂时写入本地缓存,达到一定大小才会触发网络传输,具体参数设置参见 TunnelConfig#tunnel.write.chunk_size 和 tunnel.write.cache_size。

  • 超时:如果 180 秒内没有网络动作,服务端将主动关闭连接,此时 Writer 将不可用,请重新打开一个新的 Writer 写入。

  • 结束上传:调用 close 方法,等待所有本地缓存上传完毕后关闭连接。单个 block 上传的大小限制为 100 GB。

1.6 BatchRecordReader

本文介绍 BatchRecordReader 接口,此接口用于批量下载 block,读取 Arrow 格式的数据。

1.6.1 BatchRecordReader 接口定义

public VectorSchemaRoot read();
public void close();

1.6.2 BatchRecordReader 接口说明

  • 生命周期:从创建 BatchRecordReader 实例开始,一直到调用 close 方法。

  • 下载数据:从创建 BatchRecordReader 实例开始,reader 便不断下载数据至本地缓冲区,调用 read 方法将从本地缓冲区读取 Arrow 数据,具体参数设置参见 TunnelConfig#tunnel.read.chunk_size 和 tunnel.read.cache_size。

  • 超时:如果 180 秒内没有网络动作,服务端将主动关闭连接,此时 Reader 将不可用,请重新打开一个新的 Reader 写入。

  • 结束上传:调用 close 方法将立即结束当前的下载线程。

2. 批量数据通道 SDK 示例

2.1 概述

2.2 上传示例

Tunnel SDK 是 LAS 提供的离线批量数据通道服务,主要提供大批量离线数据上传和下载。

2.2.1 典型的表数据上传流程

  • 创建 TableTunnel

  • 创建 BatchUploadSession

  • 创建 BatchRecordWriter,写入 Arrow 格式数据

  • 提交上传

2.2.2 示例

import static com.bytedance.las.tunnel.ActionType.INSERT;
import static com.bytedance.las.tunnel.TunnelConfig.SERVICE_REGION;

import com.bytedance.las.tunnel.TableTunnel;
import com.bytedance.las.tunnel.TunnelConfig;
import com.bytedance.las.tunnel.authentication.Account;
import com.bytedance.las.tunnel.authentication.AkSkAccount;
import com.bytedance.las.tunnel.data.PartitionSpec;
import com.bytedance.las.tunnel.session.BatchRecordWriter;
import com.bytedance.las.tunnel.session.BatchUploadSession;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.Schema;

public class UploadExample {

  private static String accessId  = "acessId" ;
  private static String accessKey  =  "accessKey" ;
  // 如果不指定 endpoint 将默认使用 las-tunnel-cn-beijing.volces.com:80
  private static String tunnelEndpoint  =  "las-tunnel-cn-beijing.volces.com:80" ;
  // 如果不指定 region 将默认使用 cn-beijing
  private static String tunnelRegion  =  "cn-beijing" ;
  private static String database  =  "db" ;
  private static String table  =  "tbl" ;
  private static String partition  =  "pt='1',ds='2'" ;
  private static List<String> cols  = Arrays.asList( "a" ,  "b" ,  "c" );

  public static void main(String[] args) throws Exception {
    // 构建 TableTunnel
    TunnelConfig config = new TunnelConfig.Builder()
        .config(SERVICE_REGION, tunnelRegion)
        .build();
    Account account = new AkSkAccount(accessId, accessKey);
    TableTunnel tableTunnel = new TableTunnel(account, config);
    tableTunnel.setEndPoint(tunnelEndpoint);

    // 构建 BatchUploadSession, 创建一个有效期为 24 小时的 session
    // 创建 session 耗时在秒级且消耗较多资源, 建议统一分区的数据尽可能复用 session 上传
    BatchUploadSession uploadSession = tableTunnel.createBatchUploadSession(
        dataBase,
        table,
        new PartitionSpec(partition),
        // INSERT  模式将在分区中插入数据;OVERWRITE_INSERT 模式则会覆盖之前已有数据
        /*action type*/INSERT);
    Schema schema = uploadSession.getSchema();
    try (BufferAllocator allocator = new RootAllocator()) {
      // 打开 BatchRecordWriter, blockId 取值范围为 [0, 20000), 每个 block 最大 100g
      // 可以对相同 blockId 打开多个 BatchRecordWriter, 这些 block 会以不同 attemptId 存在
      // BatchRecordWriter close 成功表示该 block 成功写入
      BatchRecordWriter recordWriter = uploadSession.openRecordWriter(/*blockId*/0);
      // 构造 Arrow 数据
      try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
        IntVector intVector = (IntVector) root.getVector( "a" );
        intVector.allocateNew(10);
        for (int i = 0; i < 10; i++) {
          intVector.set(i, i);
        }
        intVector.setValueCount(10);
        root.setRowCount(10);
        // 客户端默认写入 1024 行数据进行一次网络传输
        recordWriter.write(root);
      }
      // 等待直到本地缓存数据都发送完成
      recordWriter.close();

      // 检查期望 blockId+attemptId 并完成上传提交
      uploadSession.commit(/*expectedBlockIdList*/Collections.singletonList(0L),
          /*expectedAttemptIdList*/Collections.singletonList(recordWriter.getAttemptId()));
    }
  }
}

2.3 下载示例

Tunnel SDK 是 LAS 提供的离线批量数据通道服务,主要提供大批量离线数据上传和下载。

2.3.1 典型的表数据下载流程

  • 创建 TableTunnel

  • 创建 BatchDownloadSession

  • 创建 BatchRecordReader,读取 Arrow 格式数据

2.3.2 示例

import static com.bytedance.las.tunnel.TunnelConfig.SERVICE_REGION;

import com.bytedance.las.tunnel.TableTunnel;
import com.bytedance.las.tunnel.TunnelConfig;
import com.bytedance.las.tunnel.authentication.Account;
import com.bytedance.las.tunnel.authentication.AkSkAccount;
import com.bytedance.las.tunnel.data.PartitionSpec;
import com.bytedance.las.tunnel.session.BatchDownloadSession;
import com.bytedance.las.tunnel.session.BatchRecordReader;
import java.util.Arrays;
import java.util.List;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;

public class DownloadExample {

  private static String accessId  = "acessId" ;
  private static String accessKey  =  "accessKey" ;
  // 如果不指定 endpoint 将默认使用 las-tunnel-cn-beijing.volces.com:80
  private static String tunnelEndpoint  =  "las-tunnel-cn-beijing.volces.com:80" ;
  // 如果不指定 region 将默认使用 cn-beijing
  private static String tunnelRegion  =  "cn-beijing" ;
  private static String database  =  "db" ;
  private static String table  =  "tbl" ;
  private static String partition  =  "pt='1',ds='2'" ;
  private static List<String> cols  = Arrays.asList( "a" ,  "b" ,  "c" );

  public static void main(String[] args) throws Exception {
    // 构建 TableTunnel
    TunnelConfig config = new TunnelConfig.Builder()
        .config(SERVICE_REGION, tunnelRegion)
        .build();
    Account account = new AkSkAccount(accessId, accessKey);
    TableTunnel tableTunnel = new TableTunnel(account, config);
    tableTunnel.setEndPoint(tunnelEndpoint);

    // 构建 BatchDownloadSession, 从 server 获取切分 block 信息
    BatchDownloadSession downloadSession = tableTunnel
        .createBatchDownloadSession(
            dataBase,
            table,
            new PartitionSpec(partition),
            cols,
            /*block size*/102400L);
    long blockCount = downloadSession.getBlockCount();

    try (BufferAllocator allocator = new RootAllocator()) {
      for (long i = 0; i < blockCount; i++) {
        // 构建 BatchRecordReader 用于下载 block 数据
        BatchRecordReader reader = downloadSession.openRecordReader(/*block id*/i, allocator);
        VectorSchemaRoot root;
        while ((root = reader.read()) != null) {
          try {
            System.out.println(root.contentToTSVString());
          } finally {
            root.close();
          }
        }
        reader.close();
      }
    }
  }
}
3. 流式数据通道 SDK 介绍

3.1 概述

流式数据通道服务提供了以流式的方式把数据写入 LAS 的能力,使用与原批量数据通道服务类似的 API,解决了批量数据通道在高并发、高 QPS 场景下的性能瓶颈。本文介绍如何使用流式数据通道服务。

主要接口描述
TableTunnel访问 LAS Tunnel 服务的入口类,与批量数据通过的入口相同。可以通过公网对 LAS 及其 Tunnel 进行访问。
StreamUploadSession表示一个向 LAS 表中流式上传数据的会话。
StreamRecordWriter表示一个流式上传数据会话中,用于上传特定数据块的接口。

如果使用 Maven,可以通过以下方式来获取不同版本的 Java SDK,相关配置信息如下。

<dependency>
  <groupId>com.bytedance.las</groupId>
  <artifactId>las-sdk-tunnel</artifactId>
  <version>1.2.0.18-public</version>
</dependency>

3.2 TableTunnel

TableTunnel 是访问 LAS Tunnel 服务的入口类,支持表数据(非视图)的批量上传、流式上传和批量下载。

3.2.1 TunnelConfig

构建 TableTunnel 时需要传入 TunnelConfig,建议全部使用默认值,定义与批量数据通道相同,见 1.2.1。

3.2.2 TableTunnel 接口定义及说明

TableTunnel 与批量数据通道相关的接口定义如下:

public class TableTunnel {
    public void setEndPoint(String endPoint);
    public StreamUploadSession createStreamUploadSession(String database, String table, PartitionSpec partition, ActionType action)
    public StreamUploadSession createStreamUploadSession(String database, String table, ActionType action)
    public StreamUploadSession createStreamUploadSession(String database, String table, PartitionSpec partition, ActionType action, String sessionId)
    public StreamUploadSession createStreamUploadSession(String database, String table, ActionType action, String sessionId)
}

TableTunnel 接口说明如下:

  • 生命周期:从 TableTunnel 实例被创建开始,一直到程序结束。

  • TableTunnel 提供创建流式上传 StreamUploadSession 对象的方法,以及根据 sessionId 恢复 StreamUploadSession 对象的方法

  • 对一张表或分区流式上传的过程,称为一个 Stream Session

  • Endpoint 默认设置为 las-tunnel-cn-beijing.volces.com:80,建议不要修改

  • ActionType:INSERT 模式将在分区中插入数据;OVERWRITE_INSERT 模式则会覆盖之前已有数据

注意:LAS 主键表仅支持 UPSERT;LAS 非主键表或 Hive 表支持 INSERT 和 OVERWRITE_INSERT。

3.3 StreamUploadSession

本文介绍 StreamUploadSession 接口,此接口用于流式上传数据到数据表中。

3.3.1 StreamUploadSession 接口定义

// 不带有分区列的 schema
public Schema getSchema();
// 带有分区列在末尾的 schema
public Schema getFullSchema();
public StreamRecordWriter openRecordWriter(long blockId);
public void commit(List<Long> expectedBlockIdList, List<String> expectedAttemptIdList);

3.3.2 StreamUploadSession 接口说明

  • 生命周期:从创建 StreamUploadSession 实例开始,每个 session 在服务端的生命周期为 48 小时

  • 创建 StreamUploadSession 实例:可以通过 TableTunnel 进行创建 StreamUploadSession,Server 端会为该 session 生成唯一 session id 标识此 session

  • StreamUploadSession 恢复:StreamUploadSession 含有唯一的 session id,可以通过TableTunnel 中的接口恢复 StreamUploadSession。

  • 上传数据:调用 openRecordWriter 方法,生成 RecordWriter 实例,其中参数 blockId 用于标识此次上传的数据,取值范围为 [0,20000),当数据上传失败,可以根据 blockId 重新上传。

  • 多次提交:调用 commit 方法进行同步提交,参数 expectedBlockIdList 表示预期上传的 block 列表,expectedAttemptIdList 可以在上传成功的 StreamRecordWriter 对象上通过 getAttemptId 获得。Server 端会对该列表进行验证再提交,提交成功后数据立即可见。

  • 多分区写入:指定分区为 PartitionSpec.SELF_ADAPTER 时,StreamUploadSession 将支持多分区写入。上传数据时需要在末尾附带分区字段,则 tunnel 会根据分区字段进行数据写入。

3.5 StreamRecordWriter

本文介绍 StreamRecordWriter 接口,此接口用于流式上传单个 block,上传 Avro 格式的数据。

3.5.1 StreamRecordWriter 接口定义

public String getAttemptId();
public void write(IndexedRecord data);
public void close();

3.5.2 StreamRecordWriter 接口说明

  • 生命周期:从创建 StreamRecordWriter 实例开始,一直到调用 close 方法。

  • 上传数据:调用 write 方法,Avro 数据会暂时写入本地缓存,达到一定大小才会触发网络传输,具体参数设置参见 TunnelConfig#tunnel.write.chunk_size 和 tunnel.write.cache_size。

  • 超时:如果 180 秒内没有网络动作,服务端将主动关闭连接,此时 Writer 将不可用,请重新打开一个新的 Writer 写入。

  • 结束上传:调用 close 方法,等待所有本地缓存上传完毕后关闭连接。单个 block 上传的大小限制为 100 GB。

4. 流式数据通道 SDK 示例

4.1 上传示例

Tunnel SDK 是 LAS 提供的流式数据通道服务,相比于批式数据通道,支持 StreamUploadSession 恢复,多次提交以及多分区写入。

4.1.1 典型的表数据上传流程示例

package com.bytedance.las.tunnel.example;

import static com.bytedance.las.tunnel.ActionType.INSERT;
import static com.bytedance.las.tunnel.TunnelConfig.SERVICE_REGION;

import com.bytedance.las.tunnel.TableTunnel;
import com.bytedance.las.tunnel.TunnelConfig;
import com.bytedance.las.tunnel.authentication.Account;
import com.bytedance.las.tunnel.authentication.AkSkAccount;
import com.bytedance.las.tunnel.data.PartitionSpec;
import com.bytedance.las.tunnel.session.StreamRecordWriter;
import com.bytedance.las.tunnel.session.StreamUploadSession;
import java.util.Collections;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData.Record;

public class StreamUploadExample {

  private static final String accessId  = "acessId" ;
  private static final String accessKey  =  "accessKey" ;
  // 如果不指定 endpoint 将默认使用 las-tunnel-cn-beijing.volces.com:80
  private static final String tunnelEndpoint  =  "las-tunnel-cn-beijing.volces.com:80" ;
  // 如果不指定 region 将默认使用 cn-beijing
  private static final String tunnelRegion  =  "cn-beijing" ;
  private static final String database  =  "db" ;
  private static final String table  =  "tbl" ;
  private static final String partition  =  "pt='1',ds='2'" ;

  public static void main(String[] args) throws Exception {
    // 构建 TableTunnel
    TunnelConfig config = new TunnelConfig.Builder()
        .config(SERVICE_REGION, tunnelRegion)
        .build();
    Account account = new AkSkAccount(accessId, accessKey);
    TableTunnel tableTunnel = new TableTunnel(account, config);
    tableTunnel.setEndPoint(tunnelEndpoint);
    // 构建 StreamUploadSession, 创建一个有效期为 48 小时的 session
    StreamUploadSession uploadSession = tableTunnel.createStreamUploadSession(
        dataBase,
        table,
        new PartitionSpec(partition),
        /*action type*/INSERT);
    Schema schema = uploadSession.getSchema();
    // 打开 StreamRecordWriter, blockId 取值范围为 [0, 19999], 每个 block 最大 100g
    // 可以对相同 blockId 打开多个 StreamRecordWriter, 这些 block 会以不同 attemptId 存在
    // StreamRecordWriter close 成功表示该 block 成功写入
    StreamRecordWriter recordWriter = uploadSession.openRecordWriter(/*blockId*/0);
    // 构造 Avro 数据并且写入本地缓存进行异步发送
    Record record = new Record(schema);
    record.put( "e1" , 0);
    recordWriter.write(record);
    // 等待直到本地缓存数据都发送完成
    recordWriter.close();

    // 第一次提交
    // 检查期望 blockId+attemptId 并完成上传提交, 提交完成数据立即可见
    uploadSession.commit(/*expectedBlockIdList*/Collections.singletonList(0L),
        /*expectedAttemptIdList*/Collections.singletonList(recordWriter.getAttemptId()));

    recordWriter = uploadSession.openRecordWriter(/*blockId*/0);
    record = new Record(schema);
    record.put( "e1" , 0);
    recordWriter.write(record);
    recordWriter.close();

    // 第二次提交, 提交完成数据立即可见
    uploadSession.commit(/*expectedBlockIdList*/Collections.singletonList(0L),
        /*expectedAttemptIdList*/Collections.singletonList(recordWriter.getAttemptId()));
  }

4.1.2 多分区上传流程示例

package com.bytedance.las.tunnel.example;

import static com.bytedance.las.tunnel.ActionType.INSERT;
import static com.bytedance.las.tunnel.TunnelConfig.SERVICE_REGION;

import com.bytedance.las.tunnel.TableTunnel;
import com.bytedance.las.tunnel.TunnelConfig;
import com.bytedance.las.tunnel.authentication.Account;
import com.bytedance.las.tunnel.authentication.AkSkAccount;
import com.bytedance.las.tunnel.data.PartitionSpec;
import com.bytedance.las.tunnel.session.StreamRecordWriter;
import com.bytedance.las.tunnel.session.StreamUploadSession;
import java.util.Collections;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData.Record;

public class StreamUploadExample {

  private static final String accessId  = "acessId" ;
  private static final String accessKey  =  "accessKey" ;
  // 如果不指定 endpoint 将默认使用 las-tunnel-cn-beijing.volces.com:80
  private static final String tunnelEndpoint  =  "las-tunnel-cn-beijing.volces.com:80" ;
  // 如果不指定 region 将默认使用 cn-beijing
  private static final String tunnelRegion  =  "cn-beijing" ;
  private static final String database  =  "db" ;
  private static final String table  =  "tbl" ;

  public static void main(String[] args) throws Exception {
    // 构建 TableTunnel
    TunnelConfig config = new TunnelConfig.Builder()
        .config(SERVICE_REGION, tunnelRegion)
        .build();
    Account account = new AkSkAccount(accessId, accessKey);
    TableTunnel tableTunnel = new TableTunnel(account, config);
    tableTunnel.setEndPoint(tunnelEndpoint);
    StreamUploadSession uploadSession = tableTunnel.createStreamUploadSession(
        dataBase,
        table,
        // 指定为自适应分区
        PartitionSpec.SELF_ADAPTER,
        /*action type*/INSERT);
    StreamRecordWriter recordWriter = uploadSession.openRecordWriter(/*blockId*/0);
    // Record 附带分区列, 分区字段在最后几列
    Schema schema = uploadSession.getFullSchema();
    Record record1 = new Record(schema);
    record1.put( "e1" , 0);
    // 指定分区列
    record1.put( "pt" ,  "3" );
    record1.put( "ds" ,  "4" );
    recordWriter.write(record1);
    Record record2 = new Record(schema);
    record2.put( "e1" , 0);
    // 指定分区列
    record2.put( "pt" ,  "2" );
    record2.put( "ds" ,  "3" );
    recordWriter.write(record2);
    recordWriter.close();
    uploadSession.commit(/*expectedBlockIdList*/Collections.singletonList(0L),
        /*expectedAttemptIdList*/Collections.singletonList(recordWriter.getAttemptId()));
  }

4.1.3 StreamUploadSession 恢复流程示例

package com.bytedance.las.tunnel.example;

import static com.bytedance.las.tunnel.ActionType.INSERT;
import static com.bytedance.las.tunnel.TunnelConfig.SERVICE_REGION;

import com.bytedance.las.tunnel.TableTunnel;
import com.bytedance.las.tunnel.TunnelConfig;
import com.bytedance.las.tunnel.authentication.Account;
import com.bytedance.las.tunnel.authentication.AkSkAccount;
import com.bytedance.las.tunnel.data.PartitionSpec;
import com.bytedance.las.tunnel.session.StreamRecordWriter;
import com.bytedance.las.tunnel.session.StreamUploadSession;
import java.util.Collections;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData.Record;

public class StreamUploadExample {

  private static final String accessId  = "acessId" ;
  private static final String accessKey  =  "accessKey" ;
  // 如果不指定 endpoint 将默认使用 las-tunnel-cn-beijing.volces.com:80
  private static final String tunnelEndpoint  =  "las-tunnel-cn-beijing.volces.com:80" ;
  // 如果不指定 region 将默认使用 cn-beijing
  private static final String tunnelRegion  =  "cn-beijing" ;
  private static final String database  =  "db" ;
  private static final String table  =  "tbl" ;
  private static final String partition  =  "pt='1',ds='2'" ;

  public static void main(String[] args) throws Exception {
    // 构建 TableTunnel
    TunnelConfig config = new TunnelConfig.Builder()
        .config(SERVICE_REGION, tunnelRegion)
        .build();
    Account account = new AkSkAccount(accessId, accessKey);
    TableTunnel tableTunnel = new TableTunnel(account, config);
    tableTunnel.setEndPoint(tunnelEndpoint);
    StreamUploadSession uploadSession = tableTunnel.createStreamUploadSession(
        dataBase,
        table,
        new PartitionSpec(partition),
        /*action type*/INSERT);
    // 通过 session id 实现 session 恢复
    String sessionId = uploadSession.getSessionId();
    // ... write some data
    StreamRecordWriter recordWriter = uploadSession.openRecordWriter(/*blockId*/0);
    Record record = new Record(uploadSession.getSchema());
    record.put( "e1" , 0);
    recordWriter.write(record);
    recordWriter.close();
    // session 提交时出现异常, 通过 session id 构建一个新的 session
    StreamUploadSession newUploadSession = tableTunnel.createStreamUploadSession(
        dataBase,
        table,
        new PartitionSpec(partition),
        /*action type*/INSERT,
        sessionId);
    // 进行未完成的提交
    newUploadSession.commit(/*expectedBlockIdList*/Collections.singletonList(0L),
        /*expectedAttemptIdList*/Collections.singletonList(recordWriter.getAttemptId()));
  }
5. 注意事项
  • 对于 Tunnel SDK 中的“短连接”接口,Tunnel SDK 中有失败重试机制(如:createUploadSession、commit、getSchema、getBlockMetadataList 等接口),注意:默认重试 3 次,每次间隔 3秒。
  • 对于 Tunnel SDK 中的“长连接”接口, Tunnel SDK 中没有失败重试机制,需要在业务上做好 Failover(如:read、write、close 等接口)。