You need to enable JavaScript to run this app.
导航

通过 Flink Connector驱动导入

最近更新时间2024.01.10 15:26:33

首次发布时间2024.01.10 15:26:33

Flink Connector for ByteHouse 连接器专门用于通过 Flink 将数据加载到 ByteHouse。
本文将介绍通过 Table API&SQL 和 Flink 的 DataStreamAPI 两种方式连接ByteHouse并处理数据。

准备工作

根据您安装的 Flink 版本,下载匹配的 Flink SQL 或 Flink DataStream API 驱动。

驱动版本

匹配 Flink版本

其他要求

下载链接

1.16

1.14-1.16

flink-sql-connector-bytehouse-ce-1.25.1-1.16.jar
未知大小

1.13

1.13及以下版本

Scala版本: 2.11及以上

flink-sql-connector-bytehouse-ce_2.11-1.25.4-1.13.jar
未知大小

注意

请使用与 Flink 版本相匹配的驱动,以保障功能正常使用。

驱动版本

匹配 Flink版本

备注

下载链接

0.4

1.15 及以上版本

Java 8 及以上版本

flinkDataStreamApiPlayground-0.4-SNAPSHOT.jar
未知大小

使用示例

下面是通过 FlinkSQL 将数据表单加载到 ByteHouse 企业版数据表中的示例。

说明

  • 您可参见获取集群连接信息页面来获取需要连接的集群连接信息,并替换下面对应的占位符。
  • 详细 参数说明 附在文末,可供查询。
CREATE TEMPORARY TABLE `bh_ce_source` (  -- 源数据表
  `id` BIGINT NOT NULL,
  `time` TIMESTAMP(0),
  `content` ARRAY<DECIMAL(20, 0)>
) WITH (
  'connector' = 'kinesis',
  'stream' = 'demo_stream',
  'format' = 'json',
  'aws.region' = 'cn-north-1',
  'aws.credentials.provider' = 'BASIC',
  'aws.credentials.basic.accesskeyid' = '???',
  'aws.credentials.basic.secretkey' = '???',
  'scan.shard.getrecords.maxretries' = '7'
);

CREATE TEMPORARY TABLE `bh_ce_sink` (  -- 目标表
  `id` STRING NOT NULL,
  `event_time` STRING,
  `content` ARRAY<DECIMAL(20, 0)>
) WITH (  -- 需要配置集群连接信息
  'connector' = 'bytehouse-ce',
  'clickhouse.shard-discovery.kind' = 'CE_GATEWAY',
  'bytehouse.ce.gateway.host' = '???-public.bytehouse-ce.volces.com', 
  'bytehouse.ce.gateway.port' = '8123',
  'username' = '<< your username of Volcano >>',
  'password' = '<< your password of Volcano >>',
  'sink.buffer-flush.interval' = '1 second',
  'sink.buffer-flush.max-rows' = '5000',
  'clickhouse.cluster' = '<< your cluster name of ByteHouse CE >>',
  'database' = 'my_db',
  'table-name' = 'my_table_local',
  'sharding-key' = 'event_time'
);

INSERT INTO `bh_ce_sink` (  -- data loading
  `id`,
  `event_time`,
  `content`
)
SELECT
  IFNULL(CAST(`id` AS STRING), '43'),  -- type casting with default value
  CAST(`time` AS STRING),  -- type casting
  IFNULL(`content`, ARRAY[123,12345])  -- default value
FROM `bh_ce_source`

下面是通过 Flink DataStream API 将数据表单加载到 ByteHouse 企业版数据表中的示例。

说明

  • 您可参见获取集群连接信息页面来获取需要连接的集群连接信息,并替换下面对应的占位符。
  • 详细 参数说明 附在文末,可供查询。
package bytehouse.flink.connectoer.demo;
import com.bytedance.bytehouse.flink.connector.clickhouse.ClickHouseSinkFunction;
import com.bytedance.bytehouse.flink.connector.clickhouse.api.java.ClickHouseSinkFunctionBuilder;
import com.bytedance.bytehouse.flink.table.api.RowDataConstructor;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator;
import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
public class AcceptLogSinkDataStreamExample {
    public static void main(String[] args) throws Exception {
        // 需要配置集群连接信息
        final String gatewayHost = args[0];
        final String username = args[1];
        final String password = args[2];
        final String clusterName = args[3];
        final String dbName = args[4];
        final String tableName = args[5];
        final long rowsPerSecond = Long.parseLong(args[6]);
        final long numberOfRows = Long.parseLong(args[7]);
        // 创建执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 给 DataStream 添加数据源
        DataStream<RowData> dataStream =
                env.addSource(
                                new DataGeneratorSource<>(new RowDataGenerator(), rowsPerSecond, numberOfRows))
                        .returns(TypeInformation.of(RowData.class));
        //  表示数据表 Schema 的列 List
        List<Column> columns =
                Arrays.asList(
                        Column.physical("create_time", DataTypes.TIMESTAMP(0)),
                        Column.physical("completion_id", DataTypes.STRING()),
                        Column.physical("device_mid", DataTypes.STRING()));
        try (@SuppressWarnings("unchecked")
             ClickHouseSinkFunction<RowData, ?> sink =
                     new ClickHouseSinkFunctionBuilder.Upsert(clusterName, dbName, tableName)
                             .withSchema(columns)
                             .withShardDiscoveryKind("CE_GATEWAY")
                             .withGateway(gatewayHost)
                             .withAccount(username, password)
                             .withPrimaryKey(Arrays.asList("completion_id"))
                             .withShardingKey("completion_id")
                             .withFlushInterval(Duration.ofSeconds(1))
                             .build()) {
            // 将sink添加到DataStream
            dataStream.addSink(sink);
            // 触发执行
            env.execute();
        }
    }
    static class RowDataGenerator implements DataGenerator<RowData> {
        private final Random random = new Random();
        private final RowDataConstructor rowDataConstructor =
                RowDataConstructor.apply(
                        new DataType[] {DataTypes.TIMESTAMP(0), DataTypes.STRING(), DataTypes.STRING()});
        @Override
        public RowData next() {
            final Object[] rowDataFields = {
                    LocalDateTime.now(), "CID-" + random.nextInt(1000), "DMID-" + random.nextInt(1000)
            };
            return rowDataConstructor.constructInsert(rowDataFields);
        }
        @Override
        public void open(
                String name, FunctionInitializationContext context, RuntimeContext runtimeContext)
                throws Exception {
            // 此处初始化代码(如果需要)
        }
        @Override
        public boolean hasNext() {
            return true; 
        }
    }
}

参数说明

参数

必选

默认值

数据类型

描述

connector

(none)

String

指定要使用的驱动,这里应该是 bytehouse-ce(ByteHouse企业版) 或者 clickhouse

database

(none)

String

需要连接的 ByteHouse 企业版数据库的名称。

table-name

(none)

String

需要连接的 ByteHouse 企业版表的名称。 注意,这里的“表”是指每个分片的本地表。

clickhouse.cluster

(none)

String

ByteHouse 集群的名称。

clickhouse.shard-discovery.kind

SYSTEM_CLUSTERS

String

ClickHouse 分片发现的类型。 支持的值为:

  • SYSTEM_CLUSTERS: 通过system.clusters 表检索分片IP。
  • CONSUL: 通过 Consul 服务检索分片IP。
  • API_CLUSTERS: 通过 ByteHouse 企业版特定的 OpenAPI 检索分片 IP。
  • CE_GATEWAY: 通过 ByteHouse 企业版网关检索分片 IP。

clickhouse.shard-discovery.service.host

127.0.0.1

String

ByteHouse 分片发现服务的主机名。

clickhouse.shard-discovery.service.port

8123

Integer

ByteHouse分片发现服务的端口号。

clickhouse.shard-discovery.address-mapping

(none)

Map

已发现分片的地址映射。

bytehouse-ce.api.get-consul-info

(built-in)

String

用于检索 Consul 服务信息的 ByteHouse API。

bytehouse-ce.api.get-shard-info

(built-in)

String

用于检索分片信息的 ByteHouse API。

bytehouse-ce.auth.api

(built-in)

String

使用 ByteHouse API 的身份验证密钥。

bytehouse.ce.gateway.host

(none)

String

ByteHouse 企业版 网关的主机。

bytehouse.ce.gateway.port

8123

Integer

ByteHouse 企业版 网关的端口。

username

(none)

String

JDBC 连接用户名。 一旦指定,环境变量 CLICKHOUSE_USERNAME 将被忽略。
如果使用usernamepassword,则必须同时指定两者的值。

password

(none)

String

JDBC 连接密码。 一旦指定,环境变量 CLICKHOUSE_USERNAME 将被忽略。

sharding-strategy

NONE

String

ByteHouse分布式分区策略。 支持的值为:

  • NONE: 不分区。
  • RANDOM: 随机分区。
  • ROUND_ROBIN: 根据数据到达的顺序进行分区。
  • HASH: 根据数据内容进行分区。

sharding-key

(none)

String

哈希分区的键。 键可以包含多个字段,以逗号分隔。

sharding-expression

(none)

String

哈希分区的表达式。 如果设置此项,则所有涉及的字段名称也必须在分区键partition-key中列出。

sink.buffer-flush.interval

1 second

Duration

两次批量刷新之间的最大间隔。 该时间最少为 200 毫秒。

sink.buffer-flush.max-rows

50,000

Integer

刷新前缓冲记录的最大值。 该值最少为 100

sink.buffer-flush.max-batches

6

Integer

通过异步刷新触发过载预防的待处理批次数量的阈值。 该值最小为 1

sink.max-retries

-1

Integer

刷新数据失败时的最大重试次数。 设置为-1意味着无限次重试。

sink.parallelism

(none)

Integer

刷新数据的最大并行度。 默认情况下,并行度由框架使用上游链式运算符的相同并行度来确定。

sink.proactive-validate

false

Boolean

指定是否启用主动数据验证(即在添加到批次之前要验证的每条记录)。 默认情况下,使用被动数据验证(即仅在数据刷新尝试失败时触发数据验证)来减少运行时开销。

sink.enable-upsert

false

Boolean

启用flush除了插入之外还可以执行删除和更新。 请注意,如果将其设置为 true,则架构中必须包含 PRIMARY KEY 约束。

metrics.update-interval

5 seconds

Duration

刷新指标的固定间隔。 该时间最少为 5 秒。

metrics.log-level

INFO

String

记录指标的日志级别。 这对应于 Log4J 内置的标准日志级别