最近更新时间:2023.09.12 16:22:50
首次发布时间:2022.09.08 17:27:42
JDBC 连接器提供了对 MySQL、PostgreSQL 等常见的关系型数据库的读写能力,支持做数据源表、结果表和维表。
CREATE TABLE jdbc_source ( name String, score INT ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://192.*.*.*:3306/doc_db', 'table-name' = '<yourtablename>', 'username' = 'admin', 'password' = 'Passord', 'scan.partition.column' = 'score', 'scan.partition.num' = '2', 'scan.partition.lower-bound' = '0', 'scan.partition.upper-bound' = '100', 'scan.fetch-size' = '1' );
CREATE TABLE jdbc_sink ( name String, score INT ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://192.*.*.*:3306/doc_db', 'table-name' = '<yourtablename>', 'username' = 'admin', 'password' = 'Pa$$w0rd', 'sink.buffer-flush.max-rows' = '100' );
CREATE TABLE jdbc_lookup ( name String, score INT ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://192.*.*.*:3306/doc_db', 'table-name' = '<yourtablename>', 'username' = 'admin', 'password' = 'MyPa$$w0rd', 'lookup.cache.max-rows' = '100', 'lookup.max-retries' = '3' );
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
connector | 是 | (none) | String | 指定使用的连接器,此处是 JDBC 连接器。 |
url | 是 | (none) | String | JDBC 数据库的连接地址。 |
table-name | 是 | (none) | String | 表名。 |
driver | 否 | (none) | String | JDBC Driver 的类名。如果不设置,则自动从 url 参数值中提取。 |
username | 否 | (none) | String | 数据库登录用户名和密码。使用时,两者必须同时使用。 |
password | 否 | (none) | String | 数据库登录用户名和密码。使用时,两者必须同时使用。 |
scan.partition.column | 否 | (none) | String | 指定分区扫描(Partitioned Scan)的列名,该列必须是数值类型、日期类型、时间戳类型等。 |
scan.partition.num | 否 | (none) | Integer | 分区扫描启用后,指定分区数。 |
scan.partition.lower-bound | 否 | (none) | Integer | 分区扫描启用后,指定首个分区的最小值。 |
scan.partition.upper-bound | 否 | (none) | Integer | 分区扫描启用后,指定最后一个分区的最大值。 |
scan.fetch-size | 否 | 0 | Integer | 读取数据时,批量读取的行数。 |
sink.buffer-flush.max-rows | 否 | 100 | Integer | 批量输出时,缓存中最多缓存多少数据。如果设置为 0,表示禁止输出缓存。 |
sink.buffer-flush.interval | 否 | 1s | Duration | 批量输出时,每批次最大的间隔,单位毫秒。 |
sink.max-retries | 否 | 3 | Integer | 数据写入失败时,最大重试次数。 |
lookup.cache.max-rows | 否 | (none) | Integer | 查询缓存(Lookup Cache)中最多缓存的数据条数。 |
lookup.cache.ttl | 否 | (none) | Duration | 查询缓存中每条记录最长的缓存时间。 |
lookup.max-retries | 否 | 3 | Integer | 数据库查询失败时,最多重试的次数。 |
结果表
CREATE TABLE datagen_source ( order_id bigint, order_product_id bigint, order_customer_id bigint, order_update_time as localtimestamp ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '5' ); CREATE TABLE jdbc_sink ( order_id bigint, order_product_id bigint, order_customer_id bigint, order_update_time timestamp, PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://mysq**3fb.rds.ivolces.com:3306/doc_db', 'username' = 'admin', 'password' = '***', 'table-name' = 'orders' ); insert into jdbc_sink select * from datagen_source;
源表
CREATE TABLE jdbc_source ( order_id bigint, order_product_id bigint, order_customer_id bigint, order_status varchar, order_update_time timestamp, PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://mysq**3fb.rds.ivolces.com:3306/doc_db', 'username' = 'admin', 'password' = 'Passord', 'table-name' = 'source_table' ); CREATE TABLE print_sink ( order_id bigint, order_product_id bigint, order_customer_id bigint, order_status varchar, order_update_time timestamp ) WITH ( 'connector' = 'print' ); insert into print_sink select * from jdbc_source;