本文档描述如何通过火山引擎 TOS(对象存储)的事件通知功能,将对象创建、更新等事件推送至 Kafka,再通过 Flink SQL 实时消费 Kafka 中的 TOS 事件消息,完成事件解析与处理,并将结果写入下游 Kafka Topic,实现端到端的 TOS 事件驱动数据管道。这个技术方案对于处理多模态数据、IoT 数据等非常有用。
tos-object-eventsObjectCreated:Put、ObjectCreated:Post、ObjectCreated:CompleteMultipartUpload 等。tos-event-source。参考文档:设置事件通知推送至 Kafka
确保以下信息已就绪:
配置项 | 说明 | 示例值 |
|---|---|---|
Kafka Bootstrap Servers | Kafka 集群连接地址 |
|
源 Topic | TOS 事件通知推送的 Topic |
|
目标 Topic | Flink 处理后写入的 Topic |
|
Consumer Group | Flink 消费组 ID |
|
TOS 推送到 Kafka 的事件消息为 JSON 格式,结构示例如下:
{ "events": [ { "eventName": "tos:ObjectCreated:Put", "eventSource": "tos", "eventTime": "2026-05-01T16:27:24Z", "eventVersion": "1.0", "tos": { "bucket": { "trn": "trn:tos:::bucket-name", "name": "bucket-name", "ownerIdentify": "2********7" }, "object": { "eTag": "\"0b6b2168c4c9a67d92c5eb7012c5c6c6\"", "key": "paimon_las/paimon_db.db/kafka_dump/snapshot/.snapshot-10392.tmp", "size": 555 }, "tosSchemaVersion": "1.0", "ruleId": "events", "region": "cn-guangzhou", "requestParameters": { "sourceIPAddress": "127.0.0.1:41690" }, "responseElements": { "requestId": "4c0a07f4dxxxxxxxxxx-b59ea2c" }, "userIdentity": { "principalId": "trn:iam::2********7:root" } } } ] }
核心字段说明:
字段 | 说明 |
|---|---|
| 事件类型,如 |
| 事件发生时间(UTC) |
| 存储桶名称 |
| 对象 Key(路径) |
| 对象大小(字节) |
| 存储桶所在地域 |
| 触发的事件通知规则 ID |
本文提供常用的两种实现方案:RAW + JSON_VALUE 与 JSON Format + ROW。两种方案在 Schema 适配、类型约束、查询易用性、迭代成本及查询性能上存在明显差异。下面从多维度进行横向对比,并给出落地选型建议,方便根据业务场景选择最优解析方案。
维度 | 方案一:RAW + JSON_VALUE | 方案二:JSON Format + ROW |
|---|---|---|
Schema 灵活性 | 高,按需取字段,无需预定义完整结构 | 低,需预定义完整嵌套 Schema |
类型安全 | 弱,依赖运行时 CAST 转换 | 强,建表即约束类型 |
查询可读性 | JsonPath 表达式较长 | 点号访问,直观简洁 |
新增字段成本 | 仅需修改查询 SQL | 需 ALTER TABLE 或重建源表 |
反序列化性能 | 每行多次 JSON 解析(字段越多开销越大) | 一次完整反序列化,字段访问零开销 |
适用场景 | 事件字段不固定、快速迭代、仅关注少量字段 | Schema 稳定、字段使用率高、对性能敏感 |
选型建议:
核心思路:以 RAW 格式将整条 Kafka 消息作为字符串读入,使用 JSON_VALUE 通过 JsonPath 下标直接访问 events[0] 中的各字段。TOS 事件通知每条 Kafka 消息的 events 数组通常只包含 1 个元素,因此直接用下标访问即可。
-- 源表:以 RAW 格式从 Kafka 读取原始 JSON 字符串 CREATE TABLE tos_event_source ( raw_message STRING ) WITH ( 'connector' = 'kafka', 'topic' = 'tos-event-source', 'properties.bootstrap.servers' = 'kafka-host1:9092,kafka-host2:9092', 'properties.group.id' = 'flink-tos-event-consumer', 'scan.startup.mode' = 'latest-offset', 'format' = 'raw' );
-- 目标表:将解析后的事件写入下游 Kafka CREATE TABLE tos_event_sink ( event_name STRING, event_time STRING, bucket_name STRING, object_key STRING, object_size BIGINT, object_etag STRING, region STRING, rule_id STRING, request_id STRING, source_ip STRING, principal_id STRING ) WITH ( 'connector' = 'kafka', 'topic' = 'tos-event-sink', 'properties.bootstrap.servers' = 'kafka-host1:9092,kafka-host2:9092', 'format' = 'json' );
-- 直接通过 JsonPath 下标 events[0] 访问,兼容火山 Flink 1.17 INSERT INTO tos_event_sink SELECT JSON_VALUE(raw_message, '$.events[0].eventName') AS event_name, JSON_VALUE(raw_message, '$.events[0].eventTime') AS event_time, JSON_VALUE(raw_message, '$.events[0].tos.bucket.name') AS bucket_name, JSON_VALUE(raw_message, '$.events[0].tos.object.key') AS object_key, CAST(JSON_VALUE(raw_message, '$.events[0].tos.object.size') AS BIGINT) AS object_size, JSON_VALUE(raw_message, '$.events[0].tos.object.eTag') AS object_etag, JSON_VALUE(raw_message, '$.events[0].tos.region') AS region, JSON_VALUE(raw_message, '$.events[0].tos.ruleId') AS rule_id, JSON_VALUE(raw_message, '$.events[0].tos.responseElements.requestId') AS request_id, JSON_VALUE(raw_message, '$.events[0].tos.requestParameters.sourceIPAddress') AS source_ip, JSON_VALUE(raw_message, '$.events[0].tos.userIdentity.principalId') AS principal_id FROM tos_event_source;
说明
TOS 事件通知每条 Kafka 消息的 events 数组通常只有 1 个元素,直接用 $.events[0] 下标访问即可。
另外,如果采用此种方案,建议开启 JSON 加速能力,可以参考 JSON 函数性能优化。
核心思路:在源表定义时直接用 ARRAY<ROW<...>> 声明完整嵌套 Schema,Flink 自动将 JSON 反序列化为结构化类型,查询时通过点号访问嵌套字段。
-- 源表:声明完整嵌套 Schema,Flink 自动反序列化 CREATE TABLE tos_event_typed ( events ARRAY<ROW< eventName STRING, eventTime STRING, eventSource STRING, tos ROW< bucket ROW<name STRING, trn STRING, ownerIdentify STRING>, object ROW<`key` STRING, size BIGINT, eTag STRING>, region STRING, ruleId STRING, tosSchemaVersion STRING, requestParameters ROW<sourceIPAddress STRING>, responseElements ROW<requestId STRING>, userIdentity ROW<principalId STRING> > >> ) WITH ( 'connector' = 'kafka', 'topic' = 'tos-event-source', 'properties.bootstrap.servers' = 'kafka-host1:9092,kafka-host2:9092', 'properties.group.id' = 'flink-tos-event-consumer', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' );
CREATE TABLE tos_event_sink ( event_name STRING, event_time STRING, bucket_name STRING, object_key STRING, object_size BIGINT, object_etag STRING, region STRING, rule_id STRING, request_id STRING, source_ip STRING, principal_id STRING ) WITH ( 'connector' = 'kafka', 'topic' = 'tos-event-sink', 'properties.bootstrap.servers' = 'kafka-host1:9092,kafka-host2:9092', 'format' = 'json' );
INSERT INTO tos_event_sink SELECT t.eventName AS event_name, t.eventTime AS event_time, t.tos.bucket.name AS bucket_name, t.tos.object.`key` AS object_key, t.tos.object.size AS object_size, t.tos.object.eTag AS object_etag, t.tos.region AS region, t.tos.ruleId AS rule_id, t.tos.responseElements.requestId AS request_id, t.tos.requestParameters.sourceIPAddress AS source_ip, t.tos.userIdentity.principalId AS principal_id FROM tos_event_typed CROSS JOIN UNNEST(events) AS t(eventName, eventTime, eventSource, tos);
注意
UNNEST 展开 ARRAY<ROW<...>> 时,alias 列表数量必须与 ROW 的顶层字段数一致(此处为 4 个:eventName, eventTime, eventSource, tos),不能只写一个别名。查询中通过 t.字段名 访问各列。
将方案一整合为一个可直接提交的 Flink SQL 作业脚本:
-- ============================================================ -- TOS 事件订阅 Flink SQL 端到端示例(方案一:RAW + JSON_VALUE) -- 链路:TOS 事件 -> Kafka (source) -> Flink SQL -> Kafka (sink) -- ============================================================ -- Step 1: 定义源表 CREATE TABLE tos_event_source ( raw_message STRING ) WITH ( 'connector' = 'kafka', 'topic' = 'tos-event-source', 'properties.bootstrap.servers' = 'kafka-host1:9092,kafka-host2:9092', 'properties.group.id' = 'flink-tos-event-consumer', 'scan.startup.mode' = 'latest-offset', 'format' = 'raw' ); -- Step 2: 定义目标表 CREATE TABLE tos_event_sink ( event_name STRING, event_time STRING, bucket_name STRING, object_key STRING, object_size BIGINT, object_etag STRING, region STRING, rule_id STRING, request_id STRING, source_ip STRING, principal_id STRING ) WITH ( 'connector' = 'kafka', 'topic' = 'tos-event-sink', 'properties.bootstrap.servers' = 'kafka-host1:9092,kafka-host2:9092', 'format' = 'json' ); -- Step 3: 解析事件并写入 INSERT INTO tos_event_sink SELECT JSON_VALUE(raw_message, '$.events[0].eventName') AS event_name, JSON_VALUE(raw_message, '$.events[0].eventTime') AS event_time, JSON_VALUE(raw_message, '$.events[0].tos.bucket.name') AS bucket_name, JSON_VALUE(raw_message, '$.events[0].tos.object.key') AS object_key, CAST(JSON_VALUE(raw_message, '$.events[0].tos.object.size') AS BIGINT) AS object_size, JSON_VALUE(raw_message, '$.events[0].tos.object.eTag') AS object_etag, JSON_VALUE(raw_message, '$.events[0].tos.region') AS region, JSON_VALUE(raw_message, '$.events[0].tos.ruleId') AS rule_id, JSON_VALUE(raw_message, '$.events[0].tos.responseElements.requestId') AS request_id, JSON_VALUE(raw_message, '$.events[0].tos.requestParameters.sourceIPAddress') AS source_ip, JSON_VALUE(raw_message, '$.events[0].tos.userIdentity.principalId') AS principal_id FROM tos_event_source;
JSON 解析兼容性:JSON_VALUE 是 Flink 1.15+ 内置函数,火山 Flink 1.17 已支持。
UNNEST 展开数组:方案二中 UNNEST(events) 展开 ROW 类型时,alias 列表数量必须与 ROW 顶层字段数一致(如 4 个字段则写 4 个 alias)。方案一无需 UNNEST。
Kafka 认证:如 Kafka 开启了 SASL 认证,需在 WITH 参数中补充 'properties.security.protocol'、'properties.sasl.mechanism'、'properties.sasl.jaas.config' 等配置项。
事件过滤:如需仅处理特定事件类型(如 ObjectCreated),可在 INSERT 语句中添加 WHERE 条件:
-- 方案一过滤写法 WHERE JSON_VALUE(raw_message, '$.events[0].eventName') LIKE 'tos:ObjectCreated:%' -- 方案二过滤写法 WHERE t.eventName LIKE 'tos:ObjectCreated:%'
时间语义:如需基于事件时间进行窗口聚合,可将 event_time 转为 TIMESTAMP 类型并定义 Watermark。
ROW 类型中的保留字:方案二中 object.key 的 key 是 SQL 保留字,需用反引号转义。