You need to enable JavaScript to run this app.
导航
Flink Connector Driver
最近更新时间:2025.09.05 15:58:41首次发布时间:2024.06.14 16:36:54
复制全文
我的收藏
有用
有用
无用
无用

Flink Connector Driver for ByteHouse 云数仓版连接器专门用于通过 Flink 将数据加载到 ByteHouse云数仓版。本文将介绍通过 Table API&SQL 和 Flink 的 DataStreamAPI 两种方式连接ByteHouse并处理数据。

背景信息

ByteHouse 支持通过 IAM 用户或数据库用户连接 Flink Connector Driver。IAM 用户与数据库用户二者差异说明如下,您可按需选择。

  • IAM 用户为火山引擎访问控制(IAM)中创建的用户,其权限由 IAM 权限策略及您授予的 ByteHouse 资源和数据权限决定。IAM 用户可访问 ByteHouse 控制台,也支持通过 CLI、连接驱动、生态工具、API 等方式访问 ByteHouse。
  • 数据库用户为 ByteHouse 中创建的数据库级别用户,可为其授予环境、资源和数据权限。数据库用户不可访问 ByteHouse 控制台,但支持通过 CLI、连接驱动、生态工具、API 等方式访问 ByteHouse。

更多 IAM 用户和数据库用户的介绍请参见以下文档:

准备工作

根据您安装的 Flink 版本,下载匹配的 Flink SQL 或 Flink DataStream API 驱动。

Flink 版本

驱动程序

发布日期

1.18

v2.12-1.27.117-1.18

2025-01-13

1.17

v2.12-1.27.117-1.17

2025-01-13

1.16

v2.12-1.27.117-1.16

2025-01-13

1.15

v2.12-1.27.117-1.15

2025-01-13

1.11
(scala 2.11)

flink-sql-connector-bytehouse-cdw_2.11-1.27.104-1.11.jar
未知大小

2024-10-23

本地安装

请运行以下命令,将下载的flink-sql-connector-bytehouse-cdw-${flink-sql-connector-bytehouse-cdw.version}.jar文件安装到本地 Maven 存储库:

mvn install:install-file \
    -Dfile=${your_path}/flink-sql-connector-bytehouse-cdw-${flink-sql-connector-bytehouse-cdw.version}.jar \
    -DgroupId=com.bytedance.bytehouse \
    -DartifactId=flink-sql-connector-bytehouse-cdw \
    -Dversion=${flink-sql-connector-bytehouse-cdw.version} \
    -Dpackaging=jar

然后,您可以使用 IDE(例如 IntelliJ)将其添加为项目中的依赖项。

Maven 依赖

对于要使用 Flink connector 连接器进行编译的 Maven 项目,请将以下依赖项添加到项目的 pom.xml 文件中。scala.version 需要是 2.11 或 2.12,与 Flink 发行版的关联 Scala 版本相对应。${flink-sql-connector-bytehouse-cdw.version} 可以设置为所需的 Flink connector 连接器版本。

<dependency>
    <groupId>com.bytedance.bytehouse</groupId>
    <artifactId>flink-sql-connector-bytehouse-cdw_${scala.version}</artifactId>
    <version>${flink-sql-connector-bytehouse-cdw.version}</version>
</dependency>

<dependency>
    <groupId>com.bytedance.bytehouse</groupId>
    <artifactId>flink-sql-connector-bytehouse-cdw_2.12</artifactId>
    <version>1.27.100_snapshot7</version>
</dependency>

然后,将以下存储库添加到 pom.xml文件:

<repository>
    <id>bytedance</id>
    <name>ByteDance Public Repository</name>
    <url>https://artifact.bytedance.com/repository/releases</url>
</repository>

使用示例

下面是通过 FlinkSQL 将数据表单加载到 ByteHouse 云数仓版数据表中的示例。

使用 IAM 用户连接并导入

您可参考步骤三:获取 ByteHouse 连接串信息中使用 IAM 用户连接所需的信息,替换下面对应的占位符。

  • 'bytehouse.gateway.host':配置为 ByteHouse 的公网/私网域名,您可以在 ByteHouse 控制台的租户管理 > 基本信息 > 网络信息中查看并复制网络信息。详情请参见步骤二:配置网络信息
  • 'bytehouse.gateway.api-token':设置为 ByteHouse 的 <API_Key>,您可以在 ByteHouse 控制台的 租户管理 > 连接信息 中新建并复制 API Key。详情请参见获取 API Key
  • 'bytehouse.gateway.virtual-warehouse':设置为 ByteHouse 计算组 ID,您可以在 ByteHouse 控制台的计算组 页面获取。
  • 'database':您需要连接的数据库名称。
  • 'table-name':您需要连接的表名称。

更多参数说明请参见参数说明

CREATE TABLE `bh_de_source` ( -- 源表
 `id` BIGINT NOT NULL,
 `time` TIMESTAMP(0),
 `content` ARRAY<DECIMAL(20, 0)>
) WITH (
 'connector' = 'datagen'
);
CREATE TABLE `bh_de_sink` ( -- 目标表
 `id` STRING NOT NULL,
 `event_time` STRING,
 `content` ARRAY<DECIMAL(20, 0)>,
 PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
 'connector' = 'bytehouse-cdw',
 'jdbc.enable-gateway-connection' = 'true',
 'bytehouse.gateway.region' = 'VOLCANO_PRIVATE',
 'bytehouse.gateway.host' = 'tenant-{TENANT-ID}-{REGION}.bytehouse.ivolces.com',
 'bytehouse.gateway.port' = '19000',
 'bytehouse.gateway.api-token' = '<API_KEY>',
 'bytehouse.gateway.virtual-warehouse' = '<VIRTUAL_WAREHOUSE>',
 'database' = '<BYTEHOUSE_DB>',
 'table-name' = '<BYTEHOUSE_TABLE>'
);
INSERT INTO `bh_de_sink` ( -- 数据加载
 `id`,
 `event_time`,
 `content`
)
SELECT
 CAST(`id` AS STRING), -- 带有默认值的类型转换
 CAST(`time` AS STRING), -- 类型转换
 `content` -- 默认值
FROM `bh_de_source`;

使用数据库用户连接并导入

您可参考步骤三:获取 ByteHouse 连接串信息中使用数据库用户连接所需的信息,替换下面对应的占位符。更多参数说明请参见参数说明

  • 'bytehouse.gateway.host':配置为 ByteHouse 的公网/私网域名,您可以在 ByteHouse 控制台的租户管理 > 基本信息 > 网络信息中查看并复制网络信息。详情请参见步骤二:配置网络信息
  • 'bytehouse.gateway.virtual-warehouse':设置为 ByteHouse 计算组 ID,您可以在 ByteHouse 控制台的计算组 页面获取。
  • 'bytehouse.gateway.account':设置为火山引擎用户账号 ID 或名称,您可登录 ByteHouse 控制台,单击右上角个人中心查看并复制账号 ID 或名称。
  • 'bytehouse.gateway.username':设置为 ByteHouse 数据库账号用户名,您可登录 ByteHouse 控制台,在权限管理 > 用户页面中查看并复制数据库用户名。
  • 'bytehouse.gateway.paasword':设置为 ByteHouse 数据库用户的密码,该密码由管理员创建数据库账号时自定义配置,您可联系管理员获取密码。
  • 'database':您需要连接的数据库名称。
  • 'table-name':您需要连接的表名称。
CREATE TABLE `bh_de_source` ( -- 源表
 `id` BIGINT NOT NULL,
 `time` TIMESTAMP(0),
 `content` ARRAY<DECIMAL(20, 0)>
) WITH (
 'connector' = 'datagen'
);
CREATE TABLE `bh_de_sink` ( -- 目标表
 `id` STRING NOT NULL,
 `event_time` STRING,
 `content` ARRAY<DECIMAL(20, 0)>,
 PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
 'connector' = 'bytehouse-cdw',
 'jdbc.enable-gateway-connection' = 'true',
 'bytehouse.gateway.region' = 'VOLCANO_PRIVATE',
 'bytehouse.gateway.host' = 'tenant-{TENANT-ID}-{REGION}.bytehouse.ivolces.com',
 'bytehouse.gateway.port' = '19000',
 'bytehouse.gateway.virtual-warehouse' = '<VIRTUAL_WAREHOUSE>',
 'bytehouse.gateway.account' = '<your_volcano_engine_ID>',
 'username' = '<your_bytehouse_cdw_database_user>',
 'paasword' = '<your_bytehouse_cdw_database_user_password>',
 'database' = '<BYTEHOUSE_DB>',
 'table-name' = '<BYTEHOUSE_TABLE>'
);
INSERT INTO `bh_de_sink` ( -- 数据加载
 `id`,
 `event_time`,
 `content`
)
SELECT
 CAST(`id` AS STRING), -- 带有默认值的类型转换
 CAST(`time` AS STRING), -- 类型转换
 `content` -- 默认值
FROM `bh_de_source`;

Flink 连接器的 DataStream API 将源数据类型限制为 RowData
DataStream API 的使用主要是通过 CnchSinkFunctionBuilder 获取 CnchSinkFunction 实例,这有助于连接器的各种配置。
示例:多表数据库用户写入

使用 IAM 用户连接并导入

您可参考步骤三:获取 ByteHouse 连接串信息中使用 IAM 用户连接所需的信息,替换下面对应的占位符。更多参数说明请参见参数说明

  • .withGatewayConnection("VOLCANO_PRIVATE", "your_host", "your_port")
    • "your_host":配置为 ByteHouse 的公网/私网域名,您可以在 ByteHouse 控制台的租户管理 > 基本信息 > 网络信息中查看并复制网络信息。详情请参见步骤二:配置网络信息
    • "your_port"):固定为 19000。
  • .withGatewayApiToken("<< your API token >>"):设置为 ByteHouse 的 <API_Key>,您可以在 ByteHouse 控制台的 租户管理 > 连接信息 中获取API Key。详情请参见获取 API Key
  • .withGatewayVirtualWarehouse("default_vw"):设置为 ByteHouse 计算组 ID,您可以在 ByteHouse 控制台的计算组 页面获取。
import bytehouse.shaded.com.google.common.collect.Lists;
import com.bytedance.bytehouse.flink.connector.cnch.ByteHouseCdwClient;
import com.bytedance.bytehouse.flink.connector.cnch.CnchMultiSinkFunction;
import com.bytedance.bytehouse.flink.connector.cnch.api.java.CnchMultiSinkFunctionBuilder;
import com.bytedance.bytehouse.flink.connector.cnch.sql.ByteHouseCdwCreateTableStatementBuilder;
import com.bytedance.bytehouse.table.ByteHouseColumn;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public final class MultiSinkCdwTableExample {
  public static void main(String[] args) throws Exception {
    final String region = "VOLCANO_PRIVATE";
    final String host = args[0]; // e.g., tenant-{accountId}-{region}-public.bytehouse.volces.com
    final int port = 19000;
    final String token = args[1]; // e.g., 1HkKORvYGV:xxxxxxxx
    final String vwId = args[2]; // e.g., vw-{environment_id}{account_id}-{virtual_warehouse_name}
    final String dbName = args[3];
    final String tableName = args[4];
    List<RowData> collectedRowData = Lists.newArrayList();
    ByteHouseCdwClient bhCli = new ByteHouseCdwClient(region, host, port, token, vwId);
    // Drop and recreate tables
    final String tableName1 = tableName + "1";
    bhCli.dropTableIfExists(dbName, tableName1);
    bhCli.execute(
        new ByteHouseCdwCreateTableStatementBuilder()
            .createTable(dbName, tableName1)
            .schema("id Int64, event_time DateTime, content String")
            .partitionBy("toYYYYMMDD(`event_time`)")
            .orderBy("id")
            .uniqueKey("id")
            .build());
    final String tableName2 = tableName + "2";
    bhCli.dropTableIfExists(dbName, tableName2);
    bhCli.execute(
        new ByteHouseCdwCreateTableStatementBuilder()
            .createTable(dbName, tableName2)
            .schema("id Int64, properties Map(String, String)")
            .addColumn(ByteHouseColumn.of("login_time", "Nullable(Time(3))"))
            .build());
    // Construct data
    for (int i = 0; i < 10; i++) {
      Map<String, Object> record = new HashMap<>();
      record.put("id", i);
      record.put("event_time", LocalDateTime.now());
      record.put("login_time", LocalTime.now());
      record.put("content", String.format("Row %d @ %s", i, record.get("event_time")));
      Map<String, String> props = new HashMap<>();
      props.put("propId", record.get("id").toString());
      props.put("propEventTime", record.get("event_time").toString());
      props.put("propContent", record.get("content").toString());
      record.put("properties", props);
      RowData rowData1 = bhCli.constructRichRowData(RowKind.INSERT, dbName, tableName1, record);
      collectedRowData.add(rowData1);
      RowData rowData2 = bhCli.constructRichRowData(RowKind.INSERT, dbName, tableName2, record);
      collectedRowData.add(rowData2);
    }
    // Create the execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    // Add a source to the data stream
    DataStream<RowData> dataStream = env.fromCollection(collectedRowData);
    try (CnchMultiSinkFunction<RowData> sink =
        new CnchMultiSinkFunctionBuilder.MultiRowDataTable(Duration.ofMinutes(3))
            .withGatewayConnection(region, host, port)
            .withGatewayApiToken(token)
            .withGatewayVirtualWarehouse(vwId)
            .withFlushInterval(Duration.ofSeconds(10))
            .withFlushBatchSize(5000)
            .build()) {
      // Add a sink to the data stream
      dataStream.addSink(sink);
      // Trigger the execution
      env.execute(MultiSinkCdwTableExample.class.getSimpleName());
    }
    // Select and print out all rows in the table
    System.out.printf("\n==== %s ====\n", tableName1);
    bhCli.selectAll(dbName, tableName1).forEach(System.out::println);
    System.out.printf("\n==== %s ====\n", tableName2);
    bhCli.selectAll(dbName, tableName2).forEach(System.out::println);
    // Clean up
    bhCli.close();
  }
}

使用数据库用户连接并导入

您可参考步骤三:获取 ByteHouse 连接串信息中使用数据库用户连接所需的信息,替换下面对应的占位符。更多参数说明请参见参数说明

  • .withGatewayConnection("VOLCANO_PRIVATE", "your_host", "your_port")
    • "your_host":配置为 ByteHouse 的公网/私网域名,您可以在 ByteHouse 控制台的租户管理 > 基本信息 > 网络信息中查看并复制网络信息。详情请参见步骤二:配置网络信息
    • "your_port"):固定为 19000。
  • .withGatewayAccount(accountId, username, password)
    • accountId:设置为火山引擎用户账号 ID 或名称,您可登录 ByteHouse 控制台,单击右上角个人中心查看并复制账号 ID 或名称。
    • username:设置为 ByteHouse 数据库账号用户名,您可登录 ByteHouse 控制台,在权限管理 > 用户页面中查看并复制数据库用户名。
    • password:设置为 ByteHouse 数据库用户的密码,该密码由管理员创建数据库账号时自定义配置,您可联系管理员获取密码。
  • .withGatewayVirtualWarehouse("default_vw"):设置为 ByteHouse 计算组 ID,您可以在 ByteHouse 控制台的计算组 页面获取。
import bytehouse.shaded.com.google.common.collect.Lists;
import com.bytedance.bytehouse.flink.connector.cnch.ByteHouseCdwClient;
import com.bytedance.bytehouse.flink.connector.cnch.CnchMultiSinkFunction;
import com.bytedance.bytehouse.flink.connector.cnch.api.java.CnchMultiSinkFunctionBuilder;
import com.bytedance.bytehouse.flink.connector.cnch.sql.ByteHouseCdwCreateTableStatementBuilder;
import com.bytedance.bytehouse.table.ByteHouseColumn;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public final class MultiSinkCdwTableExample {
  public static void main(String[] args) throws Exception {
    final String region = "VOLCANO_PRIVATE";
    final String host = args[0]; // e.g., tenant-2100000000-cn-guangzhou-public.bytehouse.volces.com
    final int port = 19000;
    final String accountId = args[1];
    final String username = args[2];
    final String password = args[3];
    final String vwId = args[4]; // e.g., vw-2100000000-default-vw
    final String dbName = args[5];
    final String tableName = args[6];
    List<RowData> collectedRowData = Lists.newArrayList();
    ByteHouseCdwClient bhCli = new ByteHouseCdwClient(region, host, port, accountId, username, password, vwId);
    // Drop and recreate tables
    final String tableName1 = tableName + "1";
    bhCli.dropTableIfExists(dbName, tableName1);
    bhCli.execute(
        new ByteHouseCdwCreateTableStatementBuilder()
            .createTable(dbName, tableName1)
            .schema("id Int64, event_time DateTime, content String")
            .partitionBy("toYYYYMMDD(`event_time`)")
            .orderBy("id")
            .uniqueKey("id")
            .build());
    final String tableName2 = tableName + "2";
    bhCli.dropTableIfExists(dbName, tableName2);
    bhCli.execute(
        new ByteHouseCdwCreateTableStatementBuilder()
            .createTable(dbName, tableName2)
            .schema("id Int64, properties Map(String, String)")
            .addColumn(ByteHouseColumn.of("login_time", "Nullable(Time(3))"))
            .build());
    // Construct data
    for (int i = 0; i < 10; i++) {
      Map<String, Object> record = new HashMap<>();
      record.put("id", i);
      record.put("event_time", LocalDateTime.now());
      record.put("login_time", LocalTime.now());
      record.put("content", String.format("Row %d @ %s", i, record.get("event_time")));
      Map<String, String> props = new HashMap<>();
      props.put("propId", record.get("id").toString());
      props.put("propEventTime", record.get("event_time").toString());
      props.put("propContent", record.get("content").toString());
      record.put("properties", props);
      RowData rowData1 = bhCli.constructRichRowData(RowKind.INSERT, dbName, tableName1, record);
      collectedRowData.add(rowData1);
      RowData rowData2 = bhCli.constructRichRowData(RowKind.INSERT, dbName, tableName2, record);
      collectedRowData.add(rowData2);
    }
    // Create the execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    // Add a source to the data stream
    DataStream<RowData> dataStream = env.fromCollection(collectedRowData);
    try (CnchMultiSinkFunction<RowData> sink =
        new CnchMultiSinkFunctionBuilder.MultiRowDataTable(Duration.ofMinutes(3))
            .withGatewayConnection(region, host, port)
            .withGatewayAccount(accountId, username, password)
            .withGatewayVirtualWarehouse(vwId)
            .withFlushInterval(Duration.ofSeconds(10))
            .withFlushBatchSize(5000)
            .build()) {
      // Add a sink to the data stream
      dataStream.addSink(sink);
      // Trigger the execution
      env.execute(MultiSinkCdwTableExample.class.getSimpleName());
    }
    // Select and print out all rows in the table
    System.out.printf("\n==== %s ====\n", tableName1);
    bhCli.selectAll(dbName, tableName1).forEach(System.out::println);
    System.out.printf("\n==== %s ====\n", tableName2);
    bhCli.selectAll(dbName, tableName2).forEach(System.out::println);
    // Clean up
    bhCli.close();
  }
}

参数说明

参数

必选

默认值

数据类型

描述

connector

(none)

String

指定要使用的驱动,这里应该是bytehouse-cdw

database

(none)

String

需要连接的 ByteHouse 云数仓版数据库的名称。

table-name

(none)

String

需要连接的 ByteHouse 云数仓版表的名称。

jdbc.query.timeout

1 minute

Duration

通过 JDBC 执行查询的超时设置。

jdbc.enable-gateway-connection

false

Boolean

指定 JDBC 连接是否需要经过 ByteHouse网关。

bytehouse.gateway.region

(none)

String

ByteHouse 网关区域,设置为 VOLCANO_PRIVATE

bytehouse.gateway.host

(none)

String

ByteHouse 网关的网络域名。格式为 tenant-{TENANT-ID}-{REGION}.bytehouse.[i]volces.com。您可登录 ByteHosue CDW 控制台,在租户管理页签下,单击基本信息,在网络信息模块,查看并复制公网或私网域名。
使用前请将 bytehouse.gateway.region 设置为 VOLCANO_PRIVATE

bytehouse.gateway.port

19000

Integer

ByteHouse 网关的私有端口。前提是将 bytehouse.gateway.region 设置为 VOLCANO_PRIVATE

bytehouse.gateway.virtual-warehouse

(none)

String

通过 ByteHouse Gateway 进行查询处理的计算组的名称或 ID。默认情况下,使用通过 ByteHouse 控制台配置的默认计算组。

bytehouse.gateway.api-token

(none)

String

ByteHouse 的 API key。您可登录 ByteHouse CDW 控制台,在租户管理页签下,单击连接信息,新建并复制 API Key。

bytehouse.gateway.account

(none)

String

指火山引擎用户账号 ID 或名称,可登录 ByteHouse 控制台,单击右上角个人中心查看并复制账号 ID 或名称。

username

(none)

String

ByteHouse 数据库用户名称。您可通过 ByteHouse 控制台>权限管理>用户路径,查看并复制数据库用户名。

paasword

(none)

String

数据库用户的密码。该密码由管理员创建数据库账号时自定义配置,您可联系管理员获取密码。如果密码丢失或遗忘,可通联系管理员重置密码,详情请参考重置密码

sink.group-by.key

(none)

String

预接收器分组的密钥。密钥可以由多个字段组成,以逗号分隔。

sink.group-by.expression

(none)

String

预接收器分组的表达式。如果设置了,所有涉及的字段名称也必须在 sink.group-by.key 中列出。

sink.group-by.number-of-groups

(none)

Integer

记录预接收器分组的组数。如果未指定,它将回退到 sink.parallelism(如果提供)。

sink.buffer-flush.interval

1 second

Duration

两次批量刷新之间的最大间隔。最小为 200 毫秒(ms)。

sink.buffer-flush.max-rows

50,000

Integer

刷新前缓冲记录的最大大小。最小为 100。

sink.buffer-flush.max-batches

6

Integer

触发异步刷新过载预防的待处理批次数的阈值。最小为 1。

sink.max-retries

3

Integer

刷新数据失败时的最大尝试次数。将其设置为 -1 表示无限次重试。

sink.parallelism

(none)

Integer

刷新数据的最大并行度。默认情况下,并行度由框架使用与上游链式运算符相同的并行度来确定。

sink.mode

insert

String

选择要接收的数据记录。支持的值有:

  • insert:仅接受 INSERT 记录。
  • upsert:(底表为unique table时适用)接受 INSERT、UPDATE_AFTER 和 DELETE 记录
  • upsert-no-delete:(底表为unique table时适用)接受 INSERT 和 UPDATE_AFTER 记录。
  • upsert-all:(底表为unique table时适用)接受所有类型的记录。
  • upsert-all-no-delete:(底表为unique table时适用)接受除 DELETE 之外的所有类型的记录。
  • delete-only:仅接受 DELETE 记录。

metrics.update-interval

5 seconds

Duration

刷新指标的固定间隔。 该时间最少为 5 秒。

metrics.log-level

INFO

String

记录指标的日志级别。 这对应于 Log4J 内置的标准日志级别

timestamp-offset

(none)

Duration

时间戳数据的附加时间偏移量。

jdbc.connection.custom-settings

enable_point_lookup_profile = 1

String

查询级别的自定义设置,通常用于优化查找。

lookup.async.scale-factor

1

Integer

异步查找的比例因子。如果设置为小于 2,查找将以同步 (SYNC) 模式运行。如果设置为 n(n >= 2),查找将以异步 (ASYNC) 模式运行,每个查找实例的最大并发数等于 n。

lookup.cache.max-rows

0

Long

数据表的查找缓存的最大行数。如果设置为 0,则禁用缓存。

lookup.cache.ttl

(none)

Duration

ByteHouse 表的查找缓存的 TTL。如果未指定,则不为缓存记录设置 TTL。

lookup.max-retries

3

Integer

查找操作失败时允许的最大重试次数。