CREATE TABLE table_identifier ( col_name1 col_type1, col_name2 col_type2 ... )
通过 SQL 语句建表,除了指定的字段以外,默认会添加名为 ds 的分区字段,类型是 string
注意
例子
-- user 表 CREATE TABLE raw_user ( user_id STRING, name STRING, age INT, country STRING); -- item 表 CREATE TABLE raw_item ( item_id BIGINT, name STRING); -- 行为表 CREATE TABLE raw_bhv ( user_id STRING, item_id BIGINT, ts BIGINT, action INT);
ALTER TABLE table_identifier ADD COLUMNS ( col_name1 col_type1, col_name2 col_type2 ... )
例子
ALTER TABLE processed_bhv ADD COLUMNS (extra_info STRING);
批式 SQL 为大小写不敏感的 SQL
支持的基础类型为 bigint, int, float, double, string,以及基础类型的 array。
批式 SQL 支持 绝大多数的 Hive语法。
注意
插入数据时需要确保字段顺序与目标表定义的字段顺序完全一致。
例子
-- 简单INSERT INTO INSERT INTO raw_user VALUES ('user:1', "user:byteair", 23, 'China', '20220101'); -- 指定分区的INSERT INTO INSERT INTO raw_user PARTITION (ds='20220101') VALUES ('user:1', "user:byteair", 23, 'China'); -- 指定分区的INSERT OVERWRITE INSERT OVERWRITE raw_user PARTITION (ds='20220101') VALUES ('user:1', "user:byteair", 23, 'China'); INSERT INTO raw_item PARTITION (ds='20220101') VALUES(4671934, "item:byteair"); -- 将query出来的数据写入目标表中 -- processed_bhv的schema如下 -- CREATE TABLE processed_bhv ( -- user_id STRING, -- item_id STRING, -- ts BIGINT, -- action INT, -- req_id STRING); INSERT OVERWRITE processed_bhv partition(ds='20220101') SELECT user_id, item_id, ts, action, concat(user_id, item_id) AS req_id FROM raw_bhv WHERE ds = '20220101';
例子
-- 计算2022年1月1日当天所有行为数据中,各个item_id在20岁以上的人群中被访问的用户数,以及最大的用户年龄 SELECT count(DISTINCT user_id) AS user_count, max(age) AS max_user_age, bhv_item_id FROM ( SELECT user_id AS raw_user_id, age AS raw_user_age FROM raw_user WHERE ds = '20220101' ) JOIN ( SELECT user_id AS bhv_user_id, item_id AS bhv_item_id FROM raw_bhv WHERE ds = '20220101' ) ON raw_user_id = bhv_user_id WHERE raw_user_age > 20 GROUP BY bhv_item_id
目前批式 SQL 支持 Spark 的大部分 build-in 函数,详见文档 Spark Built-In Functions。
公共 UDF 函数
由平台方提供的开箱即用 UDF,使用时无需声明,直接像 Spark 内置函数一样使用即可,目前主要包含以下几种。
函数名称 | 函数功能 | 语法 | 入参说明 |
---|---|---|---|
nlp_cut_words | 对一个string类型的字段使用CRF模型处理进行切词,根据中文语义切分成多个词组成的string数组 |
| 只能处理中文,多语言模型暂不支持 |
gauss_rank | 对输入数值进行 Gauss Rank 计算,用于数据的归一化、标准化 |
| 只支持数值型参数; |
array_log1p_int | 对输入数组进行 Log1p 计算 |
| 支持传入数值型数组参数 |
fnv_hash | 对输入字符串使用64-bit FNV-1a算法计算hash,最高位置0,返回long类型 |
| 字符串类型 |
例子
select nlp_cut_words(`doc_title`) as `doc_title_terms` from T; select gauss_rank(`goods_price`) as `goods_price_gauss_rank` from T; select array_log1p_int(array(1,2,3)); select fnv_hash("1");
流式 SQL 支持 Flink SQL 1.11 的标准语法和功能,但是在数据类型和部分SQL用法上有一定限制。
数据类型
典型的 Flink SQL 包含 Source 和 Sink 两部分的 DDL 语句,其中 Sink 部分的 DDL 仅支持以下基础类型 int, bigint, float, double, string 和对应的数组类型 array<int>
, array<bigint>
, array<float>
, array<double>
, array<string>
。
语法限制
不支持 DROP 操作。
保留关键字
与 Flink SQL 保持一致。
CREATE TABLE
Source 和 Sink 通过 CREATE TABLE DDL 来描述跟外部数据源的读取、写入配置信息。
其中 CREATE TABLE 语句的 with option 里必须显式配置 'connector' 和 'table-name' = 'xxx_table'
。
-- 一个典型的消费 kafka topic 的 Source DDL 定义示例 CREATE TABLE source_table_demo ( col_name1 int, col_name2 bigint ) WITH ( 'connector' = 'byte-mq', --必填字段 'table-name' = 'mq_item_source' -- 必填字段 );
键值类表需要显式指定主键字段,只用于sink 场景,使用 primary key (col_name) not enforced
来描述主键字段,如下所示。
-- 一个典型的写入 KV table 的 Sink DDL 定义示例 CREATE TABLE sink_table_demo ( col_name1 int, col_name2 bigint, `primary` `key (`col_name1`)` `not` `enforced` `-- KV 必须指定主键字段, 使用 not enforced 关键字` ) WITH ( 'connector' = 'byte-kv', 'table-name' = 'kv_item_table' );
添加新字段
目前只能通过表管理的界面进行新增字段操作。
CREATE VIEW
对于比较复杂的 SQL 计算逻辑,推荐使用 CREATE VIEW 来承载中间计算结果,便于复用数据,同时也让整个 SQL 语句结构清晰易懂。
CREATE VIEW view_name AS selectStatement;
Flink Built-in 函数
公共 UDF 函数
由平台方提供的开箱即用 UDF,使用时无需声明,直接像 Flink 内置函数一样使用即可,目前主要包含以下几种。
函数名称 | 函数功能 | 语法 | 入参说明 |
---|---|---|---|
json_str_to_array | 将一个 JSON ARRAY 字符串转换为一个 ARRAY 返回,解析失败则返回为 null 或抛异常 |
| content 需要解析的 JSON ARRAY 对象 |
json_str_to_map | 将一个 JSON 对象字符串转换为一个 MAP<VARCHAR, VARCHAR> 返回,注意只提取 Top-level 层;解析失败则返回 null 或抛异常 |
| 同上 |
例子
select json_str_to_array(json_array_str, TRUE) as str_array from T; json_str_to_array('["10","20","[30,40]"]', TRUE) -> ["10","20","[30,40]"] select json_str_to_map(json_object_str, TRUE) as map_result from T; json_str_to_map('{"a":"1","b":{"c":"2"},"d":["3","4"]}', TRUE) -> {"a":"1", "b":"{"c":"2"}","d":"["3","4"]"}
函数名称 | 函数功能 | 语法 | 入参说明 |
---|---|---|---|
split_str | 将一个 VARCHAR 字符串进行分割, 返回一个分割后的ARRAY |
| input 需要分割的对象(字符串) |
例子
SELECT split_str(input,'-',2) as result from T; split_str('cat.dog.fish', '\.') -> ['cat', 'dog', 'fish'] splitstr('a', '') -> ['a'] splitstr('a', '', -1) -> ['a', '']
函数名称 | 函数功能 | 语法 | 入参说明 |
---|---|---|---|
concat_ws_array | 将一个 ARRAY 进行连接, 返回一个连接后的字符串 |
| delimiter 连接符 |
concat_array | 将一个 ARRAY 进行连接, 返回一个连接后的字符串, 连接符为空字符串"" |
| 同上 |
例子
SELECT concat_ws_array('-',input) as result from source; concat_wa_array('-', ['today', 'is' ,'a', 'good', 'day']) -> 'today-is-a-good-day' SELECT concat_array(input) as result from source; concat_array(['today', 'is' ,'a', 'good', 'day']) -> 'todayisagoodday'
函数名称 | 函数功能 | 语法 | 入参说明 |
---|---|---|---|
string_array_contains | 注意数组中各元素以及目标元素均需为VARCHAR类型 |
| input 用来做判断的数组 |
int_array_contains | 注意数组中各元素以及目标元素均需为INT类型 |
| 同上 |
long_array_contains | 数组中各元素以及目标元素均需为LONG类型 |
| |
float_array_contains | 数组中各元素以及目标元素均需为FLOAT类型 |
| |
double_array_contains | 数组中各元素以及目标元素均需为DOUBLE类型 |
| |
array_log1p_int | 对输入数组进行 Log1p 计算 |
| 支持传入字符串数组参数,一般配合 json_str_to_array 使用 |
cast_array_data_type_double | 将输入数组元素转化为 Double |
| 支持传入字符串数组参数,一般配合 json_str_to_array 使用 |
cast_array_data_type_float | 将输入数组元素转化为 Float |
| 支持传入字符串数组参数,一般配合 json_str_to_array 使用 |
cast_array_data_type_long | 将输入数组元素转化为 Long |
| 支持传入字符串数组参数,一般配合 json_str_to_array 使用 |
cast_array_data_type_integer | 将输入数组元素转化为 Integer |
| 支持传入字符串数组参数,一般配合 json_str_to_array 使用 |
例子
SELECT string_array_contains(input, 'cat') as result FROM source; string_array_contains(['cat', 'dog', 'fish'], 'cat') -> true SELECT int_array_contains(input, 1) as result FROM source; int_array_contains(['1', '2', '3'], '1') -> true SELECT long_array_contains(input, 1L) as result FROM source; long_array_contains([1L, 2L, 3L], 1L) -> true SELECT float_array_contains(input, 1.23f) as result FROM source; float_array_contains([1.23f, 2.34f, 3.45f], 1.23f) -> true SELECT double_array_contains(input, 1.23) as result FROM source; double_array_contains([1.23, 2.34, 3.45], 1.23) -> true SELECT array_log1p_int(input) as result FROM source; array_log1p_int(ARRAY['1','2','3']) -> ARRAY[0,1,1] SELECT array_log1p_int(json_str_to_array(json_str)) as result FROM source; array_log1p_int(json_str_to_array('[1,2,3]')) -> ARRAY[0,1,1] SELECT cast_array_data_type_double(json_str_to_array(json_str)) as result FROM source; cast_array_data_type_double(json_str_to_array('[1,2,3]')) -> ARRAY[1.0,2.0,3.0] SELECT cast_array_data_type_long(json_str_to_array(json_str)) as result FROM source; cast_array_data_type_long(json_str_to_array('[1.1,2.1,3.1]')) -> ARRAY[1,2,3]
函数名称 | 函数功能 | 语法 | 入参说明 |
---|---|---|---|
nlp_cut_words | 对一个string类型的字段使用CRF模型处理进行切词,根据中文语义切分成多个词组成的string数组 |
| 只能处理中文,多语言模型暂不支持 |
例子
select nlp_cut_words(`doc_title`) as `doc_title_terms` from T;
name | meaning | required | default | consumer/producer | 备注 |
---|---|---|---|---|---|
connector | byte-mq | YES | consumer & producer | ||
table-name | 需要指定的 MQ 消息队列中间表名称 | YES | consumer & producer | ||
properties.group.id | consumer group id | YES | consumer | ||
format | 目前仅支持 json,必须填写 | YES | consumer & producer | ||
scan.manually-commit-offsets-interval | 是否自动commit offset,单位是时间类似'10 s' | NO | consumer | ||
scan.startup.mode | earliest-offset/latest-offset/group-offsets/specific-offsets/timestamp,详细解释 | NO | group-offsets | consumer | |
scan.partition-fields | source key-by的字段, 在数据发给下游之前, 会先对source做key-by | NO | 0 | consumer | |
scan.startup.timestamp-millis | 当指定scan.startup.mode为timestamp的时候,需要指定一个ms单位的时间戳,从该时间戳开始消费 | NO | 0 | consumer | |
scan.reset-to-earliest-for-new-partition | 该参数表示在任务启动时,如果用的是group-offsets配置,且没有开启checkpoint或者开启checkpoint但checkpoint里面没有数据, 对于那些还没有offset的partition如何处理 | NO | false | consumer | |
properties.{param} | {param} 是任意 kafka 参数, 如 properties.ignore.dc.check = true | NO | consumer & producer | ||
parallelism | 并发度 | NO | -(kafka source 建议是 partition 个数) | consumer & producer | |
sink.partition-fields | 按 key hash 的列名,可以是多列,用逗号分隔。如 'sink.partition-fields' = 'id,name',按 id 和 name 的 hash 写 kafka partition | NO | consumer | ||
sink.log-failures-only | 写入 kafka 失败时是否只打一条日志,task 不退出 | NO | false | producer | |
scan.source-sample-interval | 对 kafka topic 进行采样的采样间隔,单位为消息条数,值为0时不开启采样。 | NO | 0 | consumer | |
scan.source-sample-num | 对 kafka topic 进行采样的采样数,单位为消息条数,若未配置采样间隔,则此配置不生效。 | NO | consumer | ||
scan.metadata-fields-mapping | 字段与元数据类型的映射字符串,每一对映射格式为{metadata_type}={field_name},映射之间采用逗号分隔,支持的metadata_type为timestamp、partition和offset(注意大小写),它们对应的数据类型都为bigint。 | NO | consumer | ||
rate-limit-num | 限速配置,整个作业的配置,比如1000000,表示整个算子的总QPS不超过100w,注意,这个限速会在各个subtask之间平均分配额度,考虑到数据均衡和quota计算的整除,限制的速度一定不要限制的太死 | NO | |||
sink.in-flight-batch-size-factor | 写入kafka时,in-flight数据最大的batch size因子,为kafka client参数“properties.batch.size”的 n 倍。 | NO | 0(不开启) | producer | |
sink.in-flight-max-num | 写入kafka时,in-flight数据最大消息条数。当Flink侧未被ack的数据条数超过此大小时,将限制写入的速度。 | NO | 0(不开启) | producer | 与batch-size同时配置时,取最小值来生效。 |
例子:读取一个消息队列表,经过 ETL 处理后输出到另外一个消息队列表。
CREATE TABLE raw_item_source ( `item_id` BIGINT, `tags` STRING, `cate` STRING, `cate_cnt` INT, `video_duration` INT, `receive_timestamp` BIGINT, `status` INT, `proc_time` AS PROCTIME() ) WITH ( 'connector' = 'byte-mq', 'table-name' = 'mq_item_topic', 'properties.group.id' = 'item_mq_demo', 'format' = 'json', 'json.fail-on-missing-field' = 'false', 'json.ignore-parse-errors' = 'true', 'scan.manually-commit-offsets-interval' = '5000' ); CREATE TABLE temp_item_sink ( `item_id` BIGINT, `tags` STRING, `cate` STRING, `cate_cnt` INT, `video_duration` INT, `receive_timestamp` BIGINT, `status` INT ) WITH ( 'connector' = 'byte-mq', 'table-name' = 'mq_item_etl_topic', 'format' = 'json' ); CREATE VIEW `etl_view` AS SELECT item_id, tags, cate, cate_cnt, video_duration, receive_timestamp, status FROM raw_item_source WHERE status = 1 AND cate_cnt > 2; INSERT INTO temp_item_sink SELECT * FROM etl_view;
CREATE TABLE my_source ( `item_id` BIGINT, `video_type` STRING, `category` STRING, `item_name` STRING, `fake_id` STRING, `copyright_start` BIGINT, `copyright_end` BIGINT, `video_count` BIGINT, proc_time AS PROCTIME() ) WITH ( 'connector' = 'byte-mq', 'table-name' = 'mq_item', 'properties.group.id' = 'item_mq_test_demo', 'format' = 'json', 'json.fail-on-missing-field' = 'false', 'json.ignore-parse-errors' = 'true', 'scan.manually-commit-offsets-interval' = '5000' ); CREATE VIEW my_view AS SELECT `item_id`, concat(`video_type`, ' @ ', `category`) AS video_info, upper(`item_name`) AS item_name, rand_integer(10) AS intval, char_length(`fake_id`) AS cnt, (`copyright_end` - `copyright_start`) AS time_gap, sqrt(`video_count`) AS video_count FROM my_source; CREATE TABLE my_sink( `item_id` BIGINT, `video_info` STRING, `item_name` STRING, `intval` INT, `cnt` INT, `time_gap` BIGINT, `video_count` DOUBLE ) WITH ( 'connector' = 'byte-mq', 'table-name' = 'mq_demo_sink_topic', 'format' = 'json' ); INSERT INTO my_sink SELECT * FROM my_view WHERE time_gap > 0;
name | meaning | required | default |
---|---|---|---|
connector | byte-kv | YES | |
table-name | KV表名称 | YES | |
parallelism | 算子并发度,通常不需要设定 | NO | |
write_null | 该参数表示支持写入null到kv表,比如字段A第一次写入时是"abc",若第二次写入字段null,那么kv中原有的"abc"会更新成null。 | NO | false |
例子:读取一个消息队列表,经过 ETL 处理后,写入到键值类表。
CREATE TABLE raw_item_source ( `item_id` BIGINT, `tags` STRING, `cate` STRING, `cate_cnt` INT, `video_duration` INT, `receive_timestamp` BIGINT, `status` INT, `proc_time` AS PROCTIME() ) WITH ( 'connector' = 'byte-mq', 'table-name' = 'mq_item_topic', 'properties.group.id' = 'item_kv_demo_group', 'format' = 'json', 'json.fail-on-missing-field' = 'false', 'json.ignore-parse-errors' = 'true', 'scan.manually-commit-offsets-interval' = '5000' ); CREATE TABLE item_feature_table ( `item_id` BIGINT, `doc_cate1` STRING, `doc_cate2` STRING, `doc_cate3` STRING, `doc_tags` ARRAY<STRING>, `doc_cate_cnt` INT, `doc_video_duration_10` INT, `doc_receive_timestamp` BIGINT, primary key(item_id) NOT enforced ) WITH ('connector' = 'byte-kv', 'table-name' = 'item_kv_feature_table'); CREATE VIEW kv_view AS SELECT doc_id, split_index(cate, '#', 0) AS doc_cate1, split_index(cate, '#', 1) AS doc_cate2, split_index(cate, '#', 2) AS doc_cate3, ARRAY['游戏', 'switch', '塞尔达', '林达'] AS tags, cate_cnt AS doc_cate_cnt, CAST(log10(video_duration) AS INT) AS doc_video_duration_10, receive_timestamp AS doc_receive_timestamp FROM raw_item_source WHERE status = 1; INSERT INTO item_feature_table SELECT * FROM kv_view;