You need to enable JavaScript to run this app.
导航

Doris

最近更新时间2023.09.12 16:22:50

首次发布时间2023.05.10 11:22:34

Doris 连接器提供了 Doris 数据库的读写数据的能力,支持做数据源表和结果表。您可以创建 source 流从 Doris Table 中获取数据,作为作业的输入数据;也可以通过 Doris 结果表将作业输出数据写入到 Doris Table 中。

DDL 定义

用作数据源(Source)

CREATE TABLE doris_source (
   name   STRING,
   score   INT
)  WITH (
   'connector' = 'doris',
   'fenodes' = 'FE_IP:FE_HTTP_PORT',     
   'table.identifier' = 'test.sales_order',  
   'username' = 'root',                     
   'password' = 'password'                          
 );

用作数据目的(Sink)

CREATE TABLE doris_sink (
   name   STRING,
   score   INT
)  WITH (
   'connector' = 'doris',
   'fenodes' = 'FE_IP:FE_HTTP_PORT',     
   'table.identifier' = 'test.sales_order',  
   'username' = 'root',                     
   'password' = 'password',                 
   'sink.batch.size' = '500',                
   'sink.batch.interval' = '1s'              
 );

WITH 参数

参数

是否必选

默认值

数据类型

描述

connector

(none)

String

指定使用的连接器,此处是 Doris 连接器。

fenodes

(none)

String

Doris FE 的 HTTP 地址,格式为FE_IP:FE_HTTP_PORT,PORT 固定为 8030 端口, FE_IP 可以是部署 Doris 的任意一个 Node 节点 IP。

table.identifier

(none)

String

表名,格式为db.table

username

(none)

String

登录 Doris 数据库的用户名。

password

(none)

String

登录 Doris 数据库的用户密码。

doris.request.retries

3

Integer

向 Doris 发送请求的重试次数。

doris.request.connect.timeout.ms

30000

Duration

向 Doris 发送请求的连接超时时间。

doris.request.read.timeout.ms

30000

Duration

向 Doris 发送请求的读取超时时间。

doris.request.query.timeout.s

3600

Duration

查询 Doris 的超时时间。设置为-1表示无超时限制。

doris.request.tablet.size

Integer. MAX_VALUE

Integer

一个分区对应的 Doris Tablet 个数。
取值越小,分区越多,能提升 Flink 作业并行度。

doris.batch.size

1024

Integer

单次从 BE 读取数据的最大行数。
增大此数值,可减少 Flink 与 Doris 的连接次数,从而降低网络延迟。

doris.exec.mem.limit

2147483648

Long

单次查询的内存限制。默认为 2GB,单位为 byte。

doris.deserialize.arrow.async

false

Boolean

是否支持异步转换 。

  • true:支持。
  • false:不支持。

doris.deserialize.queue.size

64

Integer

异步转换 Arrow 格式的内部处理队列。

doris.read.field

(none)

String

读取 Doris 表的列名,多列之间使用英文逗号分隔。

doris.filter.query

(none)

String

过滤读取数据的表达式。

sink.batch.size

1000

Integer

单次写 BE 的最大行数。

sink.max-retries

1

Integer

写 BE 失败后的最大重试次数。

sink.batch.interval

1s

Duration

Flush 间隔时间,超过该时间后异步线程将缓存数据写入 BE。
默认值为 1 秒,支持时间单位 ms、s、min、h 和 d。设置为 0 时,表示关闭定期写入。

示例代码

  • 结果表
    CREATE TABLE datagen_source (
         siteid INT,
         citycode SMALLINT,
         username STRING,
         pv BIGINT
         )
    WITH (
        'connector' = 'datagen',
        'rows-per-second'='1',    
        'fields.username.length' = '3',
        'fields.siteid.min' = '6',                   
        'fields.siteid.max' = '1000',  
        'fields.citycode.min' = '1',
        'fields.citycode.max' = '100',
        'fields.pv.min' = '1',
        'fields.pv.max' = '1000'
    );
    CREATE TABLE doris_sink (
        siteid INT,
        citycode SMALLINT,
        username STRING,
        pv BIGINT
        ) 
        WITH (
            'connector' = 'doris',
            'fenodes' = 'FE_IP:FE_HTTP_PORT',     
            'table.identifier' = 'DOC.table2',  
            'username' = 'root',                     
            'password' = 'password'                            
    );
    insert into doris_sink
    select * from datagen_source;