You need to enable JavaScript to run this app.
导航
配置 Mongo 数据源
最近更新时间:2024.08.16 11:17:49首次发布时间:2023.03.17 16:07:31

DataSail 数据集成中的 MongoDB 数据源为您提供读取和写入 MongoDB 的双向通道数据集成能力,实现不同数据源与 MongoDB 之间进行数据传输。
下文为您介绍 MongoDB 数据同步的能力支持情况。

1 支持的版本

MongoDB 使用的驱动版本是 mongo-java-driver 3.11.0,该驱动支持的内核版本为 3.4+ 版本。
驱动能力详情请参见Mongo官方文档

2 支持的字段类型

MongoDB 读写支持的字段类型:

类型

离线写入

离线读取

OBJECTID

支持

支持

LONG

支持

支持

STRING

支持

支持

INT

支持

支持

DECIMAL

支持

支持

NULL

支持

支持

DOUBLE

支持

支持

DATE

支持

支持

TIMESTAMP

支持

支持

BINDATA

支持

支持

BOOL

支持

支持

REGEX

支持

支持

JAVASCRIPT

支持

支持

UNDEFINED

支持

支持

JAVASCRIPTWITHSCOPE

支持

支持

ARRAY

支持

支持

3 数据同步任务开发

3.1 数据源注册

新建数据源操作详见配置数据源,下面为您介绍用连接串方式配置 MongoDB 数据源信息.
其中参数名称前带 * 的为必填参数,名称前未带 * 的为可选填参数。

参数

说明

基本配置

*数据源类型

MongoDB

*接入方式

连接串

*数据源名称

数据源的名称,可自行设置,仅支持中文,英文,数字,“_”,100个字符以内。

参数配置

*主机名或 IP 地址

MongoDB 接入地址,格式为主机名: 端口,单击新增按钮,支持配置多个 Hosts。

鉴权数据库

身份认证所用库。

注意

Mongo 数据源用于实时整库解决方案时,建议配置 admin 鉴权数据库,对应用户通过 CDC 消费 Binlog,权限校验较高,需保证有 ListDatabases 和 ListConnections 权限。

*数据库名

创建 MongoDB 数据库时,数据库的名称。

*用户名

数据库登录账号名称。

*密码

数据库账号密码。

3.2 新建离线任务

MongoDB 数据源测试连通性成功后,进入到数据开发界面,开始新建 MongoDB 相关通道任务。
新建任务方式详见离线数据同步

3.3 可视化配置说明

任务创建成功后,您可根据实际场景,配置 MongoDB 离线读MongoDB 离线写等通道任务。

3.3.1 MongoDB 离线读

图片
数据来源选择 MongoDB,并完成以下相关参数配置:
其中参数名称前带 * 的为必填参数,名称前未带 * 的为可选填参数。

参数

说明

*数据源类型

下拉选择 MongoDB 数据源类型。

*数据源名称

已在数据源管理中注册成功的 MongoDB 数据源,下拉可选。
若还未建立相应数据源,可单击数据源管理按钮,前往创建 MongoDB 数据源。

*集合名

选择需要采集的数据集合名称,下拉可选,支持同时选择多个 Schema 相同的集合。
选择需要采集的数据集合名称,您可同时选择多个 Schema 相同的集合进行数据同步,支持区间表达式如:“table_[0-99]”、时间表达式如:“table_${date}”和正则表达式如:“table.*” 方式,来快速配置选择多个集合。

分库分表

MongoDB 支持分库分表形式读取,单击添加分库分表按钮,进行分库分表添加,在下拉框中选择分库数据源与具体分表名称信息,支持添加多个分库分表。

注意

配置分库分表,需要所有集合的 Schema 信息必须保持一致,否则任务会执行异常。

数据过滤

支持您将需要同步的数据进行筛选条件设置,只同步符合过滤条件的数据,例如:{createTime:{$gte:ISODate('2024-02-02T00:00:00.000+0800')}}、{stringField:{$eq:"hello"}}

说明

该过滤语句通常用作增量同步,来限制返回 MongoDB 的数据范围。

*主键名

MongoDB 中主键字段的名称信息。

3.3.2 MongoDB 离线写

图片
数据目标端选择 MongoDB,并完成以下相关参数配置:
其中参数名称前带 * 的为必填参数,名称前未带 * 的为可选填参数。

参数

说明

*目标类型

数据去向目标类型选择 MongoDB。

*数据源名称

已在数据源管理界面注册的 MongoDB 数据源,下拉可选。
若还未建立相应数据源,可单击数据源管理按钮,前往创建 MongoDB 数据源。

*集合名

对应数据源下,选择需要写入数据的集合名称,下拉可选。

*数据写入方式

写入模式指定了传输数据时是否覆盖的信息:

  • 覆盖式写入:写入时,不清除数据,唯一键相同,用新的数据覆盖旧数据,唯一键不同时,直接插入新数据。
  • 直接写入:不做其他操作,直接写入数据,表示不覆盖,出现唯一键冲突时,任务会执行失败。

*业务主键

当写入方式为覆盖写入时,需填写 MongoDB 集合的业务主键字段名称,支持配置多个业务主键,多个主键分隔符为逗号,配置形式为:a.b.c,a.d,k。

写入前准备语句

表示数据写入 MongoDB 前,对 MongoDB 中已有数据的前置操作,例如清理历史数据等,需确保输入语句符合 JSON 语法要求。

3.3.3 字段映射

数据来源和目标端配置完成后,需要指定来源和目标端的字段映射关系,根据字段映射关系,数据集成任务将源端字段中的数据,写入到目标端对应字段中。
字段映射支持选择基础模式转换模式配置映射:

注意

基础模式和转换模式不支持互相切换,模式切换后,将清空现有字段映射中所有配置信息,一旦切换无法撤销,需谨慎操作。

  • 转换模式:
    字段映射支持数据转换,您可根据实际业务需求进行配置,将源端采集的数据,事先通过数据转换后,以指定格式输入到目标端数据库中。
    转换模式详细操作说明详见4.1 转换模式
    在转换模式中,你可依次配置:来源节点、数据转换、目标节点信息:

    配置节点

    说明

    来源节点

    配置数据来源 Source 节点信息:

    • 节点名称:自定义输入来源节点名称信息,只允许由数字、字母、下划线、-和.组成;且长度不能超过10。
    • 数据字段:通过自动添加、手动添加等方式添加数据来源字段信息。

    配置完成后,单击确认按钮,完成来源节点配置。

    数据转换

    单击数据转换右侧添加按钮,选择 SQL 转换方式,配置转换信息和规则:

    • 节点名称:自定义输入来源节点名称信息,只允许由数字、字母、下划线、-和.组成;且长度不能超过10。
    • SQL 脚本:输入 SQL 脚本转换规则,目前仅支持添加一个转换的 SQL 语句,且不能包括 “;”。

    配置完成后,单击确认按钮,完成数据转换节点配置。SQL 脚本示例详见4.1.2 添加转换节点

    目标节点

    配置目标节点 Sink 信息:

    • 节点名称:自定义输入来源节点名称信息,只允许由数字、字母、下划线、-和.组成;且长度不能超过10。
    • 数据字段:通过自动添加、手动添加等方式添加数据目标字段信息。

    配置完成后,单击确认按钮,完成目标节点配置。

  • 基础模式:

    您可通过以下三种方式操作字段映射关系:

    • 同名映射:单击同名映射按钮,根据目标端/源端数据表信息,可以自动填充源端/目标端的字段信息。
    • 自动添加:单击自动添加按钮,读端可根据数据集合 Schema 信息,自动填充来源的字段信息。
    • 手动添加:单击手动添加按钮,可以手动编辑来源和目标的字段信息,可以逐个添加。
    • 移动\删除字段:您也可以根据需要移动字段映射顺序或删除字段。

3.4 DSL 配置说明

Mongo 数据源支持使用脚本模式(DSL)的方式进行配置。
在某些复杂场景下,如:数据源和独享集成资源组网络存在跨 VPC,用云企业网 CEN 打通网络,但无法配置 Mongo 数据源时、或当数据源类型暂不支持可视化配置时,您可通过任务脚本的方式,按照统一的 Json 格式,编写 Mongo Reader 参数脚本代码,来运行数据集成任务。

3.4.1 进入 DSL 模式

进入 DSL 模式操作流程,可详见6 脚本模式配置

3.4.2 Mongo 批式读

进入 DSL 模式编辑界面后,您可根据实际情况替换相应参数,Mongo 批式读脚本示例如下:

{
    // [required] dsl version, suggest to use latest version
    "version": "0.2",
    // [required] execution mode, supoort streaming / batch now
    "type": "batch",
    // reader config
    "reader": {
        // [required] datasource type
        "type": "mongo",
        // [optional] datasource id, set it if you have registered datasource
        "datasource_id": null,
        // [required] user parameter
        "parameter": {
            // ********** please write here **********
            // "key" : value
            // 集合名称
            "collection_name":"test_xxx",
            //过滤条件,注意要使用 mongo 语法
            "filter":"{stringField:{$eq:\"hello\"}}",
            //主键名
            "split_pk":"_id",
            "self_built":true,
            "columns":[
                {
                    "upperCaseName":"_ID",
                    "name":"_id",
                    "type":"objectid"
                },
                {
                    "upperCaseName":"STRING_FIELD",
                    "name":"string_field",
                    "type":"string"
                },
                {
                    "upperCaseName":"INT_FIELD",
                    "name":"int_field",
                    "type":"int"
                }
            ],
            //用户名
            "user_name":"****",
            //密码
            "password":"*****",
            // 主机名或IP地址:
            "hosts_str":"xxx.mongodb.volces.com:3717,xxx.mongodb.volces.com:3717",
            //鉴权数据库
            "auth_db_name":"db_xxx",
            // 数据库名
            "db_name":"db_1_xxx",
            "port":3717,
            "class":"com.bytedance.dts.batch.mongodb.MongoDBInputFormat"
        }
    },
    // writer config
    "writer": {
        // [required] datasource type
        "type": "bytehouse_cdw",
        // 此处有数据源ID,下面连接信息不用填
        "datasource_id": xxxx,
        // [required] user parameter
        "parameter": {
            // ********** please write here **********
            // "key" : value
            "ch_db":"db_xxx",
            "ch_table":"tab_xxx",
            "ch_partition_type":"String",
            "partition":"id=20240417",
            "shard_num":1,
            "shard_column":"_id",
            "codec":"none",
            "columns":[
                {
                    "upperCaseName":"_ID",
                    "name":"_id",
                    "type":"String"
                },
                {
                    "upperCaseName":"ADDRESS",
                    "name":"address",
                    "type":"String"
                },
                {
                    "upperCaseName":"DISCOUNT",
                    "name":"discount",
                    "type":"UInt8"
                }
            ],
            "class":"com.bytedance.dts.batch.bytehouse.BytehouseCdwOutputFormat"
        }
    },
    // common config
    "common": {
        // [required] user parameter
        "parameter": {
            // ********** please write here **********
            // "key" : value
        }
    }
}

批式写参数说明,其中参数名称前带 * 的为必填参数,名称前未带 * 的为可选填参数:

参数名

参数说明

*type

数据源类型,对于 Mongo 类型,填写 mongo

*datasource_id

注册的 Mongo 数据源 ID,可以在项目控制台 > 数据源管理界面中查找,如果因网络问题没有配置数据源,您可填写为 null。

*collection_name

填写需要读取数据的 Mongo 集合名称。

filter

支持您将需要同步的数据进行筛选条件设置,只同步符合过滤条件的数据,需要注意使用 Mongo 过滤语法。例如:{createTime:{$gte:ISODate('2024-02-02T00:00:00.000+0800')}}、{stringField:{$eq:"hello"}}

*split_pk

Mongo 中主键字段的名称信息。

*self_built

判断读取的是否为 Mongo 自建数据库中的数据。true 为非火山引擎 Mongo 数据库,false 为火山引擎 Mongo 数据库

*columns

需要同步的 Mongo 文档列名集合,使用 JSON 的数组描述字段信息。

*user_name

输入有权限访问 Mongo 的用户名信息。

*password

输入对应用户的密码信息。

*hosts_str

填写访问 Mongo 数据库的主机名或 IP 地址信息,可填写多个主机名信息,用英文逗号分隔。

*port

填写 Mongo 主机的端口号。

auth_db_name

填写 Mongo 的鉴权数据库信息,用于身份权限验证。

*db_name

填写 Mongo 集合所在的数据库名称信息。

*class

Mongo Reader connector type, 默认固定值:com.bytedance.dts.batch.mongodb.MongoDBInputFormat

4 高级参数说明

对于可视化通道任务,读参数需要加上 job.reader. 前缀,写参数需要加上 job.writer. 前缀,如下图所示:
图片

4.1 MongoDB 离线读

离线读支持高级参数示例如下,您可根据实际情况进行配置,更多读取的高级参数可详见 4.5.1 MySQL 批式读

参数名

描述

默认值

reader_fetch_size

单批次读取文档 doc 的数量。

100000

filter

指定读取过滤条件,满足 MongoDB 语法,如读取 id = 1000 的数据,填写示例如下:
{id: {$eq: 1000}}

split_mode

分片模式支持两种:

  1. paginating:单个分片的数量为:totalRecords / batchSize
  2. parallelism:若 MongoDB 支持 splitVector 功能,则使用 MongoDB 内置切片功能,否则将根据文档数量/parallelism 平均划分,单个分片的数量为:totalRecords / parallelismpaginating:根据数据量/fetchsize精确分片

说明

若需关闭分片,可设置并发度为 1。

parallelism

4.1 MongoDB 离线写

离线写支持以下高级参数,您可根据实际情况进行配置:

参数名

描述

默认值

max_connection_per_host

连接池最大连接数。

100

connect_timeout_ms

连接超时时间。

10000

batch_size

单批次写入 MongoDB 的数据量。

100