You need to enable JavaScript to run this app.
导航

HaKafka

最近更新时间2024.03.26 19:29:05

首次发布时间2022.12.15 10:08:27

HaKafka 是一种特殊的表引擎,修改自社区 Kafka 引擎。使用 Kafka / HaKafka 引擎可以订阅 Kafka 上的 topic,拉取并解析 topic 中的消息,然后通过 MaterializedView 将 Kafka/HaKafka 解析到的数据写入到目标表(一般为HaMergeTree)。
在 ByteHouse GUI 中,创建 Kafka 导入任务,底层即为创建了 HaKafka 和 MaterializedView 两张表。
图片
在 ByteHouse 中,社区的 Kafka 引擎目前基本上未做改动,不具备高可用的功能,不推荐使用,以下仅介绍 HaKafka。

建表示例

SQL 建表

建表语法

建一张 HaKafka 的语法如下:

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = HaKafka('{shard}', '{replica}')
SETTINGS (.....)

SETTINGS

参数名

类型

必填/默认值

说明

kafka_broker_list

String

必填

ip:port。可以多个,逗号分隔。

kafka_topic_list

String

必填

可以多个,逗号分隔。

kafka_group_name

String

必填

消费组名称。

kafka_format

String

必填

消息格式;目前最常用 JSONEachRow。

kafka_row_delimiter

String

'\0'

一般使用 '\n'。

kafka_schema

String

''

protobuf 格式需要这个参数。

kafka_num_consumers

UInt64

1

消费者个数,每个消费者会创建一个线程。
一般建议设置为 1 - 4,每个线程大约 20MB/s 的写入性能。

kafka_max_block_size

UInt64

65536

写入block_size
默认 65536 MB

kafka_leader_priority

String

'0'

会存储到zk上,互为主备的一对(组)消费者,仅leader_priority最小的会开启消费。其他节点的表不会消费。可被macro替换。

kafka_partition_num

String

'-1'

-1 表示使用动态分配(kafka subscribe API);

= 0 表示使用静态分配(kafka assign API)。

kafka_shard_count

String

'1'

集群shard数,决定静态分配的分配规则。

kafka_auto_offset_reset

String

''

启动消费时或者数据过期时,offset的设置方式,可填:"earliest", "latest"。

extra_librdkafka_config

String

''

JSON 形式;可以透传任何 librdkafka 支持的参数。请参考:https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md。

影响 HaKafka 的 users.xml 参数

Name

Default

Description

stream_flush_interval_ms

7500

ms, 每次消费batch的POLL数据的超时时间。

stream_poll_timeout_ms

500

ms, rdkafka poll 数据的等待时间,影响stop consume和超时判断。

kafka_session_timeout_ms

180000

ms, kafka session 超时时间,仅静态分配时会生效。

kafka_max_partition_fetch_bytes

1048576

bytes, 从 topic partition 拉去数据的最大大小,影响POLL数据的性能。

使用示例

手动建导入任务

你可以通过控制面自动建导入任务,但若你需要手动建导入任务,则需要建一张 HaKafka 表,一张 Materialized View 物化视图,以及一张 HaMergeTree(或 HaUniqueMergeTree)的底表。以下展示了手动建表的示例。

步骤1:建 HaKafka 表

CREATE TABLE TEST.CONSUMER_tce_service_resource_usage_local ON CLUSTER default (
    `current_ts` Int64,
    `psm` String,
    `cpu_limit_pod` Float64
) ENGINE = HaKafka('/clickhouse/experiment/TEST.CONSUMER_tce_service_resource_usage_local_233/{shard}', '{replica}')
SETTINGS 
    kafka_cluster = 'xxx_cluster',  -- 替换成kafka集群名
    kafka_topic_list = 'xxx_topic', -- 替换成业务方的topic
    kafka_group_name = 'xxx_group_name', -- 替换成自己想取的 group_name
    kafka_format = 'JSONEachRow', -- 一般用json
    kafka_row_delimiter = '\n', -- 一般是 \n

建好之后,可以直接从表中 SELECT 数据 (一般用来debug,不能在线上使用)

SELECT * FROM TEST.CONSUMER_tce_service_resource_usage_local LIMIT 3 FORMAT CSVWithNames;
-- "current_ts","psm","cpu_limit_pod"

-- 1567666740,"toutiao.video.user_packer",4

-- 1567666740,"ad.bi.datacore",4

-- 1567666740,"tce.sysprobe.probeagent",2

步骤2 建本地表与物化视图表

HaKafka 表定义了如何从 Kafka topic 中消费(解析)数据,为了将数据写入磁盘中,还需要建立一张 HaMergeTree 为引擎的目标表(或 HaUniqueMergeTree 为引擎),以及一张物化视图。

--- 准备一张存储数据的HaMergeTree表,
CREATE TABLE TEST.tce_service_resource_usage_local ON CLUSTER default (
    `current_ts` Int64,
    `psm` String,
    `cpu_limit_pod` Float64
) ENGINE = HaMergeTree('/clickhouse/experiment/TEST.tce_service_resource_usage_local_666/{shard}', '{replica}')
PARTITION BY toDate(current_ts) ORDER BY psm

--- 再建一张物化视图,表示把数据从HaKafka表中SELECT出来写入到HaMergeTree表
CREATE MATERIALIZED VIEW TEST.VIEW_tce_service_resource_usage_local ON CLUSTER default
TO TEST.tce_service_resource_usage_local
AS SELECT 
    current_ts,  
    psm, 
    cpu_limit_pod 
    FROM TEST.CONSUMER_TEST_tce_service_resource_usage_local

三张表建好之后,每隔一段时间 (取决于 stream_flush_interval_ms 参数和数据持久化的时间),数据会写入目标表中。之后只需要查询目标表即可。

其他操作

开启 / 关闭 / 重启消费

SQL 语法:

SYSTEM START/STOP/RESTART CONSUME <table_name>

语义:
开启/关闭/重启 HaKafka engine 的消费。将会改变HaKafka表的消费状态(ON/OFF),并持久化到磁盘,重启之后仍然保持原有状态。

  1. 三个操作都是幂等的,可以多次执行。 多次START/STOP仅有第一次生效,多次RESTART则每次都会生效。
  2. STOP/RESTART 都会销毁当前消费者
  3. 可以通过 system.kafka_tables 的 consumer_switch 字段查询是否开启/停止消费。

更改设置

通过如下语句可以修改 HaKafka 表设置,修改成功后会自动重启消费

ALTER TABLE db.table MODIFY SETTING <name1> = <value1>, <name2> = <value2>

Virtual Columns

有的业务方需要获取 Kafka 消息的元数据(e.g. 消息的partition, offset等)。我们可以使用 virtual columns 功能来满足这个需求。virtual columns 不需要在建表的时候指定,是表引擎本身的属性。在SELECT语句查询时可以显式选出 virtual columns(同样可以放到VIEW表的SELECT语句中):

-- 
SELECT 
    _topic,    -- String
    _partition,    -- UInt64
    _key,    -- String
    _offset,    -- UInt64
    _content,  -- String: 完整的消息内容 
    *    -- 正常列可以通过*展开,虚拟列则不能
FROM TEST.CONSUMER_tce_service_resource_usage_local LIMIT 10