最近更新时间:2024.01.29 16:35:23
首次发布时间:2023.12.18 17:54:13
在 Flink 控制台,bytehouse-cdw 连接器支持做结果表,可以通过 Flink 任务将数据写入到 ByteHouse 目标表。
ByteHouse 是一款云原生数据仓库,云数仓版(CDW)是一个支持实时导入和离线导入的自助数据分析平台,能够对海量数据进行高效分析。
如需了解 ByteHouse 云数仓版更多信息,请参见ByteHouse 云数仓版简介。
CREATE TABLE bh_cdw ( f0 VARCHAR, f1 VARCHAR, f2 VARCHAR) WITH ( 'connector' = 'bytehouse-cdw', 'database' = 'doc_db', 'table-name' = 'doc_table_2', 'username' = 'user-a', 'password' = 'qa***6', -- 指定 ByteHouse Gateway 的地域。 -- 示例VOLCANO_CN_NORTH_INET为火山引擎华北地域私有网络,此时需要ByteHouse CDW和Flink处于相同VPC;或者ByteHouse CDW对Flink所在VPC进行加白操作。 'bytehouse.gateway.region' = 'VOLCANO_CN_NORTH_INET', -- 用来对数据进行分组和管理的虚拟仓库。 'bytehouse.gateway.virtual-warehouse' = 'test', 'jdbc.enable-gateway-connection' = 'true', 'bytehouse.gateway.account' = '210***34', 'bytehouse.gateway.access-key-id' = '<your-access-key>', 'bytehouse.gateway.secret-key' = '<your-secret-key>', 'sink.buffer-flush.interval' = '5 second', 'sink.buffer-flush.max-rows' = '2000' );
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
connector | 是 | (none) | String | 指定使用的连接器,此处是 bytehouse-cdw 连接器。 |
database | 是 | (none) | String | 数据库名称。需要在 ByteHouse CDW 控制台提前创建数据库,请参见创建库表。 |
table-name | 是 | (none) | String | 表格名称。需要在 ByteHouse CDW 控制台提前创建表,请参见创建库表。 |
job-id | 否 | (none) | String | 任务 ID。
|
username | 否 | (none) | String | JDBC 帐户名。设置 username,需要同时设置 password。 |
password | 否 | (none) | String | JDBC 帐户密码。 |
jdbc.enable-gateway-connection | 否 | true | Boolean | JDBC 连接是否通过 ByteHouse Gateway。
|
bytehouse.gateway.region | 否 | VOLCANO | String | 指定 ByteHouse Gateway 的地域。
注意 如果使用火山引擎私有网络,此时需要 ByteHouse CDW 和 Flink 处于相同 VPC;或者 ByteHouse CDW 对 Flink 所在 VPC 进行加白操作。 |
bytehouse.gateway.virtual-warehouse | 否 | (none) | String | 用于指定虚拟仓库。 |
bytehouse.gateway.account | 否 | (none) | String | 指定连接器的帐户 ID,用于认证和授权。 |
bytehouse.gateway.access-key-id | 否 | (none) | String | 连接器帐户的 Access Key。 |
bytehouse.gateway.secret-key | 否 | (none) | String | 连接器帐户的 Secret Key。 |
bytehouse.gateway.api-token | 否 | (none) | String | 连接器帐户的 API Token。 |
bytehouse.storage.dump-parallelism | 否 | 1 | Integer | 指定导出数据(Dump)并行度。通常,较大的并行度可以提供更快的导出速度,但也会占用更多的计算资源,请仔细评估。
|
sink.strategy | 否 | AT_LEAST_ONCE | String | 数据写入到 ByteHouse 表格的策略。
|
sink.exactly-once.transaction.timeout | 否 | 1 minute | Duration | 在流数据处理中使用的一种 Exactly-Once 语义的 Sink 策略中的事务超时时间。在指定的时间内,如果事务未能成功完成,则会被视为超时并进行回滚或重试。 |
sink.buffer-flush.interval | 否 | 1 second | Duration | 刷新时间间隔,最小值为 |
sink.buffer-flush.max-rows | 否 | 100,000 | Integer | 缓冲记录大小,最小值为 |
sink.buffer-flush.max-batches | 否 | 32 | Integer | 数据写入到 Sink 的缓冲区时的最大批次数,最小值为 |
sink.max-retries | 否 | 3 | Integer | 刷新数据失败时的最大尝试次数。 |
sink.parallelism | 否 | (none) | Integer | 刷新数据的并行度。默认情况下,与上游算子并行度保持一致。 |
sink.proactive-validate | 否 | false | Boolean | 是否主动验证数据。
|
metrics.update-interval | 否 | 5 seconds | Duration | 刷新指标的时间间隔,最小设置为 5 seconds。 |
metrics.log-level | 否 | INFO | String | 日志级别。 |
CREATE TABLE random_source ( f0 VARCHAR, f1 VARCHAR, f2 VARCHAR) WITH ( 'connector' = 'datagen', 'rows-per-second'='1' ); CREATE TABLE bh_cdw ( f0 VARCHAR, f1 VARCHAR, f2 VARCHAR) WITH ( 'connector' = 'bytehouse-cdw', 'database' = 'doc_db', 'table-name' = 'doc_table_2', 'username' = 'user-a', 'password' = 'qa***6', -- 指定 ByteHouse Gateway 的地域。 -- 示例VOLCANO_CN_NORTH_INET为火山引擎华北地域私有网络,此时需要ByteHouse CDW和Flink处于相同VPC;或者ByteHouse CDW对Flink所在VPC进行加白操作。 'bytehouse.gateway.region' = 'VOLCANO_CN_NORTH_INET', -- 用来对数据进行分组和管理的虚拟仓库。 'bytehouse.gateway.virtual-warehouse' = 'test', 'jdbc.enable-gateway-connection' = 'true', 'bytehouse.gateway.account' = '210***34', 'bytehouse.gateway.access-key-id' = '<your-access-key>', 'bytehouse.gateway.secret-key' = '<your-secret-key>', 'sink.buffer-flush.interval' = '5 second', 'sink.buffer-flush.max-rows' = '2000' ); INSERT INTO bh_cdw SELECT f0, f1, f2 FROM random_source;