通过标准化的Flink SQL语言,在线开发、测试、运维流式任务,不依赖jar包,方便更新和维护。通过创建该任务,来构建符合业务场景的实时数仓。
Native Flink Streaming SQL任务主要适用以下场景:
不同 MiniBase Hadoop、Apache Hadoop 集群版本,支持的 Flink 版本如下:
创建Native Flink Streaming SQL任务后,进入SQL语句编辑页面,通过DDL编写SQL。
SQL 编辑
通过DDL语法声明使用数据源,Flink SQL 加工数据,目前提供以下几类语法:
CREATE TABLE sink_to_kafka_20240403 ( id INT, userid INT, username STRING, prod_id INT, price DECIMAL(10, 2), amount INT, update_time TIMESTAMP(3) ) WITH ( 'connector' = 'kafka', 'topic' = '{{topic_name}}', 'properties.bootstrap.servers' = '{{bootstrap_server_address}}', 'format' = 'json' );
DataLeap 版本的DDL语法与开源版有少量不同,具体看下方的说明DDL参数设置。
Flink SQL 写入 Hive 任务示例:
-- 定义 hive catalog CREATE CATALOG hive_catalog WITH ('type' = 'hive', -- 配置默认的数据库 'default-database' = 'hive_test_table' ); -- 使用 hive catalog use catalog hive_catalog; -- 创建 Source Table CREATE TEMPORARY TABLE source_kafka ( id INT, name STRIN, log_ts TIMESTAMP(3), WATERMARK FOR log_ts AS log_ts - INTERVAL '10' SECOND ) WITH ( 'properties.bootstrap.servers' = 'kafka-cnxxxxxxxxrk.kafka.volces.com:9592,kafka-cnxxxxxxxxrk.kafka.volces.com:9593,kafka-cnxxxxxxxxrk.kafka.volces.com:9594', 'properties.group.id' = 'dorado_onpermise_tes1', 'format' = 'json', 'connector' = 'kafka', 'topic' = 'dorado_test', 'scan.startup.mode' = 'latest-offset' ); -- 如果这个表在 hive 中没有的话, Flink SQL 会进行创建,有这张表的话,就不会,dt,hh,mm 是分区字段 CREATE TABLE IF NOT EXISTS lake_partition (id INT, name STRING, dt STRING,hh STRING,mm string) PARTITIONED BY (dt,hh,mm) WITH ( 'connector' = 'hive', 'partition.time-extractor.timestamp-pattern' = '$dt $hh:$mm:00', 'sink.partition-commit.trigger' = 'partition-time', 'sink.partition-commit.delay' = '10 s', 'sink.partition-commit.policy.kind' = 'metastore' ); INSERT INTO lake_partition SELECT id, name, DATE_FORMAT(log_ts, 'yyyy-MM-dd') as dt, DATE_FORMAT(log_ts, 'HH') as hh, DATE_FORMAT(log_ts, 'mm') as mm FROM source_kafka;
Flink SQL 写入 MySQL 任务示例:
-- 1.创建 kafka source table create table click_data( user_id bigint, article_id bigint, click_time timestamp, watermark for click_time as withoffset(click_time, 3000) ) with ( 'connector.type' = 'kafka', 'connector.version' = '0.10', 'connector.topic' = 'test_topic', 'connector.cluster' = 'test_cluster', 'connector.group.id' = 'flink_1.9_sql_test', 'update-mode' = 'append', 'format.type' = 'json', ); -- 2.创建 Mysql sink table create table article_pv( article_id bigint, PV bigint, s_time timestamp ) with ( 'connector.type' = 'jdbc', 'connector.table' = 'test_table', 'connector.dbname' = 'testdb', 'connector.url' = 'mysql.testdb_write', 'connector.username' = 'sink.test', 'connector.password' = 'abcabc', 'connector.write.flush.max-rows' = '5' ); -- 3. 计算逻辑 insert into article_pv select article_id, count(*) as PV, TUMBLE_START(click_time, INTERVAL '1' HOUR) as s_time from click_data group by article_id, TUMBLE(click_time, INTERVAL '1' HOUR)
Flink SQL 写入 Paimon 任务示例:
-- 引入 Paimon 连接器依赖 -- 确保你的 Flink 环境中已经正确配置了 Paimon 连接器 -- Flink 运行参数添加:execution.checkpointing.interval = 60s CREATE CATALOG my_catalog WITH ( 'type'='paimon', 'warehouse'='hdfs://emr-cluster-test/warehouse/tablespace/managed/hive' ); USE CATALOG my_catalog; -- flink 需要手工创建 paimon 表 -- CREATE TABLE word_count ( -- word STRING PRIMARY KEY NOT ENFORCED, -- cnt BIGINT -- ); CREATE TEMPORARY TABLE word_table ( word STRING ) WITH ( 'connector' = 'datagen', 'fields.word.length' = '1', 'rows-per-second' = '10', -- 每秒生成 10 行数据 ); INSERT INTO word_count SELECT word, COUNT(*) FROM word_table GROUP BY word;
PB 格式定义
使用PB格式的数据源,需要在当前任务上传PB类的定义文件,或手工输入PB类,json格式无需设置。一次只支持一个PB类的定义,例如:
syntax = "proto2"; package abase_test; message AbaseTest { required int64 first_id = 1; required int64 latest_id = 2; }
说明
'format.pb-class' = 'parser.proto.ProtoParser$Instance'数据源格式选"Pb","Pb类定义"中开头设置package和option java_outer_classname,"入口message名称" 指定入口message名称。举例如下:
执行引擎:
目前Native Flink Streaming SQL任务,执行引擎支持可选为Native Flink-1.11、Native Flink-1.15、Native Flink-1.17版本,不同 CDH、Hadoop 集群版本,支持的 Flink 版本不同,您可根据实际使用场景进行选择。
任务配置完成后,需要配置任务参数,操作如下:
单击右侧侧边栏的参数设置,进入参数设置页面。完成任务基本信息、输入参数、资源设置、数据源登记、Flink运行参数等相关参数配置说明,请参见“参数设置”。
参数配置完成后,可单击调试按钮,在弹出的窗口中,进行构建数据,和调试配置,单击开始调试按钮后,调试结果/日志可在页面下方查看。
说明
目前仅 MiniBase Hadoop 引擎下的 Flink SQL 流式开发任务支持调试操作。
构建数据
任务调试前,需进行构建数据:
参数 | 说明 |
|---|---|
所属数据源 | 选择所属数据源的信息,您可通过左侧列表中的构建输入数据按钮,进行切换所属数据源。 |
输入数据名称 | 填写输入数据的名称,支持中文、字母、数字、下划线组合。 |
获取方式 |
数据构造完成后,您可点击下方保存并预览数据按钮,左侧列表展示当前任务相关数据源的输入数据情况及抽取状态,显示绿色时,即表示抽取成功。 若后续需替换测试数据,您也可单击测试数据详情右侧的数据配置按钮,重新进行数据上传或构造。 |

调试配置
在调试配置界面,您需进行任务的运行模式、输入数据选择。
参数 | 说明 |
|---|---|
输入数据 | 输入数据支持选择测试数据、线上数据类型:
|
调试配置完成后,单击开始调试按钮,即开始调试运行,在下方的测试记录窗口,查看任务运行的概览、结果、日志等信息。
调试成功后,单击保存图标,完成草稿保存。
DataLeap的流式SQL在DDL部分与开源版本有少量不同,具体参考以下内容。
表 Kafka DDL 参数说明
name | meaning | required | default | consumer/producer |
|---|---|---|---|---|
connector.type | Connector的类型,必须是 kafka | YES | - | consumer & producer |
connector.version | kafka 版本,当前支持 '0.10' | YES | - | consumer & producer |
connector.cluster | kafka 集群名 | YES | - | consumer & producer |
connector.topic | kafka topic | YES | - | consumer & producer |
connector.group.id | consumer group id | YES | - | consumer |
update-mode | 更新模式, 'append'/'upsert' | YES | - | consumer 就填 'append' 就行; producer 如果是查询的结果是可以更新的就用 upsert, 如果是查询的结果是不可更新的就用 append. |
connector.owner | 作业 owner | NO | - | consumer & producer |
connector.startup-mode | earliest-offset/latest-offset/group-offsets/specific-timestamp,详细解释如下 | NO | group-offsets | consumer |
connector.specific-timestamp | 当指定connector.startup-mode为specific-timestamp的时候,需要指定一个ms单位的时间戳,从该时间戳开始消费 | NO | 0 | consumer |
connector.reset-to-earliest-for-new-partition | 该参数表示在任务启动时,如果用的是group-offsets配置,对于那些还没有offset的partition如何处理,详情可以参考:kafka partition扩容 | NO | true | consumer |
connector.kafka.properties.{param} | {param} 是任意 kafka 参数, 如 connector.kafka.properties.ignore.dc.check = true | NO | - | consumer & producer |
connector.log-failures-only | 写入 kafka 失败时是否只打一条日志,task 不退出 | NO | false | producer |
connector.rate-limiting-num | 读取 kafka 限速,是整个 source table 所有并发的流速之和, 配合 connector.rate-limiting-unit 使用。 | NO | -1,默认不限速 | consumer |
connector.rate-limiting-unit | 读取 kafka 限速的单位,配合 connector.rate-limiting-num 使用,可选单位:'BYTE', 'RECORD' 分别对应 byte 和 消息条数 | NO | 'BYTE' | consumer |
表 MySQL DDL 参数说明
name | meaning | required | 取值范围 | default |
|---|---|---|---|---|
connector.type | connector type | YES | jdbc | - |
connector.dbname | 数据库名 | YES | - | - |
connector.table | 数据库表名 | YES | - | - |
connector.init-sql | 新建 db 连接时预执行的语句。 如需要写入表情符号时,设置 'connector.init-sql' = 'SET NAMES utf8mb4' | NO | - | - |
connector.write.flush.max-rows | 积攒多少条数据一批次写入 | NO | 0~ | 5000 |
connector.write.flush.interval | 积攒多久时间的数据一批次写入, 单位 ms, 默认是 0 的话,就是没有定期输出,而不是每来一条就输出。 | NO | 0~ | 0 |
connector.write.max-retries | 写入数据库失败的情况下,最多尝试多少次 | NO | 0~ | 3 |
使用 json format 需在 create table 语句中通过'format.type'='json' 指定。
注意事项:
json类型根据用户在create table中声明的column name和column type做解析。SQL任务目前仅做部分自动适配,例如声明为int类型,但实际json里面是int的string表示,SQL也能自动识别并转换回int。但是如果例如声明为int但实际为long或者声明为int但实际是带有字母的string等,SQL无法直接转换,会报错。
json类型支持嵌套json。例如嵌套json "{"a": "a", "b": {"c": 1, "d": "s"}}", 声明嵌套结构时,需要声明为b Row<c int, d varchar> 即可表明字段c、d是嵌套于字段b中。json也支持array形式。
如果是简单的array,例如"a": [1,2,3,4],则可声明为a Array,即表明a是一个int类型的array,注意,select 的时候,下标是从 1 开始的。例如"a": [6,7,8,9], select a[1] 会返回 6。
如果是嵌套json的array,例如"a": [{"b": 3}, {"b": 6}],则可声明为a Array
表 Kafka json Format参数说明
name | meaning | required | default | note |
|---|---|---|---|---|
format.type | format 的类型,必须是 'json' | YES | - | - |
format.derive-schema | json schema 指定方式之一,即自动按table 的schema 推断。强烈建议用这个 | No | true | 样例: 'format.derive-schema'='true' |
format.schema | json schema 指定方式之一,指定 type info, 不建议用这个,建议用 format.derive-schema | No | - | 样例: 'format.schema'='ROW<test1 VARCHAR, test2 TIMESTAMP>' |
format.json-schema | json schema 指定方式之一 不建议用这个,建议用 format.derive-schema | No | - | 样例: 'format.schema'='{'title': 'Person', 'properties': {'firstName': {'type': 'string'}}} |
format.fail-on-missing-field | 缺少字段的时候是否直接失败 | No | false | - |
format.default-on-missing-field | 缺少字段的时候是否自动添加默认值 | No | false | - |
format.skip-dirty | 跳过脏数据 | No | false | - |
format.skip-interval-ms | 脏数据打印间隔(默认是10s) | No | 10000 | - |
format.json-parser-feature.{feature} | jackson 的 JsonParser.Feature 相关配置 | No | - | 样例:'format.json-parser-feature.ALLOW_UNQUOTED_CONTROL_CHARS'='true' |
format.enforce-utf-encoding | 是否强制编码为utf8编码(默认非BMP类型的unicode会以转义的方式来处理 | NO | false | - |
format.filter-null-values | 是否把null字段不写进json中 | NO | false | - |
format.bytes-as-json-node | 是否把json中byte字段当成any类型. | NO | false |
使用 pb format 需在 create table 语句中通过'format.type'='pb' 指定。参数说明如下:
表 Kafka pb format 参数说明
name | meaning | required | default | note |
|---|---|---|---|---|
format.type | format 的类型,必须是 'pb' | YES | - | - |
format.pb-class | 指定 pb class | YES | - | 样例: 'format.pb-class' = 'parser.proto.ProtoParser$Instance' |
format.pb-skip-bytes | 解析 pb bytes 的时候忽略前几个bytes,这个是 AML 的特殊需求,普通用户请忽略 | No | 0 | 样例:'format.pb-skip-bytes' = '8' |
format.pb-sink-with-size-header | true/false. 往外 sink pb bytes 的时候,是否在前面加上 8 字节的 pb bytes size。这个是 AML 的特殊需求,普通用户请忽略 | No | false | 样例:'format.pb-sink-with-size-header' = 'true' |
format.ignore-parse-errors | 是否忽略解析错误的数据 | NO | false | - |
pb 中类型和 sql 中类型的映射关系如下表所示。
types in pb | sql type | note |
|---|---|---|
repeated | ARRAY | 如:repeated int32 -> ARRAY |
MAP | MAP | 如:map<string, int32> -> MAP<varchar, int> |
enum | varchar | - |
one of | - | 会将 oneof 字段直接解到上一层。 |
其他复杂类型 | Row | - |
double | double | - |
float | float | - |
int32 | int | - |
uint32 | int | - |
uint64 | bigint | - |
sint32 | int | - |
sint64 | bigint | - |
fixed32 | int | - |
fixed64 | bigint | - |
sfixed32 | int | - |
sfixed64 | bigint | - |
bool | boolean | - |
string | varchar | - |
bytes | binary | - |