You need to enable JavaScript to run this app.
文档中心
流式计算 Flink版

流式计算 Flink版

复制全文
下载 pdf
Connector 参考
Doris
复制全文
下载 pdf
Doris

Doris 连接器提供了 Doris 数据库的读写数据的能力,支持做数据源表和结果表。您可以创建 source 流从 Doris Table 中获取数据,作为作业的输入数据;也可以通过 Doris 结果表将作业输出数据写入到 Doris Table 中。

DDL 定义

用作数据源(Source)

CREATE TABLE doris_source (
   name   STRING,
   score   INT
)  WITH (
   'connector' = 'doris',
   'fenodes' = 'FE_IP:FE_HTTP_PORT',     
   'table.identifier' = 'test.sales_order',  
   'username' = 'root',                     
   'password' = 'password'                          
 );

用作数据目的(Sink)

CREATE TABLE doris_sink (
   name   STRING,
   score   INT
)  WITH (
   'connector' = 'doris',
   'fenodes' = 'FE_IP:FE_HTTP_PORT',     
   'table.identifier' = 'test.sales_order',  
   'username' = 'root',                     
   'password' = 'password',                 
   'sink.buffer-flush.max-rows' = '500',                
   'sink.buffer-flush.interval' = '1s'              
 );

WITH 参数

参数

是否必选

默认值

数据类型

描述

connector

(none)

String

指定使用的连接器,此处是 Doris 连接器。

fenodes

(none)

String

Doris FE 的 HTTP 地址,格式为FE_IP:FE_HTTP_PORT, FE_IP 可以是部署 Doris 的任意一个 Node 节点 IP,FE_HTTP_PORT 一般采用 Doris 中默认的 8030 端口。

table.identifier

(none)

String

表名,格式为db.table

username

(none)

String

登录 Doris 数据库的用户名。

password

(none)

String

登录 Doris 数据库的用户密码。

doris.request.retries

3

Integer

向 Doris 发送请求的重试次数。

doris.request.connect.timeout.ms

30000

Duration

向 Doris 发送请求的连接超时时间。

doris.request.read.timeout.ms

30000

Duration

向 Doris 发送请求的读取超时时间。

doris.request.query.timeout.s

3600

Duration

查询 Doris 的超时时间。设置为-1表示无超时限制。

doris.request.tablet.size

Integer. MAX_VALUE

Integer

一个分区对应的 Doris Tablet 个数。
取值越小,分区越多,能提升 Flink 作业并行度。

doris.batch.size

1024

Integer

单次从 BE 读取数据的最大行数。
增大此数值,可减少 Flink 与 Doris 的连接次数,从而降低网络延迟。

doris.exec.mem.limit

2147483648

Long

单次查询的内存限制。默认为 2GB,单位为 byte。

doris.deserialize.arrow.async

false

Boolean

是否支持异步转换 。

  • true:支持。
  • false:不支持。

doris.deserialize.queue.size

64

Integer

异步转换 Arrow 格式的内部处理队列。

doris.read.field

(none)

String

读取 Doris 表的列名,多列之间使用英文逗号分隔。

doris.filter.query

(none)

String

过滤读取数据的表达式。

sink.max-retries

1

Integer

写 BE 失败后的最大重试次数。

sink.buffer-flush.max-rows

1000

Integer

单次写 BE 的最大行数。

sink.buffer-flush.interval

10s

Duration

Flush 间隔时间,超过该时间后异步线程将缓存数据写入 BE。
默认值为 10 秒,支持时间单位 ms、s、min、h 和 d。设置为 0 时,表示关闭定期写入。

示例代码

  • 结果表
    CREATE TABLE datagen_source (
         siteid INT,
         citycode SMALLINT,
         username STRING,
         pv BIGINT
         )
    WITH (
        'connector' = 'datagen',
        'rows-per-second'='1',    
        'fields.username.length' = '3',
        'fields.siteid.min' = '6',                   
        'fields.siteid.max' = '1000',  
        'fields.citycode.min' = '1',
        'fields.citycode.max' = '100',
        'fields.pv.min' = '1',
        'fields.pv.max' = '1000'
    );
    CREATE TABLE doris_sink (
        siteid INT,
        citycode SMALLINT,
        username STRING,
        pv BIGINT
        ) 
        WITH (
            'connector' = 'doris',
            'fenodes' = 'FE_IP:FE_HTTP_PORT',     
            'table.identifier' = 'DOC.table2',  
            'username' = 'root',                     
            'password' = 'password'                            
    );
    insert into doris_sink
    select * from datagen_source;
    
最近更新时间:2025.11.25 16:05:22
这个页面对您有帮助吗?
有用
有用
无用
无用