本文将为您介绍 DataSail MongoDB CDC Connector 的高级参数配置,内容涵盖连接认证、数据读取策略及性能优化等。
本章节详细介绍 MongoDB CDC Source 在读取数据时可供配置的高级参数。
本节包含配置与 MongoDB 数据库建立安全、稳定连接所需的基础参数。
job.reader.connectionsmongodb.hosts 格式。host:port 地址。连接器会利用这些“种子节点”自动发现所有副本集成员,并在主节点发生切换时实现自动故障转移。mongos 路由进程的地址。客户端将通过 mongos 与后端的多个分片进行通信。Connection refused 或 Timed out 报错往往源于网络策略或防火墙限制。job.reader.user_name 和 job.reader.password 参数使用。job.reader.connections 是您数据同步任务的“导航地图”,需要在此提供 MongoDB 集群的入口地址。为确保任务在节点故障时也能正常工作,建议至少配置两到三个地址。job.reader.user_namemongodb.user 配置。Authentication failed 相关的错误。local 数据库的读权限(用于访问 Oplog)以及对目标业务数据库和集合的 find 和 changeStream 权限。job.reader.password 成对出现。admin,如果您的用户定义在其他数据库,可能需要通过 job.reader.debezium 透传 mongodb.auth.source 参数来指定。job.reader.user_name 是您访问受保护 MongoDB 数据的“身份凭证”。请确保填写的用户拥有足够的权限来读取您需要同步的数据变更。job.reader.passwordjob.reader.user_name 配套使用的密码。该参数值会被映射到 Debezium 的 mongodb.password 配置。job.reader.user_name 对应的正确密码。job.reader.password 是与用户名配对的“钥匙”,用于完成数据库的身份验证。请确保其安全性和正确性。job.reader.debeziumjob.reader.debezium 设置 mongodb.ssl.enabled: "true" 来启用加密连接。如果需要跳过主机名验证(仅限测试环境),可以设置 mongodb.ssl.invalid.hostname.allowed: "true"。mongodb.socket.timeout.ms, mongodb.connect.timeout.ms, mongodb.server.selection.timeout.ms 等参数,以应对复杂的网络环境。connect.backoff.initial.delay.ms (初始退避延迟), connect.backoff.max.delay.ms (最大退避延迟), connect.max.attempts (最大尝试次数) 等 Debezium 内置的连接重试参数。admin 库,可以通过设置 mongodb.auth.source: "your_auth_db" 来指定。debezium map 透传的参数,其优先级通常高于 BitSail 自动生成的同名参数。这意味着您可以用它来“精确覆盖”框架的默认行为。job.reader.debezium 是一个高级“专家模式”开关,它打开了一扇通往底层 Debezium 连接器原生配置的大门。当您需要进行精细化的 SSL 控制、网络超时调整或配置非标准认证时,可以通过它来实现。本节参数决定了 CDC 任务在首次启动(即没有任何历史状态或 Checkpoint)时,从 MongoDB Oplog 或 Change Streams 的哪个位置开始读取数据。
job.reader.initial_offset_typelatestlatest(默认)。作业将从启动时刻的 Oplog/Change Stream 当前位置开始消费,忽略所有历史数据。这是流式任务最常见的配置。specific,并配合 job.reader.initial_offset_timestamp 和 job.reader.initial_offset_ordinal 两个参数来指定一个精确的 BSON Timestamp。earliest。请特别注意,在当前的 MongoDB CDC 实现中,earliest 并不等同于 从 Oplog 的最开始位置进行全量同步。它的实际行为是获取当前时间点作为起始位点,效果上与 latest 类似。这是一个常见的误解区。latest 即可。specific 模式,并提供精确的 timestamp 和 ordinal 值。timestamp 和 file 模式在 MongoDB CDC 连接器中不被支持,配置后会导致作业启动失败。initial_offset_type 的值决定了 initial_offset_timestamp 和 initial_offset_ordinal 是否生效。initial_offset_* 参数将不再有效。任务重启时会严格从 Checkpoint 中保存的位点恢复,实现精确一次(Exactly-once)语义。job.reader.initial_offset_type 是为您的 CDC 任务设定“起跑线”的关键参数。对于大多数实时同步场景,默认的 latest(从现在开始)是最佳选择。若需从历史上的某一精确时刻开始“回放”数据,请使用 specific 模式。job.reader.initial_offset_timestamp / job.reader.initial_offset_ordinalinitial_offset_type 设置为 specific 时,这两个参数共同组成一个精确的 BSON Timestamp,作为 CDC 的起始位点。timestamp 是指自 Unix 纪元以来的秒数,ordinal 是该秒内的自增序数。T 开始,下游数据出现问题,需要从 T 之前的一个精确位点重新同步。T 之前的一条健康记录,记录其 ts 字段的值(一个 BSON Timestamp),然后将其中的 t (秒) 和 i (序数) 分别填入 initial_offset_timestamp 和 initial_offset_ordinal。mongosh 连接到数据库,查询 local.oplog.rs 集合来找到合适的 BSON Timestamp。例如,查询 db.getSiblingDB('local').oplog.rs.find().sort({$natural: -1}).limit(1) 可以看到最新的 Oplog 记录及其 ts 字段。specific 模式下有效:如果 initial_offset_type 不是 specific,这两个参数将被忽略。initial_offset_type 一样,这两个参数仅在作业首次启动时用于定位,一旦有了 Checkpoint,将以 Checkpoint 中的位点为准。initial_offset_timestamp 和 initial_offset_ordinal 组合起来,就像一个能精确定位到 MongoDB 历史某一瞬间的“时空坐标”。当您需要从一个非常精确的历史点(而非模糊的“最早”或“最新”)开始数据同步时,这对参数是您的不二之选。本节参数控制 Debezium 连接器在启动时如何处理存量数据(快照),以及在后续运行时如何捕获增量变更。
snapshot.modejob.reader.debezium 透传)initialinitial (默认)。作业启动后,会先对所有匹配的集合执行一次全量数据扫描(快照),然后无缝切换到 Change Streams 继续捕获增量变更。这是实现数据表完整迁移和同步的标准模式。never。作业将跳过全量快照阶段,直接从指定的 initial_offset_type 位点开始消费增量变更。这适用于您只关心新产生的数据,或者已经通过其他方式完成了历史数据迁移的场景。when_needed。这是一种更高级的模式,通常与信号表(signaling collections)配合使用,允许您在任务运行过程中通过向信号表插入命令来触发对特定集合的快照。initial 模式来确保数据的完整性。never 模式,可以大大缩短作业启动时间。when_needed,但需要对 Debezium 的信号机制有深入了解。initial_offset_type:当 snapshot.mode=never 时,initial_offset_type 的设置变得尤为重要,它直接决定了增量流的起点。initial.sync.max.threads:在 initial 模式下,此参数决定了全量快照阶段的并行度。snapshot.mode 决定了您的 CDC 任务是“回顾历史”还是“只看未来”。选择 initial 会执行“全量+增量”同步,确保数据完整;选择 never 则只处理启动后的增量变更,适用于存量数据已就绪的场景。capture.modejob.reader.debezium 透传)change_streams_update_fullchange_streams_update_full。在这种模式下,对于 update 操作,Debezium 事件中会包含完整的文档更新后镜像。这对于下游系统直接应用变更是至关重要的。change_streams。在这种模式下,update 事件只包含变更的部分(update description),而不包含完整的文档。这可以减少从 MongoDB 到连接器的网络流量,但下游处理会变得复杂,因为需要基于变更描述来重构记录。oplog。这是通过直接读取 oplog.rs 集合来捕获变更的传统方式。但从 MongoDB 3.6 开始,官方推荐使用 Change Streams。change_streams_update_full,除非您面临极端的网络带宽瓶颈,或者下游系统有特殊处理能力。获取完整的更新后镜像是保证数据一致性和简化下游逻辑的最佳实践。oplog 模式。capture.mode 的选择影响了 Debezium 输出事件中 after 字段的内容,进而影响到 BitSail 侧 CdcMultiplexTableRecord 的 after 镜像。capture.mode 决定了 CDC 任务捕获到的数据变更“有多详细”。保持默认的 change_streams_update_full,您就能拿到每次数据更新后的完整记录,让下游处理简单明了。这就像拍照时选择保存高清原图,而不是只留下一张修改痕迹的草稿。initial.sync.max.threadsjob.reader.debezium 透传)1snapshot.mode=initial 时,指定执行全量快照的最大并发线程数。8 或 16。Debezium 会启动一个线程池,以集合为单位并行地执行快照。job.reader.snapshot_chunk_size 和源端数据库的查询性能。initial.sync.max.threads 就像在搬家时增加了搬运工的数量。当您需要一次性同步大量(数十上百个)集合时,适当调高此参数可以让“搬家”(全量快照)过程更快完成。但如果只搬一个超大件(单个大集合),增加再多工人也无法同时抬,此时该参数无效。本节参数定义了 CDC 数据记录的输出格式,以及是否需要在记录中附加额外的元数据信息。
job.reader.use_multi_table_recordfalsetrue 时,输出为 CdcMultiplexTableRecord(多表复用记录格式);当设置为 false 时,输出为 debezium-json 格式的普通 Row。job.reader.use_multi_table_record 设置为 true。CdcMultiplexTableRecord 是一种结构化对象,内部封装了 tableId(包含库、表信息)、rowKind(INSERT/UPDATE/DELETE)、before(变更前镜像)、after(变更后镜像)以及其他元数据。下游算子可以直接访问这些信息,轻松实现动态分发。job.reader.use_multi_table_record 设置为 false。Row,该列的内容是 Debezium 事件的完整 JSON 字符串,遵循 Debezium Connect 的标准格式。true。这是 BitSail CDC 体系实现多表同步、Schema Evolution 等高级功能的基础。false。job.reader.format_type: 当 use_multi_table_record=true 时,format_type 参数被忽略。当 use_multi_table_record=false 时,format_type 必须为 debezium_json。true 模式要求下游能处理 CdcMultiplexTableRecord;false 模式则要求下游能处理单列的 JSON 字符串 Row。job.reader.use_multi_table_record 是决定数据“包装方式”的关键开关。设置为 true(推荐),数据会被打包成 BitSail 的“标准快递”,内含清晰的来源、类型等标签,便于后续处理;设置为 false,则数据会以原始的“大 JSON包裹”形式输出,适合直接转发给能处理这种包裹的外部系统。job.reader.format_typedebezium_jsonjob.reader.use_multi_table_record=false 的情况下,指定输出的 Row 中那条字符串的格式。job.reader.use_multi_table_record 设置为 false 时,此参数必须为 debezium_json。此时,连接器会使用 JsonLogChangeDataDeserializationSchema 来生成 Debezium 事件的 JSON 字符串。format_type 必须是 debezium_json。任何其他值都可能导致作业失败。job.reader.use_multi_table_record 被设置为 false。job.reader.format_type 在 MongoDB CDC 场景下是一个“只读”参数。当您选择输出原始 JSON 流时,它的值有且仅有 debezium_json 这一个选项。job.reader.src_meta_info_column_enabled / job.reader.src_meta_info_column_namefalse / __src_meta_info__src_meta_info_column_enabled 设置为 true。src_meta_info_column_name 参数为您希望的元数据列指定一个名称,以避免与业务字段冲突。{"db":"my_db","table":"my_collection"}。job.common.cdc_multiplex_record_case_insensitive:当此参数为 true 时,src_meta_info_column_name 的值以及元信息 JSON 中的 key(db/table)可能会被转换为小写。src_meta_info_column_enabled 设为 true,就会自动多出一列包含这些信息的元数据。本节参数帮助您在数据吞吐量、处理延迟和资源消耗之间找到平衡。
job.reader.poll_interval_ms500ChangeEventQueue 在内部队列为空时,进行下一次轮询前的等待时间(单位:毫秒)。1000 或 2000。这可以减少连接器在没有数据时对内部队列的空轮询,降低不必要的 CPU 消耗。100 或 200。这样可以更快地发现并处理新到达的事件,但会增加 CPU 在空闲时的开销。500 在大多数场景下是一个合理的折中。job.reader.poll_interval_ms 控制了 CDC 任务在“空闲时”的打盹频率。调大此值让任务在没活干的时候多睡一会儿,节省体力(CPU);调小则让它更警觉,能更快地处理新任务,但空闲时也更耗神。job.reader.max_batch_size2048ChangeEventQueue 每次从内部队列中批量取出的最大记录数。4096 或 8192。这可以有效提升数据处理的吞吐量。1024。减小批次大小可以降低单次处理对内存的峰值需求。batch_size:为了达到最佳性能,Source 的 max_batch_size 最好与下游 Sink(如 Hudi, Iceberg)的写入批次大小相协调。job.reader.max_batch_size 决定了 CDC 任务每次“打包”多少条数据进行处理。调大此值就像用更大的箱子打包,单次效率高,总吞吐量大;调小则像用小号快递袋,灵活轻便,适合内存有限或对单条数据延迟敏感的场景。job.reader.snapshot_chunk_size1024snapshot.mode=initial)阶段,每次从 MongoDB 数据库 find() 拉取数据的批次大小。256 或 512。这可以减少单次拉取的数据量,降低对内存和网络的瞬时压力。2048 或 4096,以减少与数据库的交互次数,可能提升快照总时长。1024 是一个相对安全的选择。如果遇到问题,首先尝试调小它。job.reader.snapshot_chunk_size 控制了在“全量复制”阶段,每次从 MongoDB 数据库搬运多少条记录。如果记录本身很大(比如包含大段文本或二进制内容),调小此值就像分多次、小批量搬运,虽然次数多了,但更稳妥,不易因单次负载过重而失败。本节参数用于增强 CDC 任务在面对网络抖动、数据异常等问题时的自我恢复能力。
job.reader.cdc_retry_times3MongoSocketReadException, MongoTimeoutException 等。5 或 10,给任务更多的机会在无需人工干预的情况下自动恢复。5,以增强任务的健壮性。-1 表示无限次重试,直到成功。请谨慎使用此配置,因为它可能导致任务在遇到持久性问题时一直卡住,消耗资源且无法告警。connect.max.attempts)是两个不同层面的机制。Debezium 重试主要处理初始连接和连接中断后的恢复,而 cdc_retry_times 则是在 Debezium 成功建立连接并开始拉数据后,对拉取批次时发生的异常进行上层包裹重试。job.reader.cdc_retry_times 为您的 CDC 任务提供了“再试一次”的勇气。当遇到网络抖动等暂时性问题时,它会尝试重新拉取数据,避免任务因小波折而中断。适当调高此值可以使您的同步任务在不稳定的环境中表现得更加顽强。job.reader.skip_error_recordfalsefalse。一旦遇到无法解析的“脏数据”,任务会立即失败并报错,强制您去关注和解决源头的数据质量问题。这是保证数据一致性的最安全做法。true。此时,当遇到解析失败的记录时,系统会打印一条错误日志,丢弃该记录,然后继续处理后续数据。false,并通过告警和日志来监控数据质量问题。true,以便任务能够跑通并处理大部分有效数据。skip_error_record 意味着您接受了数据可能会丢失的风险。请务必确保有相应的监控措施来统计被跳过的记录数量,并定期排查原因。job.reader.skip_error_record 是处理“坏苹果”的策略开关。保持默认 false,遇到一个坏苹果(脏数据),整箱水果(任务)就停止检查,确保万无一失;设置为 true,则是把坏苹果挑出来扔掉,继续处理剩下的好苹果,保证流程不断,但可能会丢失少量数据。对于一个典型的 MongoDB 到数据仓库/数据湖的实时同步任务,我们推荐以下核心参数组合:
{ "job": { "reader": { "connections": [ {"master": {"hosts": ["mongo1:27017", "mongo2:27017"]}} ], "user_name": "your_user", "password": "your_password", "use_multi_table_record": true, "job.reader.debezium": { "snapshot.mode": "initial", "capture.mode": "change_streams_update_full" } } } }
connections: 提供至少2个副本集成员,保证高可用。use_multi_table_record: true: 启用结构化输出,是 BitSail 高级功能的基础。snapshot.mode: initial: 保证全量+增量,数据完整。capture.mode: change_streams_update_full: 获取完整的更新后文档,简化下游逻辑。Connection refused, Timed out, Authentication failed, MongoServerSelectionExceptionjob.reader.connections:确认地址和端口是否正确,作业环境是否能访问到这些地址。job.reader.user_name 和 job.reader.password:确认用户名密码是否正确,以及该用户是否有足够权限(特别是访问 local.oplog.rs 和目标集合的 changeStream 权限)。admin 库,请通过 job.reader.debezium 透传 mongodb.auth.source。mongodb.ssl.enabled: "true"。snapshot.mode:如果是 initial 模式,且源端数据量巨大,作业可能正处于漫长的全量快照阶段。观察日志中是否有快照相关的进度信息。initial_offset_type:如果是 latest,请确认在作业启动后源端是否有新的数据变更。database_include_list 和 collection_include_list 等参数是否正确配置,没有意外地将目标库表过滤掉。Deserialization error, JsonParseException, Unrecognized fieldjob.reader.skip_error_record 设置为 true,但这会导致数据丢失。transform 算子中增加对异常数据的清洗和转换逻辑。