You need to enable JavaScript to run this app.
文档中心
大数据研发治理套件(私有化)

大数据研发治理套件(私有化)

复制全文
下载 pdf
湖仓分析一体服务LAS引擎
LAS Flink SQL任务
复制全文
下载 pdf
LAS Flink SQL任务

通过标准化的 Flink SQL 语言,在线开发、测试、运维流式任务,不依赖jar包,方便更新和维护。
LAS Flink SQL任务主要适用以下场景:

  • 对流式数据通过sql的方式做聚合加工,产出实时指标。
  • 对结构比较清晰的数据做清洗过滤的流式ETL需求。
  • 构建流式数据仓库。

新建任务

  1. 登录DataLeap控制台。
  2. 选择数据开发 > 任务开发,进入任务开发页面。
  3. 单击新建任务 > 数据开发 > LAS > 流式数据LAS Flink SQL按钮,进行任务类型选择。
  4. 设置任务基本信息,单击确定按钮,完成创建。
    Image

编辑任务

创建LAS Flink SQL任务后,进入SQL语句编辑页面,通过DDL编写SQL。

  • SQL 编辑
    通过DDL语法声明使用数据源,Flink SQL 加工数据,目前提供以下几类语法:

    • create table:用于定义表结构,包含有定义源表(source)、结果表(sink)。
    • create view:用于定义中间结果视图。
    • create function:用于自定义udf的声明。
    • insert into XXX select:用于表示真正需要执行的sql语句,并将执行结果insert到对应的结果表中。
      DataLeap 版本的DDL语法与开源版有少量不同,具体看下方的说明与DDL参数表格。
      一个完整的任务示例:
    -- 1.创建 kafka source table
          create table click_data(
              user_id bigint,
              article_id bigint,
              click_time timestamp,
              watermark for click_time as withoffset(click_time, 3000)
          ) with (
              'connector.type' = 'kafka',
              'connector.version' = '0.10',
             'connector.topic' = 'test_topic',
             'connector.cluster' = 'test_cluster',
             'connector.group.id' = 'flink_1.9_sql_test',
             'update-mode' = 'append',
             'format.type' = 'json',
         );
          
         -- 2.创建 Mysql sink table
         create table article_pv(
             article_id bigint,
             PV bigint,
             s_time timestamp
         ) with (
             'connector.type' = 'jdbc',
             'connector.table' = 'test_table',
             'connector.dbname' = 'testdb',
             'connector.url' = 'mysql.testdb_write',
             'connector.username' = 'sink.test',
             'connector.password' = 'abcabc',
             'connector.write.flush.max-rows' = '5'
          );
         -- 3. 计算逻辑
         insert into article_pv 
         select article_id, count(*) as PV, TUMBLE_START(click_time, INTERVAL '1' HOUR) as s_time
         from click_data 
         group by article_id, TUMBLE(click_time, INTERVAL '1' HOUR)
    
  • PB 格式定义
    使用PB格式的数据源,需要在当前任务上传PB类的定义文件,或手工输入PB类,json格式无需设置。一次只支持一个PB类的定义,例如:

    syntax = "proto2";
         package abase_test;
         message AbaseTest {
         required int64 first_id = 1;
         required int64 latest_id = 2;
    }
    

    说明

    • 如果kafka topic是binlog数据,在flink sql source中指定下'format.type' = 'pb_binlog'即可,数据类型选择'json'方式。
    • 如果Kafka topic是自定义pb数据,在flink sql source中指定下'format.pb-class' = '{package的路径}.{java_outer_classname的类名}${入口message名称}'。 例如,'format.pb-class' = 'parser.proto.ProtoParser$Instance'

    数据源格式选"Pb","Pb类定义"中开头设置package和option java_outer_classname,"入口message名称" 指定入口message名称。举例如下:

    • "Pb类定义"开头:
      • package parser.proto;
      • option java_outer_classname = "ProtoParser";
    • "入口message名称":Instance

参数设置

创建任务或任务编辑调试完成后,需要配置任务参数,操作如下:

  1. 单击右侧侧边栏的参数设置,进入参数设置页面。
  2. 设置任务参数信息后,单击保存图标,完成草稿保存。
    相关参数配置说明,请参见“参数设置”。

DDL参数设置

DataLeap的流式SQL在DDL部分与开源版本有少量不同,具体参考以下内容。

name

meaning

required

default

consumer/producer

connector.type

Connector的类型,必须是 kafka

YES

-

consumer & producer

connector.version

kafka 版本,当前支持 '0.10'

YES

-

consumer & producer

connector.cluster

kafka 集群名

YES

-

consumer & producer

connector.topic

kafka topic

YES

-

consumer & producer

connector.group.id

consumer group id

YES

-

consumer

update-mode

更新模式, 'append'/'upsert'

YES

-

consumer 就填 'append' 就行; producer 如果是查询的结果是可以更新的就用 upsert, 如果是查询的结果是不可更新的就用 append.

connector.owner

作业 owner

NO

-

consumer & producer

connector.startup-mode

earliest-offset/latest-offset/group-offsets/specific-timestamp,详细解释如下

NO

group-offsets

consumer

connector.specific-timestamp

当指定connector.startup-mode为specific-timestamp的时候,需要指定一个ms单位的时间戳,从该时间戳开始消费

NO

0

consumer

connector.reset-to-earliest-for-new-partition

该参数表示在任务启动时,如果用的是group-offsets配置,对于那些还没有offset的partition如何处理,详情可以参考:kafka partition扩容

NO

true

consumer

connector.kafka.properties.{param}

{param} 是任意 kafka 参数, 如 connector.kafka.properties.ignore.dc.check = true

NO

-

consumer & producer

connector.log-failures-only

写入 kafka 失败时是否只打一条日志,task 不退出

NO

false

producer

connector.rate-limiting-num

读取 kafka 限速,是整个 source table 所有并发的流速之和, 配合 connector.rate-limiting-unit 使用。

NO

-1,默认不限速

consumer

connector.rate-limiting-unit

读取 kafka 限速的单位,配合 connector.rate-limiting-num 使用,可选单位:'BYTE', 'RECORD' 分别对应 byte 和 消息条数

NO

'BYTE'

consumer

表 429 MySQL DDL 参数说明

name

meaning

required

取值范围

default

connector.type

connector type

YES

jdbc

-

connector.dbname

数据库名

YES

-

-

connector.table

数据库表名

YES

-

-

connector.init-sql

新建 db 连接时预执行的语句。 如需要写入表情符号时,设置 'connector.init-sql' = 'SET NAMES utf8mb4'

NO

-

-

connector.write.flush.max-rows

积攒多少条数据一批次写入

NO

0~

5000

connector.write.flush.interval

积攒多久时间的数据一批次写入, 单位 ms, 默认是 0 的话,就是没有定期输出,而不是每来一条就输出。

NO

0~

0

connector.write.max-retries

写入数据库失败的情况下,最多尝试多少次

NO

0~

3

Kafka json Format

使用 json format 需在 create table 语句中通过'format.type'='json' 指定。
注意事项:
json类型根据用户在create table中声明的column name和column type做解析。SQL任务目前仅做部分自动适配,例如声明为int类型,但实际json里面是int的string表示,SQL也能自动识别并转换回int。但是如果例如声明为int但实际为long或者声明为int但实际是带有字母的string等,SQL无法直接转换,会报错。
json类型支持嵌套json。例如嵌套json "{"a": "a", "b": {"c": 1, "d": "s"}}", 声明嵌套结构时,需要声明为b Row<c int, d varchar> 即可表明字段c、d是嵌套于字段b中。json也支持array形式。

  1. 如果是简单的array,例如"a": [1,2,3,4],则可声明为a Array,即表明a是一个int类型的array,注意,select 的时候,下标是从 1 开始的。例如"a": [6,7,8,9], select a[1] 会返回 6。

  2. 如果是嵌套json的array,例如"a": [{"b": 3}, {"b": 6}],则可声明为a Array<Row<b int>>, 即表明a是一个object类型array,a的结构中,有一个类型为int的b列。获取的方式如 select a[1].b 会返回3.
    Kafka json Format参数说明:

    name

    meaning

    required

    default

    note

    format.type

    format 的类型,必须是 'json'

    YES

    -

    -

    format.derive-schema

    json schema 指定方式之一,即自动按table 的schema 推断。强烈建议用这个

    No

    true

    样例: 'format.derive-schema'='true'

    format.schema

    json schema 指定方式之一,指定 type info, 不建议用这个,建议用 format.derive-schema

    No

    -

    样例: 'format.schema'='ROW<test1 VARCHAR, test2 TIMESTAMP>'

    format.json-schema

    json schema 指定方式之一 不建议用这个,建议用 format.derive-schema

    No

    -

    样例: 'format.schema'='{'title': 'Person', 'properties': {'firstName': {'type': 'string'}}}

    format.fail-on-missing-field

    缺少字段的时候是否直接失败

    No

    false

    -

    format.default-on-missing-field

    缺少字段的时候是否自动添加默认值

    No

    false

    -

    format.skip-dirty

    跳过脏数据

    No

    false

    -

    format.skip-interval-ms

    脏数据打印间隔(默认是10s)

    No

    10000

    -

    format.json-parser-feature.{feature}

    jackson 的 JsonParser.Feature 相关配置

    No

    -

    样例:'format.json-parser-feature.ALLOW_UNQUOTED_CONTROL_CHARS'='true'

    format.enforce-utf-encoding

    是否强制编码为utf8编码(默认非BMP类型的unicode会以转义的方式来处理

    NO

    false

    -

    format.filter-null-values

    是否把null字段不写进json中

    NO

    false

    -

    format.bytes-as-json-node

    是否把json中byte字段当成any类型.

    NO

    false

    -

Kafka pb format

使用 pb format 需在 create table 语句中通过'format.type'='pb' 指定。参数说明如下:
Kafka pb format参数说明

name

meaning

required

default

note

format.type

format 的类型,必须是 'pb'

YES

-

-

format.pb-class

指定 pb class

YES

-

样例: 'format.pb-class' = 'parser.proto.ProtoParser$Instance'

format.pb-skip-bytes

解析 pb bytes 的时候忽略前几个bytes,这个是 AML 的特殊需求,普通用户请忽略

No

0

样例:'format.pb-skip-bytes' = '8'

format.pb-sink-with-size-header

true/false. 往外 sink pb bytes 的时候,是否在前面加上 8 字节的 pb bytes size。这个是 AML 的特殊需求,普通用户请忽略

No

false

样例:'format.pb-sink-with-size-header' = 'true'

format.ignore-parse-errors

是否忽略解析错误的数据

NO

false

-

pb 中类型和 sql 中类型的映射关系如下表所示。
类型映射关系说明:

types in pb

sql type

note

repeated

ARRAY

如:repeated int32 -> ARRAY

MAP

MAP

如:map<string, int32> -> MAP<varchar, int>

enum

varchar

-

one of

-

会将 oneof 字段直接解到上一层。

其他复杂类型

Row

-

double

double

-

float

float

-

int32

int

-

uint32

int

-

uint64

bigint

-

sint32

int

-

sint64

bigint

-

fixed32

int

-

fixed64

bigint

-

sfixed32

int

-

sfixed64

bigint

-

bool

boolean

-

string

varchar

-

bytes

binary

-

最近更新时间:2024.12.19 17:02:10
这个页面对您有帮助吗?
有用
有用
无用
无用