You need to enable JavaScript to run this app.
导航
SQL 语法说明
最近更新时间:2024.03.26 17:11:56首次发布时间:2022.10.09 14:06:09
我的收藏
有用
有用
无用
无用

新建表-SQL配置

DDL 建表

CREATE TABLE table_identifier
    ( col_name1 col_type1, col_name2 col_type2 ... )

通过 SQL 语句建表,除了指定的字段以外,默认会添加名为 ds 的分区字段,类型是 string

注意

  1. 新建表的字段中至少包含 user 维度和 item 维度的主键中的一个
  2. user 维度主键字段名为 user_id,其类型必须为 string
  3. item 维度的主键字段类型类型必须为 bigint,item 维度的主键字段名可以是 item_id, goods_id 和 doc_id 中的一个。

例子

-- user 表
CREATE TABLE raw_user (
    user_id STRING, 
    name STRING, 
    age INT, 
    country STRING);

-- item 表
CREATE TABLE raw_item (
    item_id BIGINT, 
    name STRING);

-- 行为表
CREATE TABLE raw_bhv (
    user_id STRING, 
    item_id BIGINT, 
    ts BIGINT, 
    action INT);

添加字段

ALTER TABLE table_identifier ADD COLUMNS 
    ( col_name1 col_type1, col_name2 col_type2 ... )

例子

ALTER TABLE processed_bhv ADD COLUMNS
    (extra_info STRING);

新建任务-批式SQL

支持语法

批式 SQL 为大小写不敏感的 SQL
支持的基础类型为 bigint, int, float, double, string,以及基础类型的 array。

DML

批式 SQL 支持 绝大多数的 Hive语法。

INSERT

注意

插入数据时需要确保字段顺序与目标表定义的字段顺序完全一致。

例子

-- 简单INSERT INTO
INSERT INTO raw_user VALUES ('user:1', "user:byteair", 23, 'China', '20220101');

-- 指定分区的INSERT INTO
INSERT INTO raw_user PARTITION (ds='20220101') 
    VALUES ('user:1', "user:byteair", 23, 'China');

-- 指定分区的INSERT OVERWRITE
INSERT OVERWRITE raw_user PARTITION (ds='20220101') 
    VALUES ('user:1', "user:byteair", 23, 'China');
    
INSERT INTO raw_item PARTITION (ds='20220101') 
    VALUES(4671934, "item:byteair");

-- 将query出来的数据写入目标表中
-- processed_bhv的schema如下
-- CREATE TABLE processed_bhv (
--    user_id STRING, 
--    item_id STRING, 
--    ts BIGINT, 
--    action INT, 
--    req_id STRING);
INSERT OVERWRITE processed_bhv  partition(ds='20220101') 
SELECT 
    user_id,
    item_id,
    ts,
    action,
    concat(user_id, item_id) AS req_id
FROM raw_bhv
WHERE ds = '20220101';

QUERY

例子

-- 计算2022年1月1日当天所有行为数据中,各个item_id在20岁以上的人群中被访问的用户数,以及最大的用户年龄
SELECT count(DISTINCT user_id) AS user_count, max(age) AS max_user_age, bhv_item_id
FROM 
    (
        SELECT user_id AS raw_user_id, age AS raw_user_age 
        FROM raw_user
        WHERE ds = '20220101'
    ) JOIN (
        SELECT user_id AS bhv_user_id, item_id AS bhv_item_id
        FROM raw_bhv
        WHERE ds = '20220101'
    )
    ON raw_user_id = bhv_user_id
WHERE raw_user_age > 20
GROUP BY bhv_item_id

函数

目前批式 SQL 支持 Spark 的大部分 build-in 函数,详见文档 Spark Built-In Functions
公共 UDF 函数
由平台方提供的开箱即用 UDF,使用时无需声明,直接像 Spark 内置函数一样使用即可,目前主要包含以下几种。

函数名称

函数功能

语法

入参说明

nlp_cut_words

对一个string类型的字段使用CRF模型处理进行切词,根据中文语义切分成多个词组成的string数组

ARRAY<String> nlp_cut_words(String content)

只能处理中文,多语言模型暂不支持

gauss_rank

对输入数值进行 Gauss Rank 计算,用于数据的归一化、标准化

Double gauss_rank(Number arg0[, Number arg1])

只支持数值型参数;
只传入arg0时,数值范围为(0,1),agr0 直接用于 Gauss Rank 计算;
同时传入arg0、arg1时,数值范围为 arg0 < arg1,arg0/arg1 的结果用于 Gauss Rank 计算

array_log1p_int

对输入数组进行 Log1p 计算

ARRAY<Int> array_log1p_int(ARRAY<Number> arg)

支持传入数值型数组参数

fnv_hash

对输入字符串使用64-bit FNV-1a算法计算hash,最高位置0,返回long类型

Long fnv_hash(String text)

字符串类型

例子

select nlp_cut_words(`doc_title`) as `doc_title_terms` from T;
select gauss_rank(`goods_price`) as `goods_price_gauss_rank` from T;
select array_log1p_int(array(1,2,3));
select fnv_hash("1");

新建任务-流式SQL

支持语法

流式 SQL 支持 Flink SQL 1.11 的标准语法和功能,但是在数据类型和部分SQL用法上有一定限制。
数据类型
典型的 Flink SQL 包含 Source 和 Sink 两部分的 DDL 语句,其中 Sink 部分的 DDL 仅支持以下基础类型 int, bigint, float, double, string 和对应的数组类型 array<int>, array<bigint>, array<float>, array<double>, array<string>
语法限制
不支持 DROP 操作。
保留关键字
与 Flink SQL 保持一致。

表声明

CREATE TABLE
Source 和 Sink 通过 CREATE TABLE DDL 来描述跟外部数据源的读取、写入配置信息。
其中 CREATE TABLE 语句的 with option 里必须显式配置 'connector' 和 'table-name' = 'xxx_table'

  • connector 目前只能选择消息队列表对应的 'byte-mq' 和 键值表对应的 'byte-kv'。
  • table-name 对应的 xxx_table 是表管理模块已经创建好的中间表,包括消息队列类表(MQ Topic)和键值类表(KV Table)。
-- 一个典型的消费 kafka topic 的 Source DDL 定义示例
CREATE TABLE source_table_demo (
    col_name1 int, 
    col_name2 bigint
) WITH (
      'connector' = 'byte-mq',  --必填字段
      'table-name' = 'mq_item_source'  -- 必填字段
      );

键值类表需要显式指定主键字段,只用于sink 场景,使用 primary key (col_name) not enforced 来描述主键字段,如下所示。

-- 一个典型的写入 KV table 的 Sink DDL 定义示例
CREATE TABLE sink_table_demo (
    col_name1 int, 
    col_name2 bigint, 
    `primary` `key (`col_name1`)` `not` `enforced` `-- KV 必须指定主键字段, 使用 not enforced 关键字`
) WITH (
      'connector' = 'byte-kv',
      'table-name' = 'kv_item_table'
      );

添加新字段
目前只能通过表管理的界面进行新增字段操作。
CREATE VIEW
对于比较复杂的 SQL 计算逻辑,推荐使用 CREATE VIEW 来承载中间计算结果,便于复用数据,同时也让整个 SQL 语句结构清晰易懂。

CREATE VIEW view_name AS selectStatement;

函数

Flink Built-in 函数

公共 UDF 函数
由平台方提供的开箱即用 UDF,使用时无需声明,直接像 Flink 内置函数一样使用即可,目前主要包含以下几种。

  • JSON 函数

函数名称

函数功能

语法

入参说明

json_str_to_array

将一个 JSON ARRAY 字符串转换为一个 ARRAY 返回,解析失败则返回为 null 或抛异常

ARRAY<VARCHAR> json_str_to_array(VARCHAR content``[``, BOOLEAN logFailuresOnly, BOOLEAN enableNullValueCheck``]``)

content 需要解析的 JSON ARRAY 对象
logFailuresOnly 默认FALSE,遇到异常 JSON 数据时抛异常;设为TRUE则打印错误日志后返回NULL
enableNullValueCheck 默认FALSE,当遇到ARRAY中的值为null时,转换为字符串"null";设为TRUE则转换为null

json_str_to_map

将一个 JSON 对象字符串转换为一个 MAP<VARCHAR, VARCHAR> 返回,注意只提取 Top-level 层;解析失败则返回 null 或抛异常

MAP``<VARCHAR, VARCHAR> json_str_to_map(VARCHAR content``[``, BOOLEAN logFailuresOnly, BOOLEAN enableNullValueCheck``]``)

同上

例子

select json_str_to_array(json_array_str, TRUE) as str_array from T;
json_str_to_array('["10","20","[30,40]"]', TRUE) -> ["10","20","[30,40]"]

select json_str_to_map(json_object_str, TRUE) as map_result from T;
json_str_to_map('{"a":"1","b":{"c":"2"},"d":["3","4"]}', TRUE) -> {"a":"1", "b":"{"c":"2"}","d":"["3","4"]"}
  • String 函数

函数名称

函数功能

语法

入参说明

split_str

将一个 VARCHAR 字符串进行分割, 返回一个分割后的ARRAY

ARRAY<VARCHAR> split_str(VARCHAR input``[``, VARCHAR delimiter, INT limit``]``)

input 需要分割的对象(字符串)
delimiter 指定分隔符,默认是“,”;注意特殊字符需进行转义
limit 指定分割后返回字符串的数量,默认是 0,即会去掉末尾的空字符串

例子

SELECT split_str(input,'-',2) as result from T;
split_str('cat.dog.fish', '\.') -> ['cat', 'dog', 'fish']
splitstr('a', '') -> ['a']
splitstr('a', '', -1) -> ['a', '']
  • Concat 函数

函数名称

函数功能

语法

入参说明

concat_ws_array

将一个 ARRAY 进行连接, 返回一个连接后的字符串

concat_ws_array(VARCHAR delimiter, ARRAY<VARCHAR> input)

delimiter 连接符
input 需要进行连接操作的对象

concat_array

将一个 ARRAY 进行连接, 返回一个连接后的字符串, 连接符为空字符串""

concat_array(ARRAY<VARCHAR> input)

同上

例子

SELECT concat_ws_array('-',input) as result from source;
concat_wa_array('-', ['today', 'is' ,'a', 'good', 'day']) -> 'today-is-a-good-day'

SELECT concat_array(input) as result from source;
concat_array(['today', 'is' ,'a', 'good', 'day']) -> 'todayisagoodday'
  • Array 函数

函数名称

函数功能

语法

入参说明

string_array_contains

注意数组中各元素以及目标元素均需为VARCHAR类型

BOOLEAN string_array_contains(ARRAY<VARCAHR> input, VARCHAR key)

input 用来做判断的数组
key 目标元素

int_array_contains

注意数组中各元素以及目标元素均需为INT类型

BOOLEAN int_array_contains(ARRAY<INT> input, INT key)

同上

long_array_contains

数组中各元素以及目标元素均需为LONG类型

BOOLEAN long_array_contains(ARRAY<LONG> input, LONG key)

float_array_contains

数组中各元素以及目标元素均需为FLOAT类型

BOOLEAN float_array_contains(ARRAY<FLOAT> input, FLOAT key)

double_array_contains

数组中各元素以及目标元素均需为DOUBLE类型

BOOLEAN double_array_contains(ARRAY<DOUBLE> input, DOUBLE key)

array_log1p_int

对输入数组进行 Log1p 计算

ARRAY<Int> array_log1p_int(ARRAY<String> arg)

支持传入字符串数组参数,一般配合 json_str_to_array 使用

cast_array_data_type_double

将输入数组元素转化为 Double

ARRAY<Double> cast_array_data_type_double(ARRAY<String> arg)

支持传入字符串数组参数,一般配合 json_str_to_array 使用

cast_array_data_type_float

将输入数组元素转化为 Float

ARRAY<Float> cast_array_data_type_float(ARRAY<String> arg)

支持传入字符串数组参数,一般配合 json_str_to_array 使用

cast_array_data_type_long

将输入数组元素转化为 Long

ARRAY<Long> cast_array_data_type_long(ARRAY<String> arg)

支持传入字符串数组参数,一般配合 json_str_to_array 使用

cast_array_data_type_integer

将输入数组元素转化为 Integer

ARRAY<Integer> cast_array_data_type_integer(ARRAY<String> arg)

支持传入字符串数组参数,一般配合 json_str_to_array 使用

例子

SELECT string_array_contains(input, 'cat') as result FROM source;
string_array_contains(['cat', 'dog', 'fish'], 'cat') -> true

SELECT int_array_contains(input, 1) as result FROM source;
int_array_contains(['1', '2', '3'], '1') -> true

SELECT long_array_contains(input, 1L) as result FROM source;
long_array_contains([1L, 2L, 3L], 1L) -> true

SELECT float_array_contains(input, 1.23f) as result FROM source;
float_array_contains([1.23f, 2.34f, 3.45f], 1.23f) -> true

SELECT double_array_contains(input, 1.23) as result FROM source;
double_array_contains([1.23, 2.34, 3.45], 1.23) -> true

SELECT array_log1p_int(input) as result FROM source;
array_log1p_int(ARRAY['1','2','3']) -> ARRAY[0,1,1]

SELECT array_log1p_int(json_str_to_array(json_str)) as result FROM source;
array_log1p_int(json_str_to_array('[1,2,3]')) -> ARRAY[0,1,1]

SELECT cast_array_data_type_double(json_str_to_array(json_str)) as result FROM source;
cast_array_data_type_double(json_str_to_array('[1,2,3]')) -> ARRAY[1.0,2.0,3.0]

SELECT cast_array_data_type_long(json_str_to_array(json_str)) as result FROM source;
cast_array_data_type_long(json_str_to_array('[1.1,2.1,3.1]')) -> ARRAY[1,2,3]
  • NLP函数

函数名称

函数功能

语法

入参说明

nlp_cut_words

对一个string类型的字段使用CRF模型处理进行切词,根据中文语义切分成多个词组成的string数组

ARRAY<String> nlp_cut_words(String content)

只能处理中文,多语言模型暂不支持

例子

select nlp_cut_words(`doc_title`) as `doc_title_terms` from T;

Connector

MQ

  • 以下参数以scan为前缀的对Source 生效, 以sink为前缀的对sink生效, 没有这两个前缀的对两者都能生效

name

meaning

required

default

consumer/producer

备注

connector

byte-mq

YES

consumer & producer

table-name

需要指定的 MQ 消息队列中间表名称

YES

consumer & producer

properties.group.id

consumer group id

YES

consumer

format

目前仅支持 json,必须填写

YES

consumer & producer

scan.manually-commit-offsets-interval

是否自动commit offset,单位是时间类似'10 s'

NO

consumer

scan.startup.mode

earliest-offset/latest-offset/group-offsets/specific-offsets/timestamp,详细解释

NO

group-offsets

consumer

scan.partition-fields

source key-by的字段, 在数据发给下游之前, 会先对source做key-by

NO

0

consumer

scan.startup.timestamp-millis

当指定scan.startup.mode为timestamp的时候,需要指定一个ms单位的时间戳,从该时间戳开始消费

NO

0

consumer

scan.reset-to-earliest-for-new-partition

该参数表示在任务启动时,如果用的是group-offsets配置,且没有开启checkpoint或者开启checkpoint但checkpoint里面没有数据, 对于那些还没有offset的partition如何处理

NO

false

consumer

properties.{param}

{param} 是任意 kafka 参数, 如 properties.ignore.dc.check = true

NO

consumer & producer

parallelism

并发度

NO

-(kafka source 建议是 partition 个数)

consumer & producer

sink.partition-fields

按 key hash 的列名,可以是多列,用逗号分隔。如 'sink.partition-fields' = 'id,name',按 id 和 name 的 hash 写 kafka partition

NO

consumer

sink.log-failures-only

写入 kafka 失败时是否只打一条日志,task 不退出

NO

false

producer

scan.source-sample-interval

对 kafka topic 进行采样的采样间隔,单位为消息条数,值为0时不开启采样。

NO

0

consumer

scan.source-sample-num

对 kafka topic 进行采样的采样数,单位为消息条数,若未配置采样间隔,则此配置不生效。

NO

consumer

scan.metadata-fields-mapping

字段与元数据类型的映射字符串,每一对映射格式为{metadata_type}={field_name},映射之间采用逗号分隔,支持的metadata_type为timestamp、partition和offset(注意大小写),它们对应的数据类型都为bigint。

NO

consumer

rate-limit-num

限速配置,整个作业的配置,比如1000000,表示整个算子的总QPS不超过100w,注意,这个限速会在各个subtask之间平均分配额度,考虑到数据均衡和quota计算的整除,限制的速度一定不要限制的太死

NO

sink.in-flight-batch-size-factor

写入kafka时,in-flight数据最大的batch size因子,为kafka client参数“properties.batch.size”的 n 倍。

NO

0(不开启)

producer

sink.in-flight-max-num

写入kafka时,in-flight数据最大消息条数。当Flink侧未被ack的数据条数超过此大小时,将限制写入的速度。

NO

0(不开启)

producer

与batch-size同时配置时,取最小值来生效。

例子:读取一个消息队列表,经过 ETL 处理后输出到另外一个消息队列表。

CREATE  TABLE raw_item_source (
            `item_id`           BIGINT,
            `tags`              STRING,
            `cate`              STRING,
            `cate_cnt`          INT,
            `video_duration`    INT,
            `receive_timestamp` BIGINT,
            `status`            INT,
            `proc_time`         AS PROCTIME()
        )
        WITH (
            'connector' = 'byte-mq',
            'table-name' = 'mq_item_topic',
            'properties.group.id' = 'item_mq_demo',
            'format' = 'json',
            'json.fail-on-missing-field' = 'false',
            'json.ignore-parse-errors' = 'true',
            'scan.manually-commit-offsets-interval' = '5000'
        );

CREATE  TABLE temp_item_sink (
            `item_id`           BIGINT,
            `tags`              STRING,
            `cate`              STRING,
            `cate_cnt`          INT,
            `video_duration`    INT,
            `receive_timestamp` BIGINT,
            `status`            INT
        )
        WITH (
            'connector' = 'byte-mq',
            'table-name' = 'mq_item_etl_topic',
            'format' = 'json'
        );

CREATE  VIEW `etl_view` AS
SELECT  item_id,
        tags,
        cate,
        cate_cnt,
        video_duration,
        receive_timestamp,
        status
FROM    raw_item_source
WHERE   status = 1
AND     cate_cnt > 2;

INSERT INTO temp_item_sink
SELECT  *
FROM    etl_view;
CREATE  TABLE my_source (
            `item_id`         BIGINT,
            `video_type`      STRING,
            `category`        STRING,
            `item_name`       STRING,
            `fake_id`         STRING,
            `copyright_start` BIGINT,
            `copyright_end`   BIGINT,
            `video_count`     BIGINT,
            proc_time         AS PROCTIME()
        )
        WITH (
            'connector' = 'byte-mq',
            'table-name' = 'mq_item',
            'properties.group.id' = 'item_mq_test_demo',
            'format' = 'json',
            'json.fail-on-missing-field' = 'false',
            'json.ignore-parse-errors' = 'true',
            'scan.manually-commit-offsets-interval' = '5000'
        );

CREATE  VIEW my_view AS
SELECT  `item_id`,
        concat(`video_type`, ' @ ', `category`) AS video_info,
        upper(`item_name`) AS item_name,
        rand_integer(10) AS intval,
        char_length(`fake_id`) AS cnt,
        (`copyright_end` - `copyright_start`) AS time_gap,
        sqrt(`video_count`) AS video_count
FROM    my_source;

CREATE  TABLE my_sink(
            `item_id`     BIGINT,
            `video_info`  STRING,
            `item_name`   STRING,
            `intval`      INT,
            `cnt`         INT,
            `time_gap`    BIGINT,
            `video_count` DOUBLE
        )
        WITH (
            'connector' = 'byte-mq',
            'table-name' = 'mq_demo_sink_topic',
            'format' = 'json'
        );

INSERT INTO my_sink
SELECT  *
FROM    my_view
WHERE   time_gap > 0;

KV

name

meaning

required

default

connector

byte-kv

YES

table-name

KV表名称

YES

parallelism

算子并发度,通常不需要设定

NO

write_null

该参数表示支持写入null到kv表,比如字段A第一次写入时是"abc",若第二次写入字段null,那么kv中原有的"abc"会更新成null。
当某一个字段先执行 write_null 删除,后续数据时间小于等于当前时间戳版本时该字段都不会写入,如果要重新写入值,需要保证时间戳版本大于删除时的时间戳。

NO

false

例子:读取一个消息队列表,经过 ETL 处理后,写入到键值类表。

CREATE  TABLE raw_item_source (
            `item_id`           BIGINT,
            `tags`              STRING,
            `cate`              STRING,
            `cate_cnt`          INT,
            `video_duration`    INT,
            `receive_timestamp` BIGINT,
            `status`            INT,
            `proc_time`         AS PROCTIME()
        )
        WITH (
            'connector' = 'byte-mq',
            'table-name' = 'mq_item_topic',
            'properties.group.id' = 'item_kv_demo_group',
            'format' = 'json',
            'json.fail-on-missing-field' = 'false',
            'json.ignore-parse-errors' = 'true',
            'scan.manually-commit-offsets-interval' = '5000'
        );

CREATE  TABLE item_feature_table (
            `item_id`               BIGINT,
            `doc_cate1`             STRING,
            `doc_cate2`             STRING,
            `doc_cate3`             STRING,
            `doc_tags`              ARRAY<STRING>,
            `doc_cate_cnt`          INT,
            `doc_video_duration_10` INT,
            `doc_receive_timestamp` BIGINT,
            primary                 key(item_id) NOT enforced
        )
        WITH ('connector' = 'byte-kv', 'table-name' = 'item_kv_feature_table');

CREATE  VIEW kv_view AS
SELECT  doc_id,
        split_index(cate, '#', 0) AS doc_cate1,
        split_index(cate, '#', 1) AS doc_cate2,
        split_index(cate, '#', 2) AS doc_cate3,
        ARRAY['游戏', 'switch', '塞尔达', '林达'] AS tags,
        cate_cnt AS doc_cate_cnt,
        CAST(log10(video_duration) AS INT) AS doc_video_duration_10,
        receive_timestamp AS doc_receive_timestamp
FROM    raw_item_source
WHERE   status = 1;

INSERT INTO item_feature_table
SELECT  *
FROM    kv_view;