You need to enable JavaScript to run this app.
导航
ByteHouse CDW
最近更新时间:2024.01.29 16:35:23首次发布时间:2023.12.18 17:54:13

在 Flink 控制台,bytehouse-cdw 连接器支持做结果表,可以通过 Flink 任务将数据写入到 ByteHouse 目标表。

背景信息

ByteHouse 是一款云原生数据仓库,云数仓版(CDW)是一个支持实时导入和离线导入的自助数据分析平台,能够对海量数据进行高效分析。
如需了解 ByteHouse 云数仓版更多信息,请参见ByteHouse 云数仓版简介

使用限制

  • ByteHouse CDW 连接器暂时仅支持在 Flink 1.16-volcano 引擎版本中使用。
  • 如果使用火山引擎私有网络,此时需要 ByteHouse CDW 和 Flink 处于相同 VPC;或者 ByteHouse CDW 对 Flink 所在 VPC 进行加白操作。

DDL 定义

CREATE TABLE bh_cdw (
    f0 VARCHAR, 
    f1 VARCHAR, 
    f2 VARCHAR) WITH (
    'connector' = 'bytehouse-cdw',
    'database' = 'doc_db',
    'table-name' = 'doc_table_2',
    'username' = 'user-a',
    'password' = 'qa***6',
    -- 指定 ByteHouse Gateway 的地域。
    -- 示例VOLCANO_CN_NORTH_INET为火山引擎华北地域私有网络,此时需要ByteHouse CDW和Flink处于相同VPC;或者ByteHouse CDW对Flink所在VPC进行加白操作。
    'bytehouse.gateway.region' = 'VOLCANO_CN_NORTH_INET',
    -- 用来对数据进行分组和管理的虚拟仓库。
    'bytehouse.gateway.virtual-warehouse' = 'test',
    'jdbc.enable-gateway-connection' = 'true',
    'bytehouse.gateway.account' = '210***34',
    'bytehouse.gateway.access-key-id' = '<your-access-key>',
    'bytehouse.gateway.secret-key' = '<your-secret-key>',
    'sink.buffer-flush.interval' = '5 second',
    'sink.buffer-flush.max-rows' = '2000'
);

WITH 参数

参数

是否必选

默认值

数据类型

描述

connector

(none)

String

指定使用的连接器,此处是 bytehouse-cdw 连接器。

database

(none)

String

数据库名称。需要在 ByteHouse CDW 控制台提前创建数据库,请参见创建库表

table-name

(none)

String

表格名称。需要在 ByteHouse CDW 控制台提前创建表,请参见创建库表

job-id

(none)

String

任务 ID。
是否需要设置任务 ID 取决于所使用的sink.strategy参数。

  • sink.strategy参数设置为EXACTLY_ONCE_xxx时,必须提供任务 ID。这种分片策略要求每个作业都有一个唯一的标识符,以确保任务的精确一次语义。
  • sink.strategy参数其他取值,任务 ID 非必填。

username

(none)

String

JDBC 帐户名。设置 username,需要同时设置 password。

password

(none)

String

JDBC 帐户密码。

jdbc.enable-gateway-connection

true

Boolean

JDBC 连接是否通过 ByteHouse Gateway。

  • true:默认值,通过。
  • false:不通过。

bytehouse.gateway.region

VOLCANO

String

指定 ByteHouse Gateway 的地域。

  • AWS_CN_NORTH:适用于部署在中国 AWS 上的 ByteHouse。
  • AWS_SG:适用于部署在新加坡 AWS 上的 ByteHouse。
  • AWS_US_EAST:适用于部署在美国东部 AWS 上的 ByteHouse。
  • VOLCANO_CN_NORTH:火山引擎华北地域公网使用。
  • VOLCANO_CN_NORTH_INET:火山引擎华北地域私有网络使用。
  • VOLCANO_CN_EAST:火山引擎华东地域公网使用。
  • VOLCANO_CN_EAST_INE:火山引擎华东地域私有网络使用。
  • VOLCANO_PPE:火山引擎 PPE 环境公网使用。
  • VOLCANO_PPE_INET:火山引擎 PPE 环境私有网络使用。
  • VOLCANO_BOE:火山引擎 BOE 环境公网使用。
  • VOLCANO_BOE_INET:火山引擎 BOE 环境私有网络使用。

注意

如果使用火山引擎私有网络,此时需要 ByteHouse CDW 和 Flink 处于相同 VPC;或者 ByteHouse CDW 对 Flink 所在 VPC 进行加白操作。

bytehouse.gateway.virtual-warehouse

(none)

String

用于指定虚拟仓库。
虚拟仓库是一个逻辑概念。通过创建虚拟仓库,您可以对数据进行分组和管理,以便更好地控制和优化数据的存储和查询操作。

bytehouse.gateway.account

(none)

String

指定连接器的帐户 ID,用于认证和授权。
通过usernamepassword设置帐户名称和密码。

bytehouse.gateway.access-key-id

(none)

String

连接器帐户的 Access Key。
说明
如果设置帐户的 Access Key,则必须同时设置 Secret Key。

bytehouse.gateway.secret-key

(none)

String

连接器帐户的 Secret Key。

bytehouse.gateway.api-token

(none)

String

连接器帐户的 API Token。

bytehouse.storage.dump-parallelism

1

Integer

指定导出数据(Dump)并行度。通常,较大的并行度可以提供更快的导出速度,但也会占用更多的计算资源,请仔细评估。

  • 增加并行度,可以提高导出数据的速度和效率。特别是大型数据集通过将数据分成多个并行任务进行导出,可以充分利用计算资源并减少导出时间。
  • 过高的并行度可能会对系统资源产生过多的负载,导致性能下降或系统不稳定。

sink.strategy

AT_LEAST_ONCE

String

数据写入到 ByteHouse 表格的策略。

  • AT_LEAST_ONCE
  • EXACTLY_ONCE_TRACK_BATCH_META
  • EXACTLY_ONCE_TRANSACTION

sink.exactly-once.transaction.timeout

1 minute

Duration

在流数据处理中使用的一种 Exactly-Once 语义的 Sink 策略中的事务超时时间。在指定的时间内,如果事务未能成功完成,则会被视为超时并进行回滚或重试。
仅当sink.strategy设置为EXACTLY_ONCE_TRANSACTION策略时有效。
Exactly-Once 语义是指确保每条数据只被处理一次,并且不会发生重复或丢失的情况。为了实现这种语义,通常会使用事务来确保数据的一致性和可靠性。

sink.buffer-flush.interval

1 second

Duration

刷新时间间隔,最小值为200 ms

sink.buffer-flush.max-rows

100,000

Integer

缓冲记录大小,最小值为2000

sink.buffer-flush.max-batches

32

Integer

数据写入到 Sink 的缓冲区时的最大批次数,最小值为1

sink.max-retries

3

Integer

刷新数据失败时的最大尝试次数。

sink.parallelism

(none)

Integer

刷新数据的并行度。默认情况下,与上游算子并行度保持一致。

sink.proactive-validate

false

Boolean

是否主动验证数据。

  • true:主动验证数据。在批处理前验证每条记录。
  • false:默认值,被动验证数据。在数据刷新尝试失败时触发验证数据。

metrics.update-interval

5 seconds

Duration

刷新指标的时间间隔,最小设置为 5 seconds。

metrics.log-level

INFO

String

日志级别。
如需了解更多信息,请参见Log4j 内置日志级别

示例代码

CREATE TABLE random_source (
    f0 VARCHAR, 
    f1 VARCHAR, 
    f2 VARCHAR) WITH (
    'connector' = 'datagen', 
    'rows-per-second'='1'
);


CREATE TABLE bh_cdw (
    f0 VARCHAR, 
    f1 VARCHAR, 
    f2 VARCHAR) WITH (
    'connector' = 'bytehouse-cdw',
    'database' = 'doc_db',
    'table-name' = 'doc_table_2',
    'username' = 'user-a',
    'password' = 'qa***6',
    -- 指定 ByteHouse Gateway 的地域。
    -- 示例VOLCANO_CN_NORTH_INET为火山引擎华北地域私有网络,此时需要ByteHouse CDW和Flink处于相同VPC;或者ByteHouse CDW对Flink所在VPC进行加白操作。
    'bytehouse.gateway.region' = 'VOLCANO_CN_NORTH_INET',
    -- 用来对数据进行分组和管理的虚拟仓库。
    'bytehouse.gateway.virtual-warehouse' = 'test',
    'jdbc.enable-gateway-connection' = 'true',
    'bytehouse.gateway.account' = '210***34',
    'bytehouse.gateway.access-key-id' = '<your-access-key>',
    'bytehouse.gateway.secret-key' = '<your-secret-key>',
    'sink.buffer-flush.interval' = '5 second',
    'sink.buffer-flush.max-rows' = '2000'
);


INSERT INTO bh_cdw 
SELECT f0, f1, f2 FROM random_source;