You need to enable JavaScript to run this app.
流式计算 Flink版

流式计算 Flink版

复制全文
Connector 参考
JDBC
复制全文
JDBC

JDBC 连接器提供了对 MySQL、PostgreSQL 等常见的关系型数据库的读写能力,支持做数据源表、结果表和维表。

注意事项

如果您需要使用 JDBC 连接器连接云数据库 veDB MySQL 版,您的连接终端请按照以下要求配置,否则可能会因为自定义连接终端的限制而出现任务故障。
如需详细了解各参数含义,请参见编辑连接终端

  • 读写模式:配置为读写
  • 一致性级别:配置为最终一致性
  • 主节点接受读:关闭该选项。
  • 事务拆分:打开该选项。
  • 基于合规要求,如果需要使用 JDBC MySQL 相关功能,需要用户自行上传 MySQL Driver 。请前往 MySQL 官网下载 8.0.27 版本,并且上传到作业开发依赖文件中。具体操作可以参考 使用 JDBC 或者 MySQL-CDC 数据源

DDL 定义

用作数据源(Source)

CREATE TABLE jdbc_source (
    name String,
    score INT
 ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://192.*.*.*:3306/doc_db',  
    'table-name' = '<yourtablename>',
    'username' = 'admin',          
    'password' = 'Passord',      
    'scan.partition.column' = 'score',       
    'scan.partition.num' = '2',           
    'scan.partition.lower-bound' = '0',     
    'scan.partition.upper-bound' = '100',     
    'scan.fetch-size' = '1' 
);

用作数据目的(Sink)

CREATE TABLE jdbc_sink (
    name String,
    score INT
 ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://192.*.*.*:3306/doc_db',     
    'table-name' = '<yourtablename>',
    'username' = 'admin',          
    'password' = 'Pa$$w0rd',      
    'sink.buffer-flush.max-rows' = '100'
);

用作数据维表(Lookup)

CREATE TABLE jdbc_lookup (
    name String,
    score INT
 ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://192.*.*.*:3306/doc_db',     
    'table-name' = '<yourtablename>',
    'username' = 'admin',          
    'password' = 'MyPa$$w0rd',      
    'lookup.cache.max-rows' = '100',
    'lookup.max-retries' = '3'
);

WITH 参数

参数

是否必选

默认值

数据类型

描述

connector

(none)

String

指定使用的连接器,此处是 JDBC 连接器。

url

(none)

String

JDBC 数据库的连接地址。

table-name

(none)

String

表名。

driver

(none)

String

JDBC Driver 的类名。如果不设置,则自动从 url 参数值中提取。

username

(none)

String

数据库登录用户名和密码。使用时,两者必须同时使用。

password

(none)

String

数据库登录用户名和密码。使用时,两者必须同时使用。

scan.partition.column

(none)

String

指定分区扫描(Partitioned Scan)的列名,该列必须是数值类型、日期类型、时间戳类型等。

scan.partition.num

(none)

Integer

分区扫描启用后,指定分区数。

scan.partition.lower-bound

(none)

Integer

分区扫描启用后,指定首个分区的最小值。

scan.partition.upper-bound

(none)

Integer

分区扫描启用后,指定最后一个分区的最大值。

scan.fetch-size

0

Integer

读取数据时,批量读取的行数。
默认为 0,表示一行一行的读取,效率较低(吞吐量不高)。

sink.buffer-flush.max-rows

100

Integer

批量输出时,缓存中最多缓存多少数据。如果设置为 0,表示禁止输出缓存。

sink.buffer-flush.interval

1s

Duration

批量输出时,每批次最大的间隔,单位毫秒。
如果设置了静止输出缓存,但批量输出间隔大于 0s,则说明启用纯异步输出功能,即数据输出到算子、从算子最终写入数据库。

sink.max-retries

3

Integer

数据写入失败时,最大重试次数。

lookup.cache.max-rows

(none)

Integer

查询缓存(Lookup Cache)中最多缓存的数据条数。需要搭配 lookup.cache.ttl 使用。

lookup.cache.ttl

(none)

Duration

查询缓存中每条记录最长的缓存时间。需要搭配 lookup.cache.max-rows 使用。

lookup.max-retries

3

Integer

数据库查询失败时,最多重试的次数。

lookup.cache

NONE

String

维表的 cache 策略,可选:

  • FULL:全表缓存,适用于小表
  • PARTIAL:部分缓存
  • NONE:不缓存

lookup.full-cache.reload-strategy

PERIODIC

String

定义使用哪种策略来重新加载 Full Cache 缓存:

  • PERIODIC:缓存以固定的时间间隔重新加载,没有初始延迟
  • TIMED:缓存会在指定的时间以固定的时间间隔重新加载,间隔是一天的倍数

lookup.full-cache.periodic-reload.interval

(none)

Duration

采用 PERIODIC 加载策略下的 Full Cache 重新加载间隔。

lookup.full-cache.periodic-reload.schedule-mode

FIXED_DELAY

String

定义在 PERIODIC 加载策略下的调度模式:
FIXED_DELAY*:重新加载间隔是上一次重新加载结束和下一次重新加载开始之间的时间。
FIXED_RATE
:*重新加载间隔是上一次重新加载开始和下一次重新加载开始之间的时间。

lookup.full-cache.timed-reload.iso-time

(none)

String

在 TIMED 加载策略下的 Full Cache 加载时间,时间采用 ISO-8601 格式。
当维表在指定时间每隔几天更新一次时,它会很有用。可以指定带时区或不带时区的时间(将使用目标 JVM 本地时区)。例如,“10:15”= 本地 TZ,“10:15Z”= UTC TZ,“10:15+07:00”= UTC +7 TZ。

lookup.full-cache.timed-reload.interval-in-days

1

Integer

在 TIMED 加载策略下的 Full Cache 加载间隔,间隔是一天的倍数,设置为 1 则表示间隔 1 天加载一次,设置为 2 则表示间隔 2 天加载一次。

示例代码

  • 结果表。

    CREATE TABLE datagen_source (
         order_id bigint,
         order_product_id bigint,
         order_customer_id bigint,
         order_update_time as localtimestamp
        )
    WITH (
         'connector' = 'datagen',
         'rows-per-second' = '5'            
    );
    
    
    CREATE TABLE jdbc_sink (
         order_id bigint,
         order_product_id bigint,
         order_customer_id bigint,
         order_update_time timestamp,
         PRIMARY KEY (order_id) NOT ENFORCED
     ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:mysql://mysq**3fb.rds.ivolces.com:3306/doc_db', 
        'username' = 'admin',      
        'password' = '***',  
        'table-name' = 'orders'
    );
    
    
    insert into jdbc_sink
    select * from datagen_source; 
    
  • 源表。

    CREATE TABLE jdbc_source (
         order_id bigint,
         order_product_id bigint,
         order_customer_id bigint,
         order_status varchar,
         order_update_time timestamp,
         PRIMARY KEY (order_id) NOT ENFORCED
     ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:mysql://mysq**3fb.rds.ivolces.com:3306/doc_db', 
        'username' = 'admin',      
        'password' = 'Passord',  
        'table-name' = 'source_table'
    );
    
     CREATE TABLE print_sink (
         order_id bigint,
         order_product_id bigint,
         order_customer_id bigint,
         order_status varchar,
         order_update_time timestamp
        )
    WITH (
         'connector' = 'print'           
    );
    
    insert into print_sink
    select * from jdbc_source; 
    

FAQ

部分列更新

JDBC Sink 支持部分列更新,只需要在 DDL 定义的时候,定义需要更新的列即可。
比如一张表有 id,a,b,c 四个列,如果只对 a 做部分列更新,则 DDL 只需要定义 id 和 a 两个字段。

CREATE TABLE jdbc_sink ( 
     `id` bigint, 
     a string
     PRIMARY KEY (`id`) NOT ENFORCED 
 ) WITH ( 
    'connector' = 'jdbc', 
    'url' = 'jdbc:mysql://mysq**3fb.rds.ivolces.com:3306/doc_db',  
    'username' = 'admin',       
    'password' = '***',   
    'table-name' = 'test' 
);
最近更新时间:2025.03.03 09:41:50
这个页面对您有帮助吗?
有用
有用
无用
无用