You need to enable JavaScript to run this app.
流式计算 Flink版

流式计算 Flink版

复制全文
Formats 参考
JSON
复制全文
JSON

JSON Format 能读写 JSON 格式的数据。当前,JSON schema 是从 table schema 中自动推导而得的。

如何创建一张基于 JSON Format 的表

以下是一个利用 Kafka 以及 JSON Format 构建表的例子。

CREATE TABLE user_behavior (
  user_id BIGINT,
  item_id BIGINT,
  category_id BIGINT,
  behavior STRING,
  ts TIMESTAMP(3)
) WITH (
 'connector' = 'kafka',
 'topic' = 'user_behavior',
 'properties.bootstrap.servers' = 'localhost:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'json',
 'json.fail-on-missing-field' = 'false',
 'json.ignore-parse-errors' = 'true'
)

Format 参数

参数

是否必须

默认值

类型

描述

format

必选

(none)

String

声明使用的格式,这里应为'json'

json.fail-on-missing-field

可选

false

Boolean

当解析字段缺失时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。

json.ignore-parse-errors

可选

false

Boolean

当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null

json.timestamp-format.standard

可选

'SQL'

String

声明输入和输出的 TIMESTAMPTIMESTAMP_LTZ 的格式。当前支持的格式为'SQL' 以及 'ISO-8601'

  • 可选参数 'SQL' 将会以 "yyyy-MM-dd HH:mm:ss.s{precision}" 的格式解析 TIMESTAMP, 例如 "2020-12-30 12:13:14.123",以 "yyyy-MM-dd HH:mm:ss.s{precision}'Z'" 的格式解析 TIMESTAMP_LTZ, 例如 "2020-12-30 12:13:14.123Z" 且会以相同的格式输出。
  • 可选参数 'ISO-8601' 将会以 "yyyy-MM-ddTHH:mm:ss.s{precision}" 的格式解析输入 TIMESTAMP, 例如 "2020-12-30T12:13:14.123" ,以 "yyyy-MM-ddTHH:mm:ss.s{precision}'Z'" 的格式解析 TIMESTAMP_LTZ, 例如 "2020-12-30T12:13:14.123Z" 且会以相同的格式输出。

json.map-null-key.mode

注意:仅适用于 Flink1.16 版本

选填

'FAIL'

String

指定处理 Map 中 key 值为空的方法。。当前支持的值有 'FAIL', 'DROP''LITERAL':

  • Option 'FAIL' 将抛出异常,如果遇到 Map 中 key 值为空的数据。
  • Option 'DROP' 将丢弃 Map 中 key 值为空的数据项。
  • Option 'LITERAL' 将使用字符串常量来替换 Map 中的空 key 值。字符串常量的值由 'json.map-null-key.literal' 定义。

json.map-null-key.literal

注意:仅适用于 Flink1.16 版本

选填

'null'

String

'json.map-null-key.mode' 是 LITERAL 的时候,指定字符串常量替换 Map 中的空 key 值。

json.encode.decimal-as-plain-number

注意:仅适用于 Flink1.16 版本

选填

false

Boolean

将所有 DECIMAL 类型的数据保持原状,不使用科学计数法表示。例:0.000000027 默认会表示为 2.7E-8。当此选项设为 true 时,则会表示为 0.000000027

数据类型映射关系

当前,JSON schema 将会自动从 table schema 之中自动推导得到。不支持显式地定义 JSON schema。
在 Flink 中,JSON Format 使用 jackson databind API 去解析和生成 JSON。
下表列出了 Flink 中的数据类型与 JSON 中的数据类型的映射关系。

Flink SQL 数据类型

JSON 数据类型

CHAR / VARCHAR / STRING

string

BOOLEAN

boolean

BINARY / VARBINARY

string with encoding: base64

DECIMAL

number

TINYINT

number

SMALLINT

number

INT

number

BIGINT

number

FLOAT

number

DOUBLE

number

DATE

string with format: date

TIME

string with format: time

TIMESTAMP

string with format: date-time

TIMESTAMP_WITH_LOCAL_TIME_ZONE

string with format: date-time (with UTC time zone)

INTERVAL

number

ARRAY

array

MAP / MULTISET

object

ROW

object

注意事项

  • JSON 类型会依据用户在 create table 中声明的列名(column name)和列类型(column type)进行解析。当前的 SQL 任务仅实现了部分的自动适配。例如,声明为 int 类型,但实际在 JSON 中是 int 的字符串表示形式,SQL 能够自动识别并转换回 int 类型。然而,如果声明为 int 但实际为 long 类型,或者声明为 int 但实际是带有字母的字符串等情况,SQL 无法直接转换,会报错。
  • JSON 类型支持嵌套。例如,{"a": "a", "b": {"c": 1, "d": "s"}} ,在声明嵌套结构时,需要声明为 b Row<c int, d varchar> ,这样就表明字段 cd 是嵌套于字段 b 中的。JSON 也支持数组形式。
    • 如果是简单的数组,例如 "a": [1,2,3,4] ,则可以声明为 a Array ,这意味着 a 是一个 int 类型的数组。需要注意的是,在进行 select 操作时,下标是从 1 开始的。例如 "a": [6,7,8,9] ,执行 select a[1] 会返回 6 。
    • 如果是嵌套 JSON 的数组,例如 "a": [{"b": 3}, {"b": 6}] ,则可以声明为 a Array<Row> ,这表明 a 是一个对象类型的数组,a 的结构中,有一个类型为 intb 列。获取数据的方式如 select a[1].b 会返回 3 。

常见问题

  • 反序列化时出现报错 Invalid UTF-8 start byte

我们内部的实现使用的是 Jackson,在反序列化方面仅支持 UTF-8、UTF-16 以及 UTF-32 这几种编码方式,若使用其他编码方式就会报上述错误。同时,打出的异常消息会存在乱码(以 UTF-8 方式转换成字符串打出)。
如果能够忽略此类消息,可以配置 json.ignore-parse-errors来忽略掉这类消息;如果必须要解析此类消息,可以指定format格式为 bytes,自行编写 UDF(用户自定义函数)进行解析。

最近更新时间:2024.08.20 17:38:32
这个页面对您有帮助吗?
有用
有用
无用
无用