You need to enable JavaScript to run this app.
导航

HaKafka

最近更新时间2023.06.07 10:05:57

首次发布时间2022.12.15 10:08:27

HaKafka 是一种特殊的表引擎,修改自社区 Kafka 引擎。使用 Kafka / HaKafka 引擎可以订阅 Kafka 上的 topic,拉取并解析 topic 中的消息,然后通过 MaterializedView 将 Kafka/HaKafka 解析到的数据写入到目标表(一般为HaMergeTree)。
在 ByteHouse GUI 中,创建 Kafka 导入任务,底层即为创建了 HaKafka 和 MaterializedView 两张表。

alt

在 ByteHouse 中,社区的 Kafka 引擎目前基本上未做改动,不具备高可用的功能,不推荐使用,以下仅介绍 HaKafka。

建表示例

SQL 建表

建表语法

建一张 HaKafka 的语法如下:

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = HaKafka('{shard}', '{replica}')
SETTINGS (.....)

SETTINGS

参数名类型必填/默认值说明
kafka_broker_listString必填ip:port。可以多个,逗号分隔。
kafka_topic_listString必填可以多个,逗号分隔。
kafka_group_nameString必填消费组名称。
kafka_formatString必填消息格式;目前最常用 JSONEachRow。
kafka_row_delimiterString'\0'一般使用 '\n'。
kafka_schemaString''protobuf 格式需要这个参数。

kafka_num_consumers

UInt64

1

消费者个数,每个消费者会创建一个线程。
一般建议设置为 1 - 4,每个线程大约 20MB/s 的写入性能。

kafka_max_block_size

UInt64

65536

写入block_size
默认 65536 MB

kafka_leader_priority

String

'0'

会存储到zk上,互为主备的一对(组)消费者,仅leader_priority最小的会开启消费。其他节点的表不会消费。可被macro替换。

kafka_partition_num

String

'-1'

-1 表示使用动态分配(kafka subscribe API);
>= 0 表示使用静态分配(kafka assign API)。

kafka_shard_countString'1'集群shard数,决定静态分配的分配规则。
kafka_auto_offset_resetString''启动消费时或者数据过期时,offset的设置方式,可填:"earliest", "latest"。

extra_librdkafka_config

String

''

JSON 形式;可以透传任何 librdkafka 支持的参数。请参考:https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md。

影响 HaKafka 的 users.xml 参数

NameDefaultDescription
stream_flush_interval_ms7500ms, 每次消费batch的POLL数据的超时时间。
stream_poll_timeout_ms500ms, rdkafka poll 数据的等待时间,影响stop consume和超时判断。
kafka_session_timeout_ms180000ms, kafka session 超时时间,仅静态分配时会生效。
kafka_max_partition_fetch_bytes1048576bytes, 从 topic partition 拉去数据的最大大小,影响POLL数据的性能。
使用示例

手动建导入任务

你可以通过控制面自动建导入任务,但若你需要手动建导入任务,则需要建一张 HaKafka 表,一张 Materialized View 物化视图,以及一张 HaMergeTree(或 HaUniqueMergeTree)的底表。以下展示了手动建表的示例。

步骤1:建 HaKafka 表

CREATE TABLE TEST.CONSUMER_tce_service_resource_usage_local ON CLUSTER default (
    `current_ts` Int64,
    `psm` String,
    `cpu_limit_pod` Float64
) ENGINE = HaKafka('/clickhouse/experiment/TEST.CONSUMER_tce_service_resource_usage_local_233/{shard}', '{replica}')
SETTINGS 
    kafka_cluster = 'xxx_cluster',  -- 替换成kafka集群名
    kafka_topic_list = 'xxx_topic', -- 替换成业务方的topic
    kafka_group_name = 'xxx_group_name', -- 替换成自己想取的 group_name
    kafka_format = 'JSONEachRow', -- 一般用json
    kafka_row_delimiter = '\n', -- 一般是 \n

建好之后,可以直接从表中 SELECT 数据 (一般用来debug,不能在线上使用)

SELECT * FROM TEST.CONSUMER_tce_service_resource_usage_local LIMIT 3 FORMAT CSVWithNames;
-- "current_ts","psm","cpu_limit_pod"

-- 1567666740,"toutiao.video.user_packer",4

-- 1567666740,"ad.bi.datacore",4

-- 1567666740,"tce.sysprobe.probeagent",2

步骤2 建本地表与物化视图表

HaKafka 表定义了如何从 Kafka topic 中消费(解析)数据,为了将数据写入磁盘中,还需要建立一张 HaMergeTree 为引擎的目标表(或 HaUniqueMergeTree 为引擎),以及一张物化视图。

--- 准备一张存储数据的HaMergeTree表,
CREATE TABLE TEST.tce_service_resource_usage_local ON CLUSTER default (
    `current_ts` Int64,
    `psm` String,
    `cpu_limit_pod` Float64
) ENGINE = HaMergeTree('/clickhouse/experiment/TEST.tce_service_resource_usage_local_666/{shard}', '{replica}')
PARTITION BY toDate(current_ts) ORDER BY psm

--- 再建一张物化视图,表示把数据从HaKafka表中SELECT出来写入到HaMergeTree表
CREATE MATERIALIZED VIEW TEST.VIEW_tce_service_resource_usage_local
TO TEST.tce_service_resource_usage_local ON CLUSTER default
AS SELECT 
    current_ts, 
    psm, 
    cpu_limit_pod 
    FROM TEST.CONSUMER_TEST_tce_service_resource_usage_local

三张表建好之后,每隔一段时间 (取决于 stream_flush_interval_ms 参数和数据持久化的时间),数据会写入目标表中。之后只需要查询目标表即可。

其他操作

开启 / 关闭 / 重启消费

SQL 语法:

SYSTEM START/STOP/RESTART CONSUME <table_name>

语义:
开启/关闭/重启 HaKafka engine 的消费。将会改变HaKafka表的消费状态(ON/OFF),并持久化到磁盘,重启之后仍然保持原有状态。

  1. 三个操作都是幂等的,可以多次执行。 多次START/STOP仅有第一次生效,多次RESTART则每次都会生效。

  2. STOP/RESTART 都会销毁当前消费者

  3. 可以通过 system.kafka_tables 的 consumer_switch 字段查询是否开启/停止消费。

更改设置

通过如下语句可以修改 HaKafka 表设置,修改成功后会自动重启消费

ALTER TABLE db.table MODIFY SETTING <name1> = <value1>, <name2> = <value2>

Virtual Columns

有的业务方需要获取 Kafka 消息的元数据(e.g. 消息的partition, offset等)。我们可以使用 virtual columns 功能来满足这个需求。virtual columns 不需要在建表的时候指定,是表引擎本身的属性。在SELECT语句查询时可以显式选出 virtual columns(同样可以放到VIEW表的SELECT语句中):

-- 
SELECT 
    _topic,    -- String
    _partition,    -- UInt64
    _key,    -- String
    _offset,    -- UInt64
    _content,  -- String: 完整的消息内容 
    *    -- 正常列可以通过*展开,虚拟列则不能
FROM TEST.CONSUMER_tce_service_resource_usage_local LIMIT 10