You need to enable JavaScript to run this app.
导航

配置 PostgreSQL 数据源

最近更新时间2024.03.22 11:28:54

首次发布时间2023.03.17 16:07:31

PostgreSQL 是一种常用的关系型数据库,数据集成同步任务为您提供读取和写入 PostgreSQL 数据源的双向通道能力。
本文为您介绍 DataSail 的 PostgreSQL 数据源配置、同步任务可视化和脚本模式(DSL)配置能力,实现与不同数据源的数据互通能力。

1 支持的 PostgreSQL 版本

  • 离线读写
    目前支持读写的 PostgreSQL 可选版本为 PostgreSQL 10、11、12、13、14。
    您可以在数据库中执行以下语句,查看 PostgreSQL 数据库的版本:
    show server_version
    

2 使用前提

  1. 子账号新建数据源时,需要有项目的管理员角色,方可以进行新建数据源操作。各角色对应权限说明,详见:管理成员
  2. 确保集成同步任务使用的独享数据集成资源组,具有 PostgreSQL 数据库节点的网络访问能力。网络互通方案详见网络连通解决方案
    • 数据源为 RDS 云数据库实例时,需要将集成资源组所在 VPC 中的 IPv4 CIDR 地址添加到 PostgreSQL 访问白名单中:
      1. 确认集成资源组所在的 VPC:
        图片
      2. 查看 VPC 的 IPv4 CIDR 地址:

        注意

        若考虑安全因素,减少 IP CIDR 的访问范围,您至少需要将集成资源组绑定的子网下的 IPv4 CIDR 地址加入到数据库白名单中。

        图片
      3. 将获取到的 IPv4 CIDR 地址添加进 PostgreSQL 数据库白名单中,添加操作详见创建白名单
        图片
    • 数据源为公网自建数据源,需通过公网形式访问:
      1. 集成资源组开通公网访问能力,操作详见开通公网
      2. 并将公网 IP 地址,添加进 PostgreSQL 数据库白名单中。

3 支持的字段类型

当前主要字段支持情况如下

字段类型

离线读(PostgreSQL Reader)

离线写(PostgreSQL Writer)

char

支持

支持

bpchar

支持

支持

varchar

支持

支持

text

支持

支持

character varying

支持

支持

character

支持

支持

smallint

支持

支持

int2

支持

支持

integer

支持

支持

int

支持

支持

int4

支持

支持

bigint

支持

支持

int8

支持

支持

smallserial

支持

支持

serial

支持

支持

bigserial

支持

支持

double

支持

支持

float8

支持

支持

money

支持

支持

double precision

支持

支持

numeric

支持

支持

decimal

支持

支持

real

支持

支持

float4

支持

支持

boolean

支持

支持

bool

支持

支持

date

支持

支持

time

支持

支持

timetz

支持

支持

timestamp

支持

支持

timestamptz

支持

支持

bytea

支持

支持

bit

支持

支持

bit varying

支持

支持

varbit

支持

支持

uuid

支持

支持

cidr

支持

支持

xml

支持

支持

inet

支持

支持

macaddr

支持

支持

enum

支持

支持

json

支持

支持

jsonb

支持

支持

aclitem

支持

支持

_aclitem

支持

支持

_int2

支持

支持

_int4

支持

支持

_float4

支持

支持

_text

支持

支持

_char

支持

支持

cid

支持

支持

inet

支持

支持

int2vector

支持

支持

interval

支持

支持

oid

支持

支持

_oid

支持

支持

pg_node_tree

支持

支持

box

支持

支持

line

支持

支持

lseg

支持

支持

tsquery

支持

支持

tsvector

支持

支持

polygon

支持

支持

circle

支持

支持

point

支持

支持

path

支持

支持

gemotry

支持

不支持

4 数据同步任务开发

4.1 数据源注册

新建数据源操作详见配置数据源,下面为您介绍用连接串方式配置 PostgreSQL 数据源信息:

注意

PostgreSQL 侧如果是白名单访问机制,则不同网络环境的连接串地址,需要添加不同的 IP 地址到数据库白名单中,确保集成资源组使用的 VPC 与 PostgreSQL 网络能互通:

  • 如果使用的是公网连接串访问,则需要给集成资源组添加公网 IP,并将公网 IP 地址加入到白名单中。
  • 如果使用的是私网连接串访问,则需要将资源组 VPC 下的 IPv4 CIDR 地址加入到白名单中。

详见网络连通解决方案

参数

说明

基本配置

数据源类型

PostgreSQL

接入方式

连接串

数据源名称

数据源的名称,可自行设置,仅支持中文,英文,数字,“_”,100个字符以内。

参数配置

Database

输入已创建成功的 PostgreSQL 数据库名称。

SSL 模式

SSL 模式可以提供窃听攻击、中间人攻击(Man-in-the-middle MITM)、假冒攻击的保护措施。不同的 SSL 模式用于提供不同等级的保护,支持以下四种模式配置:

  • Disable:不使用 SSL 模式,适用于私网访问,不会有加解密的性能损耗。
  • allow:允许使用 SSL 模式,只有 Server 端一定需要使用 SSL 通信时,才会使用 SSL 通信,否则不使用 SSL 通信。
  • perfer:倾向于使用 SSL 模式,只要 Server 端支持 SSL ,client 端与 Server 端就会使用 SSL 通信。
  • require:需要使用 SSL 模式,适用于公网访问,会有加解密的性能的损耗。

具体说明详见 PostgreSQL SSL 官方文档。

Host

输入连接数据库时,使用的主机名或 IP 地址。

Port

PostgreSQL 数据库连接的端口号。

user

有权限访问数据库的用户名信息。

Password

输入用户名对应的密码信息。

4.2 新建任务

PostgreSQL 数据源测试连通性成功后,进入到数据开发界面,开始新建 PostgreSQL 相关通道任务。
新建任务方式详见离线数据同步流式数据同步

4.3 可视化配置说明

任务创建成功后,您可根据实际场景,配置PostgreSQL 批式读、PostgreSQL 批式写或 PostgreSQL 流式写等通道任务。

4.3.1 PostgreSQL 批式读

图片
数据来源选择 PostgreSQL,并完成以下相关参数配置:
其中参数名称前带 * 的为必填参数,名称前未带 * 的为可选填参数。

参数

说明

*数据源类型

下拉选择 PostgreSQL 数据源类型。

*数据源名称

已在数据源管理中注册成功的 PostgreSQL 数据源,下拉可选。
若还未建立相应数据源,可单击数据源管理按钮,前往创建 PostgreSQL 数据源。

*Schema 目录

数据库下已有的 Schema 目录信息,下拉可选。

*数据表

选择需要采集的数据表名称信息,目前单个任务只支持将单表的数据采集到一个目标表中。

数据过滤

支持您将需要同步的数据进行筛选条件设置,只同步符合过滤条件的数据,可直接填写关键词 where 后的过滤 SQL 语句,例如:create_time > '${date}',表示只同步 create_time 大于等于 ${date} 的数据,不需要填写 where 关键字。
语句填写完成后,您可单击右侧的校验按钮,进行过滤语句校验。

说明

该过滤语句通常用作增量同步,暂时不支持 limit 关键字过滤,其 SQL 语法需要和选择的数据源类型对应。
如果不配置,默认会同步全量数据。

切分建

根据配置的字段进行数据分片,建议使用主键或有索引的列作为切分键:

  • 如果表没有主键或者索引列,可以不配置该字段,同步任务不会进行分片,并以单并发的方式同步所有的数据;
  • 建议使用主键或有索引的列作为切分键,切分键配置没有索引的列同步任务会比较慢;

说明

目前仅支持类型为整型或字符串的字段作为切分建。

4.3.2 PostgreSQL 批式写

图片
数据来源选择 PostgreSQL,并完成以下相关参数配置:
其中参数名称前带 * 的为必填参数,名称前未带 * 的为可选填参数。

参数

说明

*目标类型

数据去向目标类型选择 PostgreSQL。

*数据源名称

已在数据源管理界面注册的 PostgreSQL 数据源,下拉可选。
若还未建立相应数据源,可单击数据源管理按钮,前往创建 PostgreSQL 数据源。

*Schema 目录

数据库下已有的 Schema 目录信息,下拉可选。

*数据表

数据源下所属需数据写入的表名,下拉可选。

写入前准备语句

在执行该数据集成任务前,需要率先执行的 SQL 语句,通常是为了使任务重跑时支持幂等。
例如您可以通过填写语句,清空表中的某些旧数据,清空完成后,再执行集成任务写入新的数据。如删除 date='${date}' 的数据:delete from table_name where date='${date}'
语句填写完成后,您可单击右侧的校验按钮,进行语句校验是否符合逻辑。

说明

可视化通道任务配置中只允许执行一条写入前准备语句。

写入后准备语句

执行数据同步任务之后执行的 SQL 语句。例如写入完成后插入某条特殊的数据,标志导入任务执行结束。
语句填写完成后,您可单击右侧的校验按钮,进行语句校验是否符合逻辑。

说明

可视化通道任务配置中只允许执行一条写入后准备语句。

*数据写入方式

下拉选择数据写入 PostgreSQL 的方式:

  • insert into: 当主键/唯一性索引冲突时会无法写入冲突的行,任务会运行失败。

说明

如果希望主键/唯一索引冲突时任务正常执行可以添加高级参数: job.writer.is_insert_ignoretrue

4.3.3 PostgreSQL 流式写

支持可视化方式配置流式写入 PostgreSQL 单表。PostgreSQL Writer 通过 JDBC 远程连接 PostgreSQL 数据库,并执行相应的 SQL 语句,将数据写入 PostgreSQL。流式写入 PostgreSQL 配置方式如下:
图片
数据目标端选择 PostgreSQL,并完成以下相关参数配置:
其中参数名称前带 * 的为必填参数,名称前未带 * 的为可选填参数。

参数

说明

*目标类型

数据去向目标类型选择 PostgreSQL。

*数据源名称

已在数据源管理界面注册的 PostgreSQL 数据源,下拉可选。
若还未建立相应数据源,可单击数据源管理按钮,前往创建 PostgreSQL 数据源。

*Schema 目录

数据库下已有的 Schema 目录信息,下拉可选。

*数据表

数据源下所属需数据写入的表名,下拉可选。

4.3.4 字段映射

数据来源和目标端配置完成后,需要指定来源和目标端的字段映射关系,根据字段映射关系,数据集成任务将源端字段中的数据,写入到目标端对应字段中。
您可通过以下三种方式操作字段映射关系:

  • 自动添加:单击自动添加按钮,根据两端数据表信息,可以自动填充来源和目标的字段信息。
  • 手动添加:单击手动添加按钮,可以手动编辑来源和目标的字段信息,可以逐个添加。

    说明

    来源端字段信息支持输入数据库函数和常量配置,单击手动添加按钮,在源表字段中输入需添加的值,并选择函数或常量类型,例如:

    • 函数:支持您输入 now()、current_timestamp() 等 PostgreSQL 数据库支持的函数。
    • 常量:您可自定义输入常量值,'123'、'${DATE}'、'${hour}' 等,输入值两侧需要加上英文单引号,支持结合时间变量参数使用。
  • 移动\删除字段:您也可以根据需要移动字段映射顺序或删除字段。

4.4 DSL 配置说明

PostgreSQL 数据源支持使用脚本模式(DSL)的方式进行配置。
在某些复杂场景下,或当数据源类型暂不支持可视化配置时,您可通过任务脚本的方式,按照统一的 Json 格式,编写 PostgreSQL Reader 和 PostgreSQL Writer 参数脚本代码,来运行数据集成任务。

4.4.1 进入 DSL 模式

进入 DSL 模式操作流程,可详见 MySQL 数据源-4.4.1 进入DSL 模式

4.4.2 PostgreSQL 批式读

进入 DSL 模式编辑界面后,您可根据实际情况替换相应参数,PostgreSQL 批式读脚本示例如下:

// 变量使用规则如下:
// 1.自定义参数变量: {{}}, 比如{{number}}
// 2.系统时间变量${}, 比如 ${date}、${hour}
// **************************************
{
    // [required] dsl version, suggest to use latest version
    "version": "0.2",
    // [required] execution mode, supoort streaming / batch now
    "type": "batch",
    // reader config
    "reader": {
        // [required] datasource type
        "type": "pg",
        // [optional] datasource id, set it if you have registered datasource
        "datasource_id": 12345,
        // [required] user parameter
        "parameter": {
            // ********** please write here **********
            // "key" : value
            "columns": [
                {
                    "name": "name_sample",
                    "type": "type_sample"
                }
            ],
            "filter": "id > 10",
            "split_pk": "split_pk_sample",
            "table_schema":"table_schema",
            "table_name": "table_name_sample"
        }
    },
    // writer config
    "writer": {
    },
    // common config
    "common": {
        // [required] user parameter
        "parameter": {
            // ********** please write here **********
            // "key" : value
        }
    }
}

Reader 参数说明,其中参数名称前带 * 的为必填参数,名称前未带 * 的为可选填参数:

参数名

描述

默认值

*type

数据源类型,对于 PostgreSQL 类型,填写:pg

*datasource_id

注册的 PostgreSQL 数据源 ID。可以在项目控制台 > 数据源管理界面中查找。

*table_schema

填写 PostgreSQL 数据库中的 Schema 名称。

*table_name

需要同步的数据表名称,目前单个任务只支持将单表的数据采集到一个目标表中。

filter

同步数据的筛选条件,同步数据时只会同步符合过滤条件的数据,直接填写关键词 where 后的过滤 SQL 语句。

  • 如将过滤条件指定为:date>=${date} ,表示只同步 date 大于等于 ${date}。
  • 过滤条件可以有效地进行业务增量同步。如果不配置,默认会同步全量数据。

split_pk

根据配置的字段进行数据分片,建议使用主键或有索引的列作为切分键,同步任务会启动并发任务进行数据同步,提高同步速率:

  • 如果表没有主键或者索引列,可以不配置该字段,同步任务不会进行分片,并以单并发的方式同步所有的数据;
  • 建议使用主键或有索引的列作为切分键,切分键配置没有索引的列同步任务会比较慢;

说明

目前仅支持类型为整型或字符串的字段作为切分建。

*columns

所配置的表中,需要同步的列名集合,使用 JSON 的数组描述字段信息。

  • 支持列裁剪:列可以挑选部分列进行导出。
  • 支持列换序:列可以不按照表 Schema 信息顺序进行导出。
  • column 必须显示指定同步的列集合,不允许为空。
  • 支持函数、常量形式添加列:
    • 函数:PostgreSQL Reader 支持您输入 now()、current_timestamp() 等 PostgreSQL 数据库支持的函数。
    • 常量:PostgreSQL Reader 支持您自定义输入常量值,如 '123'、'${DATE}'、'${hour}' 等,输入值两侧需要加上英文单引号,支持结合时间变量参数使用。

4.4.3 PostgreSQL 批式写

根据实际情况替换 PostgreSQL 批式写相应参数,PostgreSQL 批式写脚本示例如下:

// **************************************
// 变量使用规则如下:
// 1.自定义参数变量: {{}}, 比如{{number}}
// 2.系统时间变量${}, 比如 ${date}、${hour}
// **************************************
{
    // [required] dsl version, suggest to use latest version
    "version": "0.2",
    // [required] execution mode, supoort streaming / batch now
    "type": "batch",
    // reader config
    "reader": {
        // [required] datasource type
        "type": "xx",
        // [optional] datasource id, set it if you have registered datasource
        "datasource_id": null,
        // [required] user parameter
        "parameter": {
            // ********** please write here **********
            // "key" : value
          
        }
    },
    // writer config
    "writer": {
        // [required] datasource type
        "type": "pg",
        // [optional] datasource id, set it if you have registered datasource
        "datasource_id": 12345,
        // [required] user parameter
        "parameter": {
            // ********** please write here **********
            // "key" : value
            "table_schema":"table_schema",
            "table_name":"table_1",
            "pre_sql_list":[""],
            "post_sql_list":[""],
            "write_mode":"directlyInsert",
            "columns": [
                {
                    "name": "name_sample",
                    "type": "type_sample"
                }
            ]
        }
    },
    // common config
    "common": {
        // [required] user parameter
        "parameter": {
            // ********** please write here **********
            // "key" : value
        }
    }
}

Writer 参数说明,其中参数名称前带 * 的为必填参数,名称前未带 * 的为可选填参数:

参数名

描述

默认值

*type

数据源类型,对于 PostgreSQL 类型,填写:pg

*datasource_id

注册的 PostgreSQL 数据源 ID。可以在项目控制台 > 数据源管理界面中查找。

*table_schema

填写 PostgreSQL 数据库中的 Schema 名称。

*table_name

填写需要同步的数据表名称,一个数据集成任务只能同步数据到一张目标表。

pre_sql_list

写入前准备语句:在执行数据集成任务前,率先执行的 SQL 语句。此语句通常是为了使任务重跑时支持幂等。
例如执行前清空表中的某些旧数据,清空完成后,在执行集成任务写入新的数据,例如删除 date='${date}' 的数据:["delete from table_name where date='${date}'", "xxx"]

说明

DSL 模式支持配置多条写入前准备语句,多条语句之间用英文逗号分隔。

post_sql_list

写入后准备语句:执行数据同步任务后执行的 SQL 语句。例如数据写入完成后,插入某条特殊的数据,标志导入任务执行结束。
示例:["insert into table_name (col1,col2..) values(values1,values2)", "xxx"]

说明

DSL 模式支持配置多条写入后准备语句,多条语句之间用英文逗号分隔。

*write_mode

数据导入模式,支持 insert into 模式:
insert into:当主键/唯一性索引冲突时会写不进去冲突的行,任务会运行失败。

  • 使用该模式时,请将 write_mode 设置为 directlyInsert
  • 如果希望主键/唯一索引冲突时任务正常执行,可以在 writer.parameter 参数下添加高级参数 job.writer.is_insert_ignore:true

*columns

所配置的表中需要同步的列名集合,使用 JSON 的数组描述字段信息。

  • 支持列裁剪:列可以挑选部分列进行导出。
  • 支持列换序:列可以不按照表 Schema 信息顺序进行导出。

注意

  • column 必须显示指定同步的列集合,不允许为空。
  • column 必须与导入的源端列集合对齐,不允许多列或少列。

4.5 高级参数说明

  • 对于可视化通道任务,读参数需要加上 job.reader. 前缀,写参数需要加上 job.writer. 前缀,如下图所示:
    图片
  • 对于 DSL 任务,读参数请配置到 reader.parameter 下,写参数请配置到 writer.parameter 下,直接输入参数名称和参数值。如下图所示:
    图片

4.5.1 PostgreSQL 批式读

参数名

描述

默认值

init_sql

读取数据前执行的 SQL 语句。对于视图的查询可能需要使用 init SQL 语句初始化环境

reader_fetch_size

每次拉取的数据条数,只在准确分片中有效。

10000

shard_split_mode

分片模式,支持准确分片、并发分片、不分片三种模式:

  • 准确分片(默认):根据配置的分片键将数据拆分为不同的区间,除下最后一个区间外,每个区间精准的有 reader_fetch_size 条数。
    • 拉取数据量很大的表或者分片键不是主键或者索引键时,该分片模式分片时间会比较长;
    • 该分片模式支持分片键为整型数据类型和字符串数据类型;
    • 配置方式:将该参数配置为 accurate
  • 并发分片:根据表的最大最小值,将所有的数据按照并发数进行区间分片。
    • 该分片模式仅支持分片键为整型数据类型;
    • 配置方式:将该参数配置为 parallelism
  • 不分片:不进行分片,适用于没有主键、索引键的表。
    • 配置方式:将该参数配置为 nosplit 或者不配置 split_pk

准确分片

customized_sql

自定义查询读取 SQL 语句。filter 过滤配置项不足以描述所筛选的条件,可通过该配置项来自定义执行较复杂的查询 SQL。
例如:需要进行多表 join 后同步数据,使用select a,b from table_a join table_b on table_a.id = table_b.id

说明

配置该高级参数项后,数据同步任务仍需配置 table_name、column 、split_pk 、shard_split_mode 等必填配置项。然而,在执行同步时,系统将忽略这些配置项信息,直接使用该高级参数项中配置的内容进行数据查询和筛选。

4.5.2 PostgreSQL 批式写

参数名

描述

默认值

is_insert_ignore

insert into 模式时,主键或者唯一键冲突时任务失败还是忽略冲突

false

write_batch_interval

一次性批量提交的数据条数,该值可以减少与 PostgreSQL 网络的交互次数并提升整体吞吐量。如果该值设置过大可能会导致数据同步进程 OOM。

100

write_retry_times

PostgreSQL 写入失败时重试次数。

3

retry_interval_seconds

写入失败后两次重试的时间间隔,单位秒

write_batch_interval / 10