You need to enable JavaScript to run this app.
文档中心
流式计算 Flink版

流式计算 Flink版

复制全文
下载 pdf
TOS 对象存储
Flink 订阅 TOS 对象存储事件方案
复制全文
下载 pdf
Flink 订阅 TOS 对象存储事件方案

本文档描述如何通过火山引擎 TOS(对象存储)的事件通知功能,将对象创建、更新等事件推送至 Kafka,再通过 Flink SQL 实时消费 Kafka 中的 TOS 事件消息,完成事件解析与处理,并将结果写入下游 Kafka Topic,实现端到端的 TOS 事件驱动数据管道。这个技术方案对于处理多模态数据、IoT 数据等非常有用。

1 整体数据链路

Image

2 准备工作

2.1 开启 TOS 事件通知并推送至 Kafka

  1. 登录火山引擎控制台,进入对象存储 TOS 服务
  2. 选择目标存储桶,进入 基础设置 > 事件通知
  3. 点击 创建事件通知规则,配置以下信息:
    • 规则名称:自定义,例如 tos-object-events
    • 事件类型:勾选需要监听的事件,如 ObjectCreated:PutObjectCreated:PostObjectCreated:CompleteMultipartUpload 等。
    • 前缀/后缀过滤(可选):按需填写对象 Key 前缀或后缀过滤条件。
    • 推送目标:选择 Kafka
    • Kafka 实例:选择已有的火山引擎 Kafka 实例。
    • Topic:指定接收事件消息的 Kafka Topic,例如 tos-event-source
  4. 点击 确认 保存规则。

参考文档:设置事件通知推送至 Kafka

2.2 确认 Kafka 集群信息

确保以下信息已就绪:

配置项

说明

示例值

Kafka Bootstrap Servers

Kafka 集群连接地址

kafka-host1:9092,kafka-host2:9092

源 Topic

TOS 事件通知推送的 Topic

tos-event-source

目标 Topic

Flink 处理后写入的 Topic

tos-event-sink

Consumer Group

Flink 消费组 ID

flink-tos-event-consumer

  • 确保已开通火山引擎 Flink 服务(流式计算 Flink 版)。
  • Flink 版本需支持 Kafka Connector(火山 Flink 默认内置)。
  • 确保 Flink 集群网络可访问 Kafka 集群。

3 TOS 事件消息格式

TOS 推送到 Kafka 的事件消息为 JSON 格式,结构示例如下:

{
  "events": [
    {
      "eventName": "tos:ObjectCreated:Put",
      "eventSource": "tos",
      "eventTime": "2026-05-01T16:27:24Z",
      "eventVersion": "1.0",
      "tos": {
        "bucket": {
          "trn": "trn:tos:::bucket-name",
          "name": "bucket-name",
          "ownerIdentify": "2********7"
        },
        "object": {
          "eTag": "\"0b6b2168c4c9a67d92c5eb7012c5c6c6\"",
          "key": "paimon_las/paimon_db.db/kafka_dump/snapshot/.snapshot-10392.tmp",
          "size": 555
        },
        "tosSchemaVersion": "1.0",
        "ruleId": "events",
        "region": "cn-guangzhou",
        "requestParameters": {
          "sourceIPAddress": "127.0.0.1:41690"
        },
        "responseElements": {
          "requestId": "4c0a07f4dxxxxxxxxxx-b59ea2c"
        },
        "userIdentity": {
          "principalId": "trn:iam::2********7:root"
        }
      }
    }
  ]
}

核心字段说明:

字段

说明

eventName

事件类型,如 tos:ObjectCreated:Put

eventTime

事件发生时间(UTC)

tos.bucket.name

存储桶名称

tos.object.key

对象 Key(路径)

tos.object.size

对象大小(字节)

tos.region

存储桶所在地域

tos.ruleId

触发的事件通知规则 ID

4 操作步骤

本文提供常用的两种实现方案:RAW + JSON_VALUEJSON Format + ROW。两种方案在 Schema 适配、类型约束、查询易用性、迭代成本及查询性能上存在明显差异。下面从多维度进行横向对比,并给出落地选型建议,方便根据业务场景选择最优解析方案。

4.1 方案对比与选型建议

维度

方案一:RAW + JSON_VALUE

方案二:JSON Format + ROW

Schema 灵活性

高,按需取字段,无需预定义完整结构

低,需预定义完整嵌套 Schema

类型安全

弱,依赖运行时 CAST 转换

强,建表即约束类型

查询可读性

JsonPath 表达式较长

点号访问,直观简洁

新增字段成本

仅需修改查询 SQL

需 ALTER TABLE 或重建源表

反序列化性能

每行多次 JSON 解析(字段越多开销越大)

一次完整反序列化,字段访问零开销

适用场景

事件字段不固定、快速迭代、仅关注少量字段

Schema 稳定、字段使用率高、对性能敏感

选型建议:

  • 如果 TOS 事件类型多样,或只需提取少量字段做过滤/路由,优先采用方案一
  • 如果 Schema 已稳定,需提取大部分字段且对吞吐敏感,优先采用方案二。

4.2 方案一:RAW + JSON_QUERY + UNNEST(推荐)

核心思路:以 RAW 格式将整条 Kafka 消息作为字符串读入,使用 JSON_VALUE 通过 JsonPath 下标直接访问 events[0] 中的各字段。TOS 事件通知每条 Kafka 消息的 events 数组通常只包含 1 个元素,因此直接用下标访问即可。

源表定义

-- 源表:以 RAW 格式从 Kafka 读取原始 JSON 字符串
CREATE TABLE tos_event_source (
    raw_message STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'tos-event-source',
    'properties.bootstrap.servers' = 'kafka-host1:9092,kafka-host2:9092',
    'properties.group.id' = 'flink-tos-event-consumer',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'raw'
);

目标表定义

-- 目标表:将解析后的事件写入下游 Kafka
CREATE TABLE tos_event_sink (
    event_name STRING,
    event_time STRING,
    bucket_name STRING,
    object_key STRING,
    object_size BIGINT,
    object_etag STRING,
    region STRING,
    rule_id STRING,
    request_id STRING,
    source_ip STRING,
    principal_id STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'tos-event-sink',
    'properties.bootstrap.servers' = 'kafka-host1:9092,kafka-host2:9092',
    'format' = 'json'
);

解析与写入

-- 直接通过 JsonPath 下标 events[0] 访问,兼容火山 Flink 1.17
INSERT INTO tos_event_sink
SELECT
    JSON_VALUE(raw_message, '$.events[0].eventName')             AS event_name,
    JSON_VALUE(raw_message, '$.events[0].eventTime')             AS event_time,
    JSON_VALUE(raw_message, '$.events[0].tos.bucket.name')       AS bucket_name,
    JSON_VALUE(raw_message, '$.events[0].tos.object.key')        AS object_key,
    CAST(JSON_VALUE(raw_message, '$.events[0].tos.object.size') AS BIGINT) AS object_size,
    JSON_VALUE(raw_message, '$.events[0].tos.object.eTag')       AS object_etag,
    JSON_VALUE(raw_message, '$.events[0].tos.region')            AS region,
    JSON_VALUE(raw_message, '$.events[0].tos.ruleId')            AS rule_id,
    JSON_VALUE(raw_message, '$.events[0].tos.responseElements.requestId') AS request_id,
    JSON_VALUE(raw_message, '$.events[0].tos.requestParameters.sourceIPAddress') AS source_ip,
    JSON_VALUE(raw_message, '$.events[0].tos.userIdentity.principalId') AS principal_id
FROM tos_event_source;

说明

TOS 事件通知每条 Kafka 消息的 events 数组通常只有 1 个元素,直接用 $.events[0] 下标访问即可。

另外,如果采用此种方案,建议开启 JSON 加速能力,可以参考 JSON 函数性能优化

4.3 方案二:JSON Format + ROW 嵌套类型声明

核心思路:在源表定义时直接用 ARRAY<ROW<...>> 声明完整嵌套 Schema,Flink 自动将 JSON 反序列化为结构化类型,查询时通过点号访问嵌套字段。

源表定义(带完整嵌套 Schema)

-- 源表:声明完整嵌套 Schema,Flink 自动反序列化
CREATE TABLE tos_event_typed (
    events ARRAY<ROW<
        eventName STRING,
        eventTime STRING,
        eventSource STRING,
        tos ROW<
            bucket ROW<name STRING, trn STRING, ownerIdentify STRING>,
            object ROW<`key` STRING, size BIGINT, eTag STRING>,
            region STRING,
            ruleId STRING,
            tosSchemaVersion STRING,
            requestParameters ROW<sourceIPAddress STRING>,
            responseElements ROW<requestId STRING>,
            userIdentity ROW<principalId STRING>
        >
    >>
) WITH (
    'connector' = 'kafka',
    'topic' = 'tos-event-source',
    'properties.bootstrap.servers' = 'kafka-host1:9092,kafka-host2:9092',
    'properties.group.id' = 'flink-tos-event-consumer',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
);

目标表定义(同方案一)

CREATE TABLE tos_event_sink (
    event_name STRING,
    event_time STRING,
    bucket_name STRING,
    object_key STRING,
    object_size BIGINT,
    object_etag STRING,
    region STRING,
    rule_id STRING,
    request_id STRING,
    source_ip STRING,
    principal_id STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'tos-event-sink',
    'properties.bootstrap.servers' = 'kafka-host1:9092,kafka-host2:9092',
    'format' = 'json'
);

解析与写入

INSERT INTO tos_event_sink
SELECT
    t.eventName                              AS event_name,
    t.eventTime                              AS event_time,
    t.tos.bucket.name                        AS bucket_name,
    t.tos.object.`key`                       AS object_key,
    t.tos.object.size                        AS object_size,
    t.tos.object.eTag                        AS object_etag,
    t.tos.region                             AS region,
    t.tos.ruleId                             AS rule_id,
    t.tos.responseElements.requestId         AS request_id,
    t.tos.requestParameters.sourceIPAddress  AS source_ip,
    t.tos.userIdentity.principalId           AS principal_id
FROM tos_event_typed
CROSS JOIN UNNEST(events) AS t(eventName, eventTime, eventSource, tos);

注意

UNNEST 展开 ARRAY<ROW<...>> 时,alias 列表数量必须与 ROW 的顶层字段数一致(此处为 4 个:eventName, eventTime, eventSource, tos),不能只写一个别名。查询中通过 t.字段名 访问各列。

4.4 完整端到端示例(方案一合并脚本)

将方案一整合为一个可直接提交的 Flink SQL 作业脚本:

-- ============================================================
-- TOS 事件订阅 Flink SQL 端到端示例(方案一:RAW + JSON_VALUE)
-- 链路:TOS 事件 -> Kafka (source) -> Flink SQL -> Kafka (sink)
-- ============================================================

-- Step 1: 定义源表
CREATE TABLE tos_event_source (
    raw_message STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'tos-event-source',
    'properties.bootstrap.servers' = 'kafka-host1:9092,kafka-host2:9092',
    'properties.group.id' = 'flink-tos-event-consumer',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'raw'
);

-- Step 2: 定义目标表
CREATE TABLE tos_event_sink (
    event_name STRING,
    event_time STRING,
    bucket_name STRING,
    object_key STRING,
    object_size BIGINT,
    object_etag STRING,
    region STRING,
    rule_id STRING,
    request_id STRING,
    source_ip STRING,
    principal_id STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'tos-event-sink',
    'properties.bootstrap.servers' = 'kafka-host1:9092,kafka-host2:9092',
    'format' = 'json'
);

-- Step 3: 解析事件并写入
INSERT INTO tos_event_sink
SELECT
    JSON_VALUE(raw_message, '$.events[0].eventName')             AS event_name,
    JSON_VALUE(raw_message, '$.events[0].eventTime')             AS event_time,
    JSON_VALUE(raw_message, '$.events[0].tos.bucket.name')       AS bucket_name,
    JSON_VALUE(raw_message, '$.events[0].tos.object.key')        AS object_key,
    CAST(JSON_VALUE(raw_message, '$.events[0].tos.object.size') AS BIGINT) AS object_size,
    JSON_VALUE(raw_message, '$.events[0].tos.object.eTag')       AS object_etag,
    JSON_VALUE(raw_message, '$.events[0].tos.region')            AS region,
    JSON_VALUE(raw_message, '$.events[0].tos.ruleId')            AS rule_id,
    JSON_VALUE(raw_message, '$.events[0].tos.responseElements.requestId') AS request_id,
    JSON_VALUE(raw_message, '$.events[0].tos.requestParameters.sourceIPAddress') AS source_ip,
    JSON_VALUE(raw_message, '$.events[0].tos.userIdentity.principalId') AS principal_id
FROM tos_event_source;

4.5 注意事项

  1. JSON 解析兼容性JSON_VALUE 是 Flink 1.15+ 内置函数,火山 Flink 1.17 已支持。

  2. UNNEST 展开数组:方案二中 UNNEST(events) 展开 ROW 类型时,alias 列表数量必须与 ROW 顶层字段数一致(如 4 个字段则写 4 个 alias)。方案一无需 UNNEST。

  3. Kafka 认证:如 Kafka 开启了 SASL 认证,需在 WITH 参数中补充 'properties.security.protocol''properties.sasl.mechanism''properties.sasl.jaas.config' 等配置项。

  4. 事件过滤:如需仅处理特定事件类型(如 ObjectCreated),可在 INSERT 语句中添加 WHERE 条件:

    -- 方案一过滤写法
    WHERE JSON_VALUE(raw_message, '$.events[0].eventName') LIKE 'tos:ObjectCreated:%'
    -- 方案二过滤写法
    WHERE t.eventName LIKE 'tos:ObjectCreated:%'
    
  5. 时间语义:如需基于事件时间进行窗口聚合,可将 event_time 转为 TIMESTAMP 类型并定义 Watermark。

  6. ROW 类型中的保留字:方案二中 object.keykey 是 SQL 保留字,需用反引号转义。

最近更新时间:2026.05.08 20:15:39
这个页面对您有帮助吗?
有用
有用
无用
无用