本文通过一个“实时监控工厂车间温度和湿度数据收集”的使用场景,来介绍 Kafka 去重导入 ByteHouse 的使用流程。
工厂会在关键车间设置热传感器,收集温度、湿度等各种传感器数据,并发送给 ByteHouse 进行分析。通过聚合查询和明细查询,可以解决温度和湿度的平均值、极值、变化趋势、警报事件统计等问题,以及特定事件的详细信息、数据校验和准确性、响应维护需求等。
表名 | 数据来源 | 样例数据 | 数据意义 |
|---|---|---|---|
热量传感器数据表 | Kafka |
|
|
heat_sensor_data 的数据的订阅。ParitionNum%ByteHouseShardCount的对应关系消费到 ByteHouse,分区数建议为 ByteHouse 分片数的整数倍,使得数据最终落入分片时可以均衡。比如此示例有 4 个 Shard,则建议分区数选择 4 个,8 个等。此处以 8 个示例。heat_sensor_data_local。heat_sensor_data。我们可以通过 ByteHouse 控制台完成新建名为 sensor 的数据库。
或在客户端里通过 SQL 创建:
CREATE DATABASE sensor ON CLUSTER bytehouse_test
选择表引擎,可以参考下面的说明。考虑需求是希望每个 ID 每小时保留最新一行数据,此处采用 HaUniqueMergeTree 。
说明
在选择 MergeTree 家族表引擎时,请判断是否需要去重,或者由于列的数据产生不一致,需要部分列更新。
做好数据映射。以下是基于导入的 JSON 数据示例,我们对要注意的地方进行了标注。
{ "sensorId": "sensor1031984076491", //String "timestamp": "2023-12-01T00:00:20", //DateTime "temperature": 31.68,//Float64 "humidity": 55.09,//Float64 "heading": "SW", //字段是常量,建议使用 LowCadinality,LowCadinality 自动进行了编码,查询更快 "alertFlags": ["humidityDrop"],//因此这个场景下,其中只有 ["overheat", "humidityDrop"] 两种可能,可以展开到2个bool字段,之后查询 SQL 可以更简单 "gpsLatitude": -20.510689, "gpsLongitude": 42.272274 }
考虑数据分布。
基于性能考虑,HaUniqueMergeTree 引擎只能在分片内去重。这就意味着,不论以什么方式写入,相同唯一键的数据,必须插入相同的分片。
数据通过 Kafka 导入方式插入 ByteHouse,最终落入 ByteHouse 是根据 Partition_num%shard_count 选择分片,例如此场景有 2 个分片,上游有 4 个分区,则 3 号分区会插入 1 号分片。
基于上述两点,上游需要保证 Kafka 按照唯一键(sensorId)为依据进行分区,可以做到所有 sensorId 一致的行都落到同一分片,以成功去重。若使用非去重的引擎 HaMergeTree,则无此问题。
确定其他表关键字段选择。
(sensorId, hour)。确认字段是否为 Nullable。本案例中需考虑的因素如下:
确认其他注意事项:
index_granularity,索引的颗粒度,值越小单行查的越快,值越大批量查得越快;建议采用默认的 8192。partition_level_unique_keys:去重级别
enable_disk_based_unique_key_index:unique key 索引位置:
基于上述要求,生成 SQL 如下:
-- 创建本地表 CREATE TABLE sensor.heat_sensor_data_unique_local ON CLUSTER bytehouse_test ( sensorId String, timestamp DateTime, hour DateTime MATERIALIZED toStartOfHour(timestamp), temperature Nullable(Float64), humidity Nullable(Float64), heading LowCadinality(Nullable(String)), overheat Nullable(Bool), humidityDrop Nullable(Bool), gpsLatitude Nullable(Float64), gpsLongitude Nullable(Float64) ) ENGINE = HaUniqueMergeTree('/clickhouse/tables/{shard}/sensor/heat_sensor_data_unique_local', '{replica}') UNIQUE KEY (sensorId, hour) ORDER BY sensorId PARTITION BY toDate(timestamp) TTL timestamp + INTERVAL 3 YEAR SETTINGS index_granularity = 8192; -- 创建分布式表 CREATE TABLE sensor.heat_sensor_data_unique ON CLUSTER bytehouse_test AS sensor.heat_sensor_data_unique_local ENGINE = Distributed(bytehouse_test, sensor, heat_sensor_data_unique_local);
登录 ByteHouse 控制台,单击数据导入,在右上角选择对应集群。
单击数据源模块中的“+”,新建数据源,数据源类型选择为 Kafka。
在右侧数据源配置界面,根据界面提示,依次输入以下信息:
参数项 | 配置说明 |
|---|---|
源类型 | 选择 Kafka 数据源类型。 |
源名称 | 任务名称,和其他任务不能重名。 |
Kafka 代理列表 | 填写对应的 Kafka Broker 地址。填写时,请务必填写您使用的接入点所有 Broker 节点的 IP 地址。如果需要填写多个 Broker 地址,请用逗号(,)进行分割。如 |
身份验证模式 | 当前 Kafka 数据源支持四种鉴权模式,包括 NONE 无鉴权、PLAIN、SCRAM-SHA-256、SCRAM-SHA-512,并支持 SSL 加密,您可根据 Kafka 代理列表 IP 地址的端口号勾选,对应关系如下:
|
安全协议 | 支持选择 SASL_PLAINTEXT、SASL_SSL 协议类型。 |
用户名、密码 | 填写有权限访问 Kafka 实例的用户名和密码信息。 |
数据源信息填写完成后,单击确定按钮,进行数据源连通性测试,连通成功后,即代表数据源创建成功。
在 Kafka 数据源下,单击新建导入任务。
进入新建导入任务配置界面,并完成以下信息配置:
参数 | 说明 |
|---|---|
通用信息 | |
任务名称 | 填写导入任务名称信息,支持数字、字母及下划线,不能以数字开头,最多仅支持 128 字符,且不能和现有任务重名。 |
描述 | 输入该导入任务相关描述信息,方便后续维护管理。 |
选择数据源 | |
源类型 | 选择 Kafka 数据源类型。 |
数据源 | 下拉选择已创建成功的 Kafka 数据源。 |
Topic | 下拉选择 Kafka 数据源中的已有的 Topic 信息,本案例中为 sensor。 |
Group 名称 | Kafka Consumer Group 名称。建议手动设置,便于运维任务时辨认,如 bytehouse_sensor_data_consumer。 |
自动重设 Offset | 指初次启动任务时,Kafka 最新生产的数据开始消费的 offset,第二次启动任务时,会从上次消费暂停的 offset 恢复。 |
格式 | 消息格式,目前最常用 JSONEachRow。 |
分隔符 | 输入消息分隔符,一般使用 '\n'。 |
消费者个数 | 消费者个数,每个消费者会创建一个线程。建议填写 2,后续消费性能不够可以继续改大(最大不超过核数/2)。 |
写入 Block Size | 写入的 block_size 大小。建议按默认的 65536,如果对实时性要求高,可以减半。 |
选择目标表 | |
目标数据库 | 下拉选择数据导入的目标 ByteHouse 数据库。 |
目标数据表 | 下拉选择数据导入的目标 ByteHouse 表。 |
目标 Schema 配置 | |
提取 Schema | 此处配置 Kafka 中的信息和 ByteHouse 表信息的映射。
使用数据映射方式时,注意覆盖方式选择“覆盖添加”。 |
导入公式示例
Array 数据提取:此示例中,"alertFlags" 字段里只会出现 overheat 和 humidityDrop 两个值,因此希望将数据提取到 overheat 和 humidityDrop 两个 Bool 列,如果数组中有 "overheat",则列 overheat = True。
因此按如下方法设置 overheat 列(humidityDrop列同理):
has(alertFlags,"overheat")JSON 多层嵌套处理:
例如,如果 "gpsLatitude" 嵌套在一个 mapFloat 中,要把它提取到 gpsLatitude 列:
"mapFloat": { "gpsLatitude": -20.510689, "gpsLongitude": 42.272274 }
需要按如下规则填写:
JSONExtract(_content, 'mapFload','gpsLatitude', 'Float64')(参考:JSON 函数)Timestamp 处理:
toTimezone(timestamp, <timezone>)处理;
toTimezone(timestamp, 'Asia/Shanghai')parseDateTimeBestEffortOrNull(timestamp)表达式。
parseDateTimeBestEffortOrNull(timestamp)特殊值处理:
assumeNotNull(x) ;nullif(x, 'N/A');提交任务,等待一段时间。也可以点击 导入任务名称 > 执行 ID > 日志,查看导入留下的日志,如果有数据写入成功或失败了,则会留下日志。例如:
select sensorId, timestamp from sensor.heat_sensor_data_unique order by desc limit 10;
执行上述命令,如果已经可以查到数据,则表明导入成功了。