You need to enable JavaScript to run this app.
大数据研发治理套件

大数据研发治理套件

复制全文
数据开发
Shell 调用 DataX 最佳实践
复制全文
Shell 调用 DataX 最佳实践

DataX 是开源的一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle 等)、HDFS、Hive、ODPS、HBase、FTP 等各种异构数据源之间稳定高效的数据同步功能。DataX 作为数据同步框架,它将不同数据源的同步抽象为从源头数据源读取数据的 Reader 插件,以及向目标端写入数据的 Writer 插件,使用 DataX 框架可以支持多种数据源类型的数据互通同步工作。
详见:https://github.com/alibaba/DataX
本文将为您介绍在火山引擎大数据研发治理套件 DataLeap 上,通过 Shell 任务调用 DataX 的方式,将火山引擎云数据库 MySQL 与 文档数据库 MongDB 进行数据互通。

1 使用前提

  1. 已开通 DataLeap 服务。
  2. 如子账号登录,需具备服务使用权限,如 DataLeapFullAccess 权限。
  3. Shell 任务访问私有网络服务或资源时,需通过独享计算资源组访问,Shell 任务界面不支持单独修改网络配置。独享计算资源组操作详见独享计算资源组管理
  4. 已开通并创建火山引擎 云数据库 MySQL文档数据库 MongDB 的实例。

2 数据准备

2.1 MySQL数据准备:

--创建表
create table mysql_mongodb
(
    id          int unsigned auto_increment,
    name        varchar(400) not null,
    address     varchar(400) not null,
    create_time bigint       not null,
    event_time  bigint       not null,
    price       double       not null,
    date_info   date         not null
    PRIMARY KEY ( `Id` )
);

--插入以下测试数据
125037,张三,天津市丽县,1669862281,1668155516,-36010.5893188364,2022-12-11
125042,李四,河北省斌县,1701360001,1668155793,-35754.8812765556,2022-12-11
125044,王五,宁夏回族自治区慧县,1666972800,1668155829,-98763.5966260757,2022-12-11
125046,赵六,河南省淑兰县,1669824001,1668155873,-42967.791140771,2022-12-11
125047,小兰,重庆市秀英县,1668156174,1668155874,-12472.4318984986,2022-12-11
125073,小梅,西藏自治区玉华县,1669820401,1668157424,79521.9035130631,2022-12-11
153995,小琴,江苏省马鞍山县,1669804707,1670157326,-77601.3264138525,2022-12-11
153996,雅芝,辽宁省佛山市,1669727579,1670157380,-69563.8610820313,2022-12-11
154008,小莉,台湾省哈尔滨市,1675849149,1675848849,-4627.3688887951,2022-12-11
154011,小军,福建省淑兰县,1675854578,1675854278,12256.9309321271,2023-02-08
154012,小刚,宁夏回族自治区乌鲁木齐县,1675854784,1675854484,-60220.5831664783,2023-02-08

2.2 MongoDB 集合创建

在 MongoDB 目标数据库中,执行以下命令,创建集合名称:

db.createCollection("mysql2mongo")

3 配置任务

  1. 登录 DataLeap租户控制台
  2. 概览界面,显示加入的项目中,单击数据开发进入对应项目。
  3. 任务开发界面,左侧导航栏中,单击新建任务按钮,完成新建任务配置。详见 Shell 任务

3.1 配置脚本

任务完成新建后,进入 Shell 任务编辑界面,进行以下脚本编辑:
Shell 脚本编辑

echo  '{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",  
                    "parameter": {
                        "username": "username", //数据库登录账号信息
                        "password": "password", //数据库登录密码信息
                        "column": [ // 表中需要同步的字段名称集合,使用 JSON 的数组描述字段信息。您可使用*代表默认使用所有列配置,例如['*']
                                  "id",
                                  "name",
                                  "address",
                                  "create_time",
                                  "event_time",
                                  "price",
                                  "date_info"
                                  ],
                        "splitPk": "id", //切分建。根据split_pk指定的字段进行数据分片,同步时启动并发任务进行数据同步。推荐使用表主键切分。
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://ip:port/databases" // 数据库的JDBC私网地址连接信息,可用数据库IP或实例名称方式填写连接信息。
                                ], 
                                "table": ["mysql_mongodb"] //需同步的数据表名称
                            }
                        ]
                    }
                }, 
                "writer": {
                    "name": "mongodbwriter", //Writer 端类型
                    "parameter": {
                        "address": [
                           "ip:port" //MongoDB的数据地址信息,因为MonogDB可能是个集群,则ip端口信息需要以Json数组的形式给出
                           ],
                           "userName": "userName", // MongoDB的用户名
                           "userPassword": "userPassword", //MongoDB对应用户的密码
                           "dbName": "dbName", //MonogoDB的库名信息
                           "collectionName": "mysql2mongo", //MonogoDB的集合名
                        "column": [ //MongoDB的文档列名,以名称和对应的数据类型方式填写。
                      {
                          "name": "id",
                          "type": "int"
                      },
                      {
                          "name": "name",
                          "type": "string"
                      },
                      {
                          "name": "address",
                          "type": "string"
                      },
                      {
                          "name": "create_time",
                          "type": "Long"
                      },
                      {
                          "name": "event_time",
                          "type": "Long"
                      },
                      {
                          "name": "price",
                          "type": "double"
                      },
                      {
                          "name": "date_info",
                          "type": "date"
                      }
                                        ],  
                        "writeMode": {
                              "isReplace": "true", //当设置为true时,表示针对相同的replaceKey做更新操作
                              "replaceKey": "id" //replaceKey指定了每行记录的业务主键。用来做更新时使用
                         }
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "1" //同步任务并发数设置
            }
        }
    }
}' > mysql2mongodb.json
python3 ./datax/bin/datax.py mysql2mongodb.json //启动 DataX

说明

脚本配置时,需将以上示例脚本中的注释说明删除后再实际执行。

更多 Reader 和 Writer 插件,详见开源 DataX

3.2 执行设置

脚本配置完成后,您可进行以下操作,完成任务执行资源配置:

  1. 单击进入右侧侧边栏执行设置窗口。
  2. 选择计算资源组:下拉选择独享计算资源组
  3. 镜像地址:
    • DataX 工具已集成在 DataLeap 公共镜像地址:dataleap-cn-beijing.cr.volces.com/studio/datax:v1 中,Shell 任务直接配置使用即可。
  4. 资源配置:资源可根据实际需求进行配置,以 CU 为单位,默认配置 1CU(1CU = 1Core 4GB),下拉可选择更高规格的资源配置。

3.3 网络设置

在选取独享计算资源组设置后,网络配置中会默认将独享计算资源组绑定的私有网络、子网、安全组信息填入,且不可修改,您可在创建独享计算资源组时,配置好对应的私有网络信息,详见:资源组管理

3.4 任务产出设置

产出数据登记用于记录任务的数据血缘,不会对代码逻辑造成影响,此示例选择默认。

  1. 引擎类型:E-MapReduce(EMR)和湖仓一体分析服务(LAS)、默认(没有产出登记)。
  2. 关联实例:选择关联对应引擎下的实例信息。
  3. 数据类型:仅 EMR 引擎类型下需选择数据类型,目前有 Hive、HDFS、其他。
  4. 数据库表:选择对应引擎下产出的数据库和数据表。

4 保存运行任务

任务配置完成后,依次单击上方操作栏中保存调试图标按钮,执行编辑好的 Shell 命令,执行成功后,可在界面下方查看运行日志和同步结果。
Image

5 数据验证

使用 DataGrip 本地数据库工具,登录 MongoDB 的数据库,执行以下查询命令:

db.getCollection("mysql2mongo")

执行结果如下:
Image

6 提交任务

数据验证确认无误后,您可进行后续的调度设置和将任务提交发布到运维中心离线任务运维中执行。

  1. 调度设置:
    在右侧导航栏中,进入调度设置界面,您可以在此设置调度资源组、调度属性、依赖关系等信息,详细参数设置详见调度设置
  2. 提交发布:
    单击操作栏中的保存提交上线按钮,在弹窗中,需先通过提交上线流程,最后单击确认按钮,完成作业提交。详见5.1 离线任务提交

后续任务运维操作详见:离线任务运维

最近更新时间:2025.07.04 16:58:48
这个页面对您有帮助吗?
有用
有用
无用
无用