You need to enable JavaScript to run this app.
导航
ByteHouse CE
最近更新时间:2023.12.18 17:54:22首次发布时间:2023.12.18 17:54:22

在 Flink 控制台,bytehouse-ce 连接器支持做结果表,可以通过 Flink 任务将数据写入到 ByteHouse 目标表。

背景信息

ByteHouse 是一款云原生数据仓库,是火山引擎基于开源 ClickHouse 进行深度优化和改造的版本,提供海量数据上更强的查询服务和数据写入性能。
ByteHouse 企业版(CE)基于火山内部的丰富场景,以及 ClickHouse 开源版的痛点进行了深度定制,包括多场景表引擎、扩展数据类型、多级存储等功能。如需了解 ByteHouse 企业版更多信息,请参见ByteHouse 企业版简介

使用限制

ByteHouse CE 连接器暂时仅支持在 Flink 1.16-volcano 引擎版本中使用。

DDL 定义

CREATE TABLE bh_ce_sink (
  f0 VARCHAR,
  f1 VARCHAR,
  f2 VARCHAR
) WITH (
  'connector' = 'bytehouse-ce',
  'clickhouse.cluster' = 'bytehouse_cluster_***',
  'clickhouse.shard-discovery.service.host' = '7249621***.bytehouse-ce.ivolces.com',
  'username' = 'user_a',
  'password' = 'pa***45',
  'database' = 'default',
  'table-name' = 'doc_test',
  'sink.buffer-flush.interval' = '10 second',
  'sink.buffer-flush.max-rows' = '5000',
  'clickhouse.shard-discovery.address-mapping' = '192.18.*.*|8123:192.168.*.*|8123'
);

WITH 参数

参数

是否必选

默认值

数据类型

描述

connector

(none)

String

指定使用的连接器,此处是bytehouse-ceclickhouse连接器 。

database

(none)

String

数据库名称。需要在 ByteHouse CE 控制台提前创建数据库,请参见创建库表

table-name

(none)

String

表格名称。需要在 ByteHouse CE 控制台提前创建表,请参见创建库表

clickhouse.cluster

(none)

String

ByteHouse CE 集群名称。需要在 ByteHouse CE 控制台提前创建集群,请参见创建集群

clickhouse.shard-discovery.kind

SYSTEM_CLUSTERS

String

分片发现类型。

  • SYSTEM_CLUSTERS:通过查询system.clusters 表来获取分片的 IP。
  • CONSUL:通过 Consul 服务来获取分片的 IP。
  • API_CLUSTERS:通过 ByteHouse CE 特定 OpenAPI 获取分片的 IP。
  • CE_GATEWAY:通过 ByteHouse CE Gateway 来获取分片的 IP。

clickhouse.shard-discovery.service.host

127.0.0.1

String

分片发现服务的主机地址。

clickhouse.shard-discovery.service.port

8123

Integer

分片发现服务的端口号。

clickhouse.shard-discovery.address-mapping

(none)

Map

分片发现地址映射,将实际的分片地址映射到可供访问的地址。
示例:192.18.*.*|8123:192.168.*.*|8123

bytehouse-ce.api.get-consul-info

(built-in)

String

用于获取 Consul 服务信息的 ByteHouse CE API。

bytehouse-ce.api.get-shard-info

(built-in)

String

用于获取分片信息的 ByteHouse CE API。

bytehouse-ce.auth.api

(built-in)

String

用于进行身份验证的 ByteHouse CE API。

username

(none)

String

JDBC 用户名。
设置 username,需要同时设置 password。

password

(none)

String

JDBC 用户密码。

sharding-strategy

NONE

String

分区策略。

  • NONE:没有分区。
  • RANDOM: 随机分区。
  • ROUND_ROBIN:基于数据到达顺序进行分区。
  • HASH:基于数据内容进行分区。

sharding-key

(none)

String

哈希分区键,用于确定数据分布到不同分片。可以由一个或多个字段组成,多个字段之间使用逗号进行分隔。

sharding-expression

(none)

String

哈希分区表达式,用于确定数据分布到不同分片。如果设置了哈希分区表达式,则所有相关字段的名称也必须在分区键中列出。

sink.buffer-flush.interval

1 second

Duration

刷新时间间隔,最小值为200 ms

sink.buffer-flush.max-rows

50,000

Integer

缓冲记录大小,最小值为100

sink.buffer-flush.max-batches

6

Integer

数据写入到 Sink 的缓冲区时的最大批次数,最小值为1

sink.max-retries

-1

Integer

刷新数据失败时的最大尝试次数。设置为-1,表示无限重试。

sink.parallelism

(none)

Integer

刷新数据的并行度。默认情况下,与上游算子并行度保持一致。

sink.proactive-validate

false

Boolean

是否主动验证数据。

  • true:主动验证数据。在批处理前验证每条记录。
  • false:默认值,被动验证数据。在数据刷新尝试失败时触发验证数据。

sink.enable-upsert

false

Boolean

是否启用 Upsert 操作(插入或更新)到 Sink。
sink.enable-upsert设置为true时,将允许执行 Upsert 操作,即在写入数据到 Sink 时,如果数据已经存在则进行更新,否则进行插入。
由于不是所有的 Sink 都支持 Upsert 操作,在启用sink.enable-upsert之前,请确保您的 Sink 支持该操作。否则可能会导致错误或不可预测的行为。

metrics.update-interval

5 seconds

Duration

刷新指标的时间间隔,最小设置为 5 seconds。

metrics.log-level

INFO

String

日志级别。
如需了解更多信息,请参见Log4j 内置日志级别

示例代码

CREATE TABLE random_source (
   f0 VARCHAR,
   f1 VARCHAR,
   f2 VARCHAR
 ) WITH (
   'connector' = 'datagen',
   'rows-per-second' = '1'
 );

CREATE TABLE bh_ce_sink (
  f0 VARCHAR,
  f1 VARCHAR,
  f2 VARCHAR
) WITH (
  'connector' = 'bytehouse-ce',
  'clickhouse.cluster' = 'bytehouse_cluster_***',
  'clickhouse.shard-discovery.service.host' = '7249621***.bytehouse-ce.ivolces.com',
  'username' = 'user_a',
  'password' = 'pa***45',
  'database' = 'default',
  'table-name' = 'doc_test',
  'sink.buffer-flush.interval' = '10 second',
  'sink.buffer-flush.max-rows' = '5000',
  'clickhouse.shard-discovery.address-mapping' = '192.18.*.*|8123:192.168.*.*|8123'
);

INSERT INTO bh_ce_sink 
SELECT f0, f1, f2 FROM random_source;