You need to enable JavaScript to run this app.
文档中心
大数据研发治理套件(私有化)

大数据研发治理套件(私有化)

复制全文
Apache Hadoop/MiniBase Hadoop
Native-Flink-Streaming-SQL任务
复制全文
Native-Flink-Streaming-SQL任务

通过标准化的Flink SQL语言,在线开发、测试、运维流式任务,不依赖jar包,方便更新和维护。通过创建该任务,来构建符合业务场景的实时数仓。
Native Flink Streaming SQL任务主要适用以下场景:

  • 对流式数据通过sql的方式做聚合加工,产出实时指标。
  • 对结构比较清晰的数据做清洗过滤的流式ETL需求。
  • 构建流式数据仓库。

使用限制

  • 目前仅 MiniBase Hadoop 引擎下的 Flink SQL 流式开发任务支持调试操作,其余引擎的 SQL 任务暂不支持调试操作。
  • 提交任务上线检查时,会开展 SQL 解析检查,目前,暂不支持解析时读取 HDFS/Hive,您可忽略检查的异常提示,将任务提交上线。

注意事项

不同 MiniBase Hadoop、Apache Hadoop 集群版本,支持的 Flink 版本如下:

  • MiniBase Hadoop 集群:支持 Flink 1.15;
  • Apache Hadoop、Apache Hadoop-MRS 集群、Apache Hadoop-EMR 集群、Apache Hadoop-TBDS 集群:支持 Flink 1.13、Flink 1.15、Flink 1.17。

新建任务

  1. 登录DataLeap控制台。
  2. 选择数据开发 > 任务开发,进入任务开发页面。
  3. 依次单击新建任务 > 数据开发 > Apache Hadoop/MiniBase Hadoop引擎
  4. 关联实例:下拉选择已在项目配置中绑定的 Apache Hadoop 或 MiniBase Hadoop 集群实例信息。
  5. 选择流式数据 > Native Flink Streaming SQL任务按钮。
  6. 并设置任务基本信息,单击确定按钮,完成创建。

Image

编辑任务

创建Native Flink Streaming SQL任务后,进入SQL语句编辑页面,通过DDL编写SQL。

  • SQL 编辑
    通过DDL语法声明使用数据源,Flink SQL 加工数据,目前提供以下几类语法:

    • create table:用于定义表结构,包含有定义源表(source)、结果表(sink)。
    • create view:用于定义中间结果视图。
    • create function:用于自定义udf的声明。
    • insert into XXX select:用于表示真正需要执行的sql语句,并将执行结果insert到对应的结果表中。
    • Flink SQL 任务中支持以 {{var}} 项目参数形式进行编辑,如下方 {{topic_name}}、{{bootstrap_server_address}} 参数,可通过在项目控制台中设置参数管理,进行联合使用:
      Image
      CREATE TABLE sink_to_kafka_20240403 (
           id INT,
           userid INT,
           username STRING,
           prod_id INT,
           price DECIMAL(10, 2),
           amount INT,
           update_time TIMESTAMP(3)
      ) WITH (
           'connector' = 'kafka',
           'topic' = '{{topic_name}}',
           'properties.bootstrap.servers' = '{{bootstrap_server_address}}',
           'format' = 'json'
      );
      

    DataLeap 版本的DDL语法与开源版有少量不同,具体看下方的说明DDL参数设置
    Flink SQL 写入 Hive 任务示例:

    -- 定义 hive catalog
    CREATE  CATALOG hive_catalog
            WITH ('type' = 'hive',
            -- 配置默认的数据库
            'default-database' = 'hive_test_table'
            );
            
    -- 使用 hive catalog
    use catalog hive_catalog;
    
    -- 创建 Source Table
    CREATE  TEMPORARY TABLE source_kafka (
                id        INT,
                name      STRIN,
                log_ts    TIMESTAMP(3),
                WATERMARK FOR log_ts AS log_ts - INTERVAL '10' SECOND
            )
            WITH (
                'properties.bootstrap.servers' = 'kafka-cnxxxxxxxxrk.kafka.volces.com:9592,kafka-cnxxxxxxxxrk.kafka.volces.com:9593,kafka-cnxxxxxxxxrk.kafka.volces.com:9594',
                'properties.group.id' = 'dorado_onpermise_tes1',
                'format' = 'json',
                'connector' = 'kafka',
                'topic' = 'dorado_test',
                'scan.startup.mode' = 'latest-offset'
            );
    
    
    -- 如果这个表在 hive 中没有的话, Flink SQL 会进行创建,有这张表的话,就不会,dt,hh,mm 是分区字段
    CREATE  TABLE IF NOT EXISTS lake_partition (id INT, name STRING, dt STRING,hh STRING,mm string)
            PARTITIONED BY (dt,hh,mm)
            WITH (
                'connector' = 'hive',
                'partition.time-extractor.timestamp-pattern' = '$dt $hh:$mm:00',
                'sink.partition-commit.trigger' = 'partition-time',
                'sink.partition-commit.delay' = '10 s',
                'sink.partition-commit.policy.kind' = 'metastore'
            );
    
    
    INSERT INTO lake_partition
    SELECT  id,
            name,
            DATE_FORMAT(log_ts, 'yyyy-MM-dd') as dt,
            DATE_FORMAT(log_ts, 'HH') as hh,
            DATE_FORMAT(log_ts, 'mm') as mm
    FROM    source_kafka;
    

    Flink SQL 写入 MySQL 任务示例:

    -- 1.创建 kafka source table
    create table click_data(
        user_id bigint,
        article_id bigint,
        click_time timestamp,
        watermark for click_time as withoffset(click_time, 3000)
    ) with (
        'connector.type' = 'kafka',
        'connector.version' = '0.10',
        'connector.topic' = 'test_topic',
        'connector.cluster' = 'test_cluster',
        'connector.group.id' = 'flink_1.9_sql_test',
        'update-mode' = 'append',
        'format.type' = 'json',
    );
     
    -- 2.创建 Mysql sink table
    create table article_pv(
        article_id bigint,
        PV bigint,
        s_time timestamp
    ) with (
        'connector.type' = 'jdbc',
        'connector.table' = 'test_table',
        'connector.dbname' = 'testdb',
        'connector.url' = 'mysql.testdb_write',
        'connector.username' = 'sink.test',
        'connector.password' = 'abcabc',
        'connector.write.flush.max-rows' = '5'
    );
    -- 3. 计算逻辑
    insert into article_pv select article_id, count(*) as PV, TUMBLE_START(click_time, INTERVAL '1' HOUR) as s_time from click_data group by article_id, TUMBLE(click_time, INTERVAL '1' HOUR)
    

    Flink SQL 写入 Paimon 任务示例:

    -- 引入 Paimon 连接器依赖
    -- 确保你的 Flink 环境中已经正确配置了 Paimon 连接器
    -- Flink 运行参数添加:execution.checkpointing.interval = 60s
    
    CREATE CATALOG my_catalog WITH (
        'type'='paimon',
        'warehouse'='hdfs://emr-cluster-test/warehouse/tablespace/managed/hive'
    );
    USE CATALOG my_catalog;
    -- flink 需要手工创建 paimon 表
    -- CREATE TABLE word_count (
    --     word STRING PRIMARY KEY NOT ENFORCED,
    --     cnt BIGINT
    -- );
    
    CREATE TEMPORARY TABLE word_table (
        word STRING
    ) WITH (
        'connector' = 'datagen',
        'fields.word.length' = '1',
        'rows-per-second' = '10',  -- 每秒生成 10 行数据
    );
    
    INSERT INTO word_count SELECT word, COUNT(*) FROM word_table GROUP BY word;
    
  • PB 格式定义
    使用PB格式的数据源,需要在当前任务上传PB类的定义文件,或手工输入PB类,json格式无需设置。一次只支持一个PB类的定义,例如:

    syntax = "proto2";
    package abase_test;
    message AbaseTest {
        required int64 first_id = 1;
        required int64 latest_id = 2;
    }
    

    说明

    • 如果kafka topic是binlog数据,在flink sql source中指定下'format.type' = 'pb_binlog'即可,数据类型选择'json'方式。
    • 如果Kafka topic是自定义pb数据,在flink sql source中指定下'format.pb-class' = '{package的路径}.{java_outer_classname的类名}${入口message名称}'。
      例如,'format.pb-class' = 'parser.proto.ProtoParser$Instance'

    数据源格式选"Pb","Pb类定义"中开头设置package和option java_outer_classname,"入口message名称" 指定入口message名称。举例如下:

    • "Pb类定义"开头:
      package parser.proto;
      option java_outer_classname = "ProtoParser";
    • "入口message名称":Instance
  • 执行引擎:
    目前Native Flink Streaming SQL任务,执行引擎支持可选为Native Flink-1.11、Native Flink-1.15、Native Flink-1.17版本,不同 CDH、Hadoop 集群版本,支持的 Flink 版本不同,您可根据实际使用场景进行选择。

参数设置

任务配置完成后,需要配置任务参数,操作如下:
单击右侧侧边栏的参数设置,进入参数设置页面。完成任务基本信息、输入参数、资源设置、数据源登记、Flink运行参数等相关参数配置说明,请参见“参数设置”。

任务调试

参数配置完成后,可单击调试按钮,在弹出的窗口中,进行构建数据,和调试配置,单击开始调试按钮后,调试结果/日志可在页面下方查看。

说明

目前仅 MiniBase Hadoop 引擎下的 Flink SQL 流式开发任务支持调试操作。

  1. 构建数据
    任务调试前,需进行构建数据:

    参数

    说明

    所属数据源

    选择所属数据源的信息,您可通过左侧列表中的构建输入数据按钮,进行切换所属数据源。

    输入数据名称

    填写输入数据的名称,支持中文、字母、数字、下划线组合。

    获取方式

    • 手动构造:支持 json、excel 形式进行手动构造数据。
    • 本地上传:将本地数据文件拖入,点击上传即可,文件大小限制 20MB。

    数据构造完成后,您可点击下方保存并预览数据按钮,左侧列表展示当前任务相关数据源的输入数据情况及抽取状态,显示绿色时,即表示抽取成功。 若后续需替换测试数据,您也可单击测试数据详情右侧的数据配置按钮,重新进行数据上传或构造。

    Image

  2. 调试配置
    在调试配置界面,您需进行任务的运行模式输入数据选择。

    参数

    说明

    输入数据

    输入数据支持选择测试数据线上数据类型:

    • 线上数据:默认会选择使用线上数据,即实际线上环境中的数据,或Datagen中生成的随机数。
    • 测试数据:下拉选择已构建成功的测试数据,您也可单击右侧构建数据按钮,前往构建数据页签进行构建。

    调试配置完成后,单击开始调试按钮,即开始调试运行,在下方的测试记录窗口,查看任务运行的概览、结果、日志等信息。

  3. 调试成功后,单击保存图标,完成草稿保存。

DDL参数设置

DataLeap的流式SQL在DDL部分与开源版本有少量不同,具体参考以下内容。

  • 表 Kafka DDL 参数说明

    name

    meaning

    required

    default

    consumer/producer

    connector.type

    Connector的类型,必须是 kafka

    YES

    -

    consumer & producer

    connector.version

    kafka 版本,当前支持 '0.10'

    YES

    -

    consumer & producer

    connector.cluster

    kafka 集群名

    YES

    -

    consumer & producer

    connector.topic

    kafka topic

    YES

    -

    consumer & producer

    connector.group.id

    consumer group id

    YES

    -

    consumer

    update-mode

    更新模式, 'append'/'upsert'

    YES

    -

    consumer 就填 'append' 就行; producer 如果是查询的结果是可以更新的就用 upsert, 如果是查询的结果是不可更新的就用 append.

    connector.owner

    作业 owner

    NO

    -

    consumer & producer

    connector.startup-mode

    earliest-offset/latest-offset/group-offsets/specific-timestamp,详细解释如下

    NO

    group-offsets

    consumer

    connector.specific-timestamp

    当指定connector.startup-mode为specific-timestamp的时候,需要指定一个ms单位的时间戳,从该时间戳开始消费

    NO

    0

    consumer

    connector.reset-to-earliest-for-new-partition

    该参数表示在任务启动时,如果用的是group-offsets配置,对于那些还没有offset的partition如何处理,详情可以参考:kafka partition扩容

    NO

    true

    consumer

    connector.kafka.properties.{param}

    {param} 是任意 kafka 参数, 如 connector.kafka.properties.ignore.dc.check = true

    NO

    -

    consumer & producer

    connector.log-failures-only

    写入 kafka 失败时是否只打一条日志,task 不退出

    NO

    false

    producer

    connector.rate-limiting-num

    读取 kafka 限速,是整个 source table 所有并发的流速之和, 配合 connector.rate-limiting-unit 使用。

    NO

    -1,默认不限速

    consumer

    connector.rate-limiting-unit

    读取 kafka 限速的单位,配合 connector.rate-limiting-num 使用,可选单位:'BYTE', 'RECORD' 分别对应 byte 和 消息条数

    NO

    'BYTE'

    consumer

  • 表 MySQL DDL 参数说明

    name

    meaning

    required

    取值范围

    default

    connector.type

    connector type

    YES

    jdbc

    -

    connector.dbname

    数据库名

    YES

    -

    -

    connector.table

    数据库表名

    YES

    -

    -

    connector.init-sql

    新建 db 连接时预执行的语句。 如需要写入表情符号时,设置 'connector.init-sql' = 'SET NAMES utf8mb4'

    NO

    -

    -

    connector.write.flush.max-rows

    积攒多少条数据一批次写入

    NO

    0~

    5000

    connector.write.flush.interval

    积攒多久时间的数据一批次写入, 单位 ms, 默认是 0 的话,就是没有定期输出,而不是每来一条就输出。

    NO

    0~

    0

    connector.write.max-retries

    写入数据库失败的情况下,最多尝试多少次

    NO

    0~

    3

Kafka json Format

使用 json format 需在 create table 语句中通过'format.type'='json' 指定。
注意事项:
json类型根据用户在create table中声明的column name和column type做解析。SQL任务目前仅做部分自动适配,例如声明为int类型,但实际json里面是int的string表示,SQL也能自动识别并转换回int。但是如果例如声明为int但实际为long或者声明为int但实际是带有字母的string等,SQL无法直接转换,会报错。
json类型支持嵌套json。例如嵌套json "{"a": "a", "b": {"c": 1, "d": "s"}}", 声明嵌套结构时,需要声明为b Row<c int, d varchar> 即可表明字段c、d是嵌套于字段b中。json也支持array形式。

  1. 如果是简单的array,例如"a": [1,2,3,4],则可声明为a Array,即表明a是一个int类型的array,注意,select 的时候,下标是从 1 开始的。例如"a": [6,7,8,9], select a[1] 会返回 6。

  2. 如果是嵌套json的array,例如"a": [{"b": 3}, {"b": 6}],则可声明为a Array, 即表明a是一个object类型array,a的结构中,有一个类型为int的b列。获取的方式如 select a[1].b 会返回3.
    表 Kafka json Format参数说明

    name

    meaning

    required

    default

    note

    format.type

    format 的类型,必须是 'json'

    YES

    -

    -

    format.derive-schema

    json schema 指定方式之一,即自动按table 的schema 推断。强烈建议用这个

    No

    true

    样例: 'format.derive-schema'='true'

    format.schema

    json schema 指定方式之一,指定 type info, 不建议用这个,建议用 format.derive-schema

    No

    -

    样例: 'format.schema'='ROW<test1 VARCHAR, test2 TIMESTAMP>'

    format.json-schema

    json schema 指定方式之一 不建议用这个,建议用 format.derive-schema

    No

    -

    样例: 'format.schema'='{'title': 'Person', 'properties': {'firstName': {'type': 'string'}}}

    format.fail-on-missing-field

    缺少字段的时候是否直接失败

    No

    false

    -

    format.default-on-missing-field

    缺少字段的时候是否自动添加默认值

    No

    false

    -

    format.skip-dirty

    跳过脏数据

    No

    false

    -

    format.skip-interval-ms

    脏数据打印间隔(默认是10s)

    No

    10000

    -

    format.json-parser-feature.{feature}

    jackson 的 JsonParser.Feature 相关配置

    No

    -

    样例:'format.json-parser-feature.ALLOW_UNQUOTED_CONTROL_CHARS'='true'

    format.enforce-utf-encoding

    是否强制编码为utf8编码(默认非BMP类型的unicode会以转义的方式来处理

    NO

    false

    -

    format.filter-null-values

    是否把null字段不写进json中

    NO

    false

    -

    format.bytes-as-json-node

    是否把json中byte字段当成any类型.

    NO

    false

Kafka pb format

使用 pb format 需在 create table 语句中通过'format.type'='pb' 指定。参数说明如下:
表 Kafka pb format 参数说明

name

meaning

required

default

note

format.type

format 的类型,必须是 'pb'

YES

-

-

format.pb-class

指定 pb class

YES

-

样例: 'format.pb-class' = 'parser.proto.ProtoParser$Instance'

format.pb-skip-bytes

解析 pb bytes 的时候忽略前几个bytes,这个是 AML 的特殊需求,普通用户请忽略

No

0

样例:'format.pb-skip-bytes' = '8'

format.pb-sink-with-size-header

true/false. 往外 sink pb bytes 的时候,是否在前面加上 8 字节的 pb bytes size。这个是 AML 的特殊需求,普通用户请忽略

No

false

样例:'format.pb-sink-with-size-header' = 'true'

format.ignore-parse-errors

是否忽略解析错误的数据

NO

false

-

pb 中类型和 sql 中类型的映射关系如下表所示。

types in pb

sql type

note

repeated

ARRAY

如:repeated int32 -> ARRAY

MAP

MAP

如:map<string, int32> -> MAP<varchar, int>

enum

varchar

-

one of

-

会将 oneof 字段直接解到上一层。

其他复杂类型

Row

-

double

double

-

float

float

-

int32

int

-

uint32

int

-

uint64

bigint

-

sint32

int

-

sint64

bigint

-

fixed32

int

-

fixed64

bigint

-

sfixed32

int

-

sfixed64

bigint

-

bool

boolean

-

string

varchar

-

bytes

binary

-

最近更新时间:2025.12.04 17:55:52
这个页面对您有帮助吗?
有用
有用
无用
无用