You need to enable JavaScript to run this app.
导航

JDBC

最近更新时间2023.09.12 16:22:50

首次发布时间2022.09.08 17:27:42

JDBC 连接器提供了对 MySQL、PostgreSQL 等常见的关系型数据库的读写能力,支持做数据源表、结果表和维表。

DDL 定义

用作数据源(Source)

CREATE TABLE jdbc_source (
    name String,
    score INT
 ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://192.*.*.*:3306/doc_db',  
    'table-name' = '<yourtablename>',
    'username' = 'admin',          
    'password' = 'Passord',      
    'scan.partition.column' = 'score',       
    'scan.partition.num' = '2',           
    'scan.partition.lower-bound' = '0',     
    'scan.partition.upper-bound' = '100',     
    'scan.fetch-size' = '1' 
);

用作数据目的(Sink)

CREATE TABLE jdbc_sink (
    name String,
    score INT
 ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://192.*.*.*:3306/doc_db',     
    'table-name' = '<yourtablename>',
    'username' = 'admin',          
    'password' = 'Pa$$w0rd',      
    'sink.buffer-flush.max-rows' = '100'
);

用作数据维表(Lookup)

CREATE TABLE jdbc_lookup (
    name String,
    score INT
 ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://192.*.*.*:3306/doc_db',     
    'table-name' = '<yourtablename>',
    'username' = 'admin',          
    'password' = 'MyPa$$w0rd',      
    'lookup.cache.max-rows' = '100',
    'lookup.max-retries' = '3'
);

WITH 参数

参数

是否必选

默认值

数据类型

描述

connector

(none)

String

指定使用的连接器,此处是 JDBC 连接器。

url

(none)

String

JDBC 数据库的连接地址。

table-name

(none)

String

表名。

driver

(none)

String

JDBC Driver 的类名。如果不设置,则自动从 url 参数值中提取。

username

(none)

String

数据库登录用户名和密码。使用时,两者必须同时使用。

password

(none)

String

数据库登录用户名和密码。使用时,两者必须同时使用。

scan.partition.column

(none)

String

指定分区扫描(Partitioned Scan)的列名,该列必须是数值类型、日期类型、时间戳类型等。

scan.partition.num

(none)

Integer

分区扫描启用后,指定分区数。

scan.partition.lower-bound

(none)

Integer

分区扫描启用后,指定首个分区的最小值。

scan.partition.upper-bound

(none)

Integer

分区扫描启用后,指定最后一个分区的最大值。

scan.fetch-size

0

Integer

读取数据时,批量读取的行数。
默认为0,表示一行一行的读取,效率较低(吞吐量不高)。

sink.buffer-flush.max-rows

100

Integer

批量输出时,缓存中最多缓存多少数据。如果设置为 0,表示禁止输出缓存。

sink.buffer-flush.interval

1s

Duration

批量输出时,每批次最大的间隔,单位毫秒。
如果设置了静止输出缓存,但批量输出间隔大于 0s,则说明启用纯异步输出功能,即数据输出到算子、从算子最终写入数据库。

sink.max-retries

3

Integer

数据写入失败时,最大重试次数。

lookup.cache.max-rows

(none)

Integer

查询缓存(Lookup Cache)中最多缓存的数据条数。

lookup.cache.ttl

(none)

Duration

查询缓存中每条记录最长的缓存时间。

lookup.max-retries

3

Integer

数据库查询失败时,最多重试的次数。

示例代码

  • 结果表

    CREATE TABLE datagen_source (
         order_id bigint,
         order_product_id bigint,
         order_customer_id bigint,
         order_update_time as localtimestamp
        )
    WITH (
         'connector' = 'datagen',
         'rows-per-second' = '5'            
    );
    
    
    CREATE TABLE jdbc_sink (
         order_id bigint,
         order_product_id bigint,
         order_customer_id bigint,
         order_update_time timestamp,
         PRIMARY KEY (order_id) NOT ENFORCED
     ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:mysql://mysq**3fb.rds.ivolces.com:3306/doc_db', 
        'username' = 'admin',      
        'password' = '***',  
        'table-name' = 'orders'
    );
    
    
    insert into jdbc_sink
    select * from datagen_source; 
    
  • 源表

    CREATE TABLE jdbc_source (
         order_id bigint,
         order_product_id bigint,
         order_customer_id bigint,
         order_status varchar,
         order_update_time timestamp,
         PRIMARY KEY (order_id) NOT ENFORCED
     ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:mysql://mysq**3fb.rds.ivolces.com:3306/doc_db', 
        'username' = 'admin',      
        'password' = 'Passord',  
        'table-name' = 'source_table'
    );
    
     CREATE TABLE print_sink (
         order_id bigint,
         order_product_id bigint,
         order_customer_id bigint,
         order_status varchar,
         order_update_time timestamp
        )
    WITH (
         'connector' = 'print'           
    );
    
    insert into print_sink
    select * from jdbc_source;