在大数据流处理领域,Apache Flink 因其高吞吐、低延迟的特性被广泛应用于实时 ETL、事件驱动型应用等场景。在这些场景中,JSON 作为一种极其常见的数据交换格式,其解析效率直接决定了整个流处理任务的性能。
社区版本的 Flink 在每次调用 JSON_VALUE 等函数时,都会对传入的 JSON 字符串进行一次完整的解析、遍历和转换操作。当一条 SQL 语句中需要从同一个 JSON 字符串里提取多个值,或者数据吞吐量极高时,这种反复解析的操作会带来巨大的 CPU 开销,成为性能瓶颈。
为了优化这一痛点,流式计算 Flink 版为 SQL/Table API 中的 JSON 函数引入了值缓存复用机制。通过在 TaskManager 环境中启用 JSON_VALUE_CACHE_ENABLED 参数,可以显著提升 JSON_VALUE, JSON_QUERY ,JSON_EXISTS 等函数的解析性能。
此功能专门优化在 Flink SQL 作业中多次处理同一个 JSON 字符串的场景。它通过在 TaskManager 级别缓存已解析的 JSON 结果,使得对同一字符串的后续提取操作可以避免重复解析,从而大幅降低 CPU 使用率并提升吞吐量。
典型场景示例:
假设您的数据源(如 Kafka Topic)中的每条消息都包含一个复杂的 JSON 字符串 content 字段,其结构如下:
{ "user": { "id": 12345, "name": "Alice", "device": { "type": "smartphone", "os": "iOS", "version": "15.4" } }, "event": { "type": "click", "timestamp": 1678882461000, "page": "/home" } }
在 Flink SQL 中,您可能需要在一个 INSERT INTO 语句中,从这一个 content 字段里提取出多个不同的值,写入到下游表中。
未开启优化时的 SQL(性能低下):
INSERT INTO user_events SELECT JSON_VALUE(content, '$.user.id') AS user_id, -- 第一次完整解析 content JSON_VALUE(content, '$.user.name') AS user_name, -- 第二次完整解析 content JSON_VALUE(content, '$.user.device.os') AS os, -- 第三次完整解析 content JSON_VALUE(content, '$.event.type') AS event_type, -- 第四次完整解析 content JSON_VALUE(content, '$.event.timestamp') AS `timestamp` -- 第五次完整解析 content FROM source_table;
在上面的语句中,尽管所有字段都来自同一条消息的同一个 content 字段,但 JSON_VALUE 函数会被调用 5 次,每次都会对同一个 JSON 字符串进行完整的重新解析,造成了大量的重复计算。
开启优化后的效果:
当设置 containerized.taskmanager.env.JSON_VALUE_CACHE_ENABLED: true 后,Flink 会为当前处理的这条消息的 content 字符串对象创建解析缓存。
JSON_VALUE(content, '$.user.id'):Flink 会正常解析整个 JSON 字符串,提取出 user_id,并将解析结果缓存起来(与原始字符串对象关联)。content 字符串对象时,Flink 会直接从缓存中获取已经解析好的结果,然后仅仅执行提取(XPath查询)操作,从而完全避免了重复的解析开销。因此,此功能对于在 SQL 中频繁使用 JSON 函数从单一字段提取多个值的场景,优化效果极为显著。
重要说明:
- 适用范围:此优化仅对 Flink SQL/Table API 中的
JSON_VALUE,JSON_QUERY,JSON_EXISTS等内置 JSON 函数有效。- 缓存机制:缓存依赖于原始 JSON 字符串对象的身份(Identity),而非内容(Content)。这意味着它对于处理同一消息对象中的同一字段多次访问的场景效果最佳。如果您在SQL中使用了常量字符串或内容相同但对象不同的字符串,则可能无法命中缓存。
需要在任务的自定义参数中增加如下配置项,即可开启 JSON 解析复用的能力
containerized.taskmanager.env.JSON_VALUE_CACHE_ENABLED : true
配置效果如下: