You need to enable JavaScript to run this app.
文档中心
大数据研发治理套件

大数据研发治理套件

复制全文
下载 pdf
高级参数
配置 MongoDB CDC 高级参数
复制全文
下载 pdf
配置 MongoDB CDC 高级参数

本文将为您介绍 DataSail MongoDB CDC Connector 的高级参数配置,内容涵盖连接认证、数据读取策略及性能优化等。

Source 参数(读取侧)

本章节详细介绍 MongoDB CDC Source 在读取数据时可供配置的高级参数。

连接与认证

本节包含配置与 MongoDB 数据库建立安全、稳定连接所需的基础参数。

job.reader.connections

  • 参数概述
    • 参数名job.reader.connections
    • 适用范围:Source
    • 核心作用:指定 MongoDB 副本集或分片集群的连接地址列表。这是建立连接的最核心配置。内部实现会将这些信息转换为 Debezium MongoDB Connector 所需的 mongodb.hosts 格式。
  • 适用场景
    • 场景一:连接副本集
      • 处理方式:在列表中提供至少一个(推荐 2-3 个)副本集成员的 host:port 地址。连接器会利用这些“种子节点”自动发现所有副本集成员,并在主节点发生切换时实现自动故障转移。
    • 场景二:连接分片集群
      • 处理方式:提供一个或多个 mongos 路由进程的地址。客户端将通过 mongos 与后端的多个分片进行通信。
  • 调参建议
    • 高可用性推荐:为避免单点故障,强烈建议提供至少 2-3 个 种子节点地址。无需列出所有成员,连接器会自动发现。
    • 网络检查:确保作业运行环境到您提供的所有地址之间的网络是通畅的。常见的 Connection refusedTimed out 报错往往源于网络策略或防火墙限制。
  • 注意事项
    • 提供的地址仅用于初始连接(Bootstrap),客户端会动态发现和使用集群中的所有可用节点。
    • 对于认证,请配合 job.reader.user_namejob.reader.password 参数使用。
  • 总结说明job.reader.connections 是您数据同步任务的“导航地图”,需要在此提供 MongoDB 集群的入口地址。为确保任务在节点故障时也能正常工作,建议至少配置两到三个地址。

job.reader.user_name

  • 参数概述
    • 参数名job.reader.user_name
    • 适用范围:Source
    • 核心作用:指定用于连接和验证的 MongoDB 用户名。该参数值会被映射到 Debezium 的 mongodb.user 配置。
  • 适用场景
    • 场景一:连接启用了认证的 MongoDB 集群
      • 典型现象:作业启动失败,日志中出现 Authentication failed 相关的错误。
      • 处理方式:填写具有读取 Oplog 或 Change Streams 权限的用户名。
  • 调参建议
    • 权限最小化原则:请为该用户授予必需的最小权限集,通常包括对 local 数据库的读权限(用于访问 Oplog)以及对目标业务数据库和集合的 findchangeStream 权限。
  • 参数联动
    • 必须与 job.reader.password 成对出现。
    • 认证数据库通常默认为 admin,如果您的用户定义在其他数据库,可能需要通过 job.reader.debezium 透传 mongodb.auth.source 参数来指定。
  • 总结说明job.reader.user_name 是您访问受保护 MongoDB 数据的“身份凭证”。请确保填写的用户拥有足够的权限来读取您需要同步的数据变更。

job.reader.password

  • 参数概述
    • 参数名job.reader.password
    • 适用范围:Source
    • 核心作用:与 job.reader.user_name 配套使用的密码。该参数值会被映射到 Debezium 的 mongodb.password 配置。
  • 适用场景
    • 场景一:连接启用了认证的 MongoDB 集群
      • 处理方式:填写与 job.reader.user_name 对应的正确密码。
  • 调参建议
    • 安全存储:建议将密码等敏感信息存储在安全的配置中心或加密文件中,而非直接硬编码在作业配置里。
  • 总结说明job.reader.password 是与用户名配对的“钥匙”,用于完成数据库的身份验证。请确保其安全性和正确性。

job.reader.debezium

  • 参数概述
    • 参数名job.reader.debezium
    • 适用范围:Source
    • 核心作用:这是一个“万能”的透传参数配置,允许您将任何 Debezium MongoDB Connector 支持的原生参数直接注入到连接器中。它是一个 Key-Value 结构的 Map。
  • 适用场景
    • 场景一:配置 BitSail 未直接暴露的 SSL/TLS 参数
      • 处理方式:通过 job.reader.debezium 设置 mongodb.ssl.enabled: "true" 来启用加密连接。如果需要跳过主机名验证(仅限测试环境),可以设置 mongodb.ssl.invalid.hostname.allowed: "true"
    • 场景二:精细化控制网络超时
      • 处理方式:设置 mongodb.socket.timeout.ms, mongodb.connect.timeout.ms, mongodb.server.selection.timeout.ms 等参数,以应对复杂的网络环境。
    • 场景三:调整底层的重试逻辑
      • 处理方式:配置 connect.backoff.initial.delay.ms (初始退避延迟), connect.backoff.max.delay.ms (最大退避延迟), connect.max.attempts (最大尝试次数) 等 Debezium 内置的连接重试参数。
    • 场景四:指定认证数据库
      • 处理方式:如果认证用户不在 admin 库,可以通过设置 mongodb.auth.source: "your_auth_db" 来指定。
  • 调参建议
    • 按需使用:当 BitSail 提供的标准参数无法满足您特定的连接、安全或性能调优需求时,才应求助于此参数。
    • 参考官方文档:在设置任何透传参数前,强烈建议查阅 Debezium 官方文档中关于 MongoDB Connector 的参数说明,以确保参数名和值的正确性。
  • 注意事项
    • 通过 debezium map 透传的参数,其优先级通常高于 BitSail 自动生成的同名参数。这意味着您可以用它来“精确覆盖”框架的默认行为。
    • 错误的参数名或值可能会导致作业启动失败或行为异常,请谨慎使用。
  • 总结说明job.reader.debezium 是一个高级“专家模式”开关,它打开了一扇通往底层 Debezium 连接器原生配置的大门。当您需要进行精细化的 SSL 控制、网络超时调整或配置非标准认证时,可以通过它来实现。

起始位点控制

本节参数决定了 CDC 任务在首次启动(即没有任何历史状态或 Checkpoint)时,从 MongoDB Oplog 或 Change Streams 的哪个位置开始读取数据。

job.reader.initial_offset_type

  • 参数概述
    • 参数名job.reader.initial_offset_type
    • 类型:枚举字符串
    • 默认值latest
    • 核心作用:定义了在没有可用 Flink Checkpoint 状态时,CDC 源的初始读取策略。
  • 适用场景
    • 场景一:只关心未来的增量数据
      • 处理方式:设置为 latest(默认)。作业将从启动时刻的 Oplog/Change Stream 当前位置开始消费,忽略所有历史数据。这是流式任务最常见的配置。
    • 场景二:需要从某个精确的历史时间点开始回溯
      • 处理方式:设置为 specific,并配合 job.reader.initial_offset_timestampjob.reader.initial_offset_ordinal 两个参数来指定一个精确的 BSON Timestamp。
    • 场景三:希望从“尽可能早”的位置开始(注意:行为有特殊性)
      • 处理方式:设置为 earliest请特别注意,在当前的 MongoDB CDC 实现中,earliest 并不等同于 从 Oplog 的最开始位置进行全量同步。它的实际行为是获取当前时间点作为起始位点,效果上与 latest 类似。这是一个常见的误解区。
  • 调参建议
    • 流式任务:保持默认值 latest 即可。
    • 数据回溯或调试:使用 specific 模式,并提供精确的 timestampordinal 值。
    • 不支持的模式timestampfile 模式在 MongoDB CDC 连接器中不被支持,配置后会导致作业启动失败。
  • 参数联动
    • initial_offset_type 的值决定了 initial_offset_timestampinitial_offset_ordinal 是否生效。
    • 重要:一旦作业成功创建了 Checkpoint,所有 initial_offset_* 参数将不再有效。任务重启时会严格从 Checkpoint 中保存的位点恢复,实现精确一次(Exactly-once)语义。
  • 总结说明job.reader.initial_offset_type 是为您的 CDC 任务设定“起跑线”的关键参数。对于大多数实时同步场景,默认的 latest(从现在开始)是最佳选择。若需从历史上的某一精确时刻开始“回放”数据,请使用 specific 模式。

job.reader.initial_offset_timestamp & job.reader.initial_offset_ordinal

  • 参数概述
    • 参数名job.reader.initial_offset_timestamp / job.reader.initial_offset_ordinal
    • 类型:长整型 (Long) / 整型 (Integer)
    • 核心作用:当 initial_offset_type 设置为 specific 时,这两个参数共同组成一个精确的 BSON Timestamp,作为 CDC 的起始位点。timestamp 是指自 Unix 纪元以来的秒数,ordinal 是该秒内的自增序数。
  • 适用场景
    • 场景一:从一次故障或数据错误中恢复
      • 典型现象:您发现从某个时间点 T 开始,下游数据出现问题,需要从 T 之前的一个精确位点重新同步。
      • 处理方式:从 MongoDB 的 Oplog 中找到时间点 T 之前的一条健康记录,记录其 ts 字段的值(一个 BSON Timestamp),然后将其中的 t (秒) 和 i (序数) 分别填入 initial_offset_timestampinitial_offset_ordinal
  • 调参建议
    • 获取方式:您可以通过 mongosh 连接到数据库,查询 local.oplog.rs 集合来找到合适的 BSON Timestamp。例如,查询 db.getSiblingDB('local').oplog.rs.find().sort({$natural: -1}).limit(1) 可以看到最新的 Oplog 记录及其 ts 字段。
    • 仅在 specific 模式下有效:如果 initial_offset_type 不是 specific,这两个参数将被忽略。
  • 注意事项
    • Oplog 窗口:您指定的时间点必须仍在 Oplog 的有效窗口期内。如果该位点已经被 Oplog 轮转覆盖掉了,作业启动时会因找不到起始位点而失败。
    • 仅首次生效:与 initial_offset_type 一样,这两个参数仅在作业首次启动时用于定位,一旦有了 Checkpoint,将以 Checkpoint 中的位点为准。
  • 总结说明initial_offset_timestampinitial_offset_ordinal 组合起来,就像一个能精确定位到 MongoDB 历史某一瞬间的“时空坐标”。当您需要从一个非常精确的历史点(而非模糊的“最早”或“最新”)开始数据同步时,这对参数是您的不二之选。

快照与捕获模式

本节参数控制 Debezium 连接器在启动时如何处理存量数据(快照),以及在后续运行时如何捕获增量变更。

snapshot.mode

  • 参数概述
    • 参数名snapshot.mode
    • 适用范围:Source (通过 job.reader.debezium 透传)
    • 默认值initial
    • 核心作用:定义了连接器在启动时的快照(snapshot)行为,决定是否以及如何读取集合中的现有数据。
  • 适用场景
    • 场景一:全量 + 增量同步(最常用)
      • 处理方式:设置为 initial (默认)。作业启动后,会先对所有匹配的集合执行一次全量数据扫描(快照),然后无缝切换到 Change Streams 继续捕获增量变更。这是实现数据表完整迁移和同步的标准模式。
    • 场景二:仅增量同步
      • 处理方式:设置为 never。作业将跳过全量快照阶段,直接从指定的 initial_offset_type 位点开始消费增量变更。这适用于您只关心新产生的数据,或者已经通过其他方式完成了历史数据迁移的场景。
    • 场景三:按需快照
      • 处理方式:设置为 when_needed。这是一种更高级的模式,通常与信号表(signaling collections)配合使用,允许您在任务运行过程中通过向信号表插入命令来触发对特定集合的快照。
  • 调参建议
    • 首次同步:使用默认的 initial 模式来确保数据的完整性。
    • 已有存量数据,仅需追增量:使用 never 模式,可以大大缩短作业启动时间。
    • 动态、复杂的运维场景:考虑使用 when_needed,但需要对 Debezium 的信号机制有深入了解。
  • 参数联动
    • initial_offset_type:当 snapshot.mode=never 时,initial_offset_type 的设置变得尤为重要,它直接决定了增量流的起点。
    • initial.sync.max.threads:在 initial 模式下,此参数决定了全量快照阶段的并行度。
  • 总结说明snapshot.mode 决定了您的 CDC 任务是“回顾历史”还是“只看未来”。选择 initial 会执行“全量+增量”同步,确保数据完整;选择 never 则只处理启动后的增量变更,适用于存量数据已就绪的场景。

capture.mode

  • 参数概述
    • 参数名capture.mode
    • 适用范围:Source (通过 job.reader.debezium 透传)
    • 默认值change_streams_update_full
    • 核心作用:定义了 Debezium 如何从 MongoDB 获取变更事件,以及事件内容的详细程度。
  • 适用场景
    • 场景一:需要获取更新操作的完整记录(推荐)
      • 处理方式:保持默认值 change_streams_update_full。在这种模式下,对于 update 操作,Debezium 事件中会包含完整的文档更新后镜像。这对于下游系统直接应用变更是至关重要的。
    • 场景二:仅需获取变更描述(节省网络带宽)
      • 处理方式:设置为 change_streams。在这种模式下,update 事件只包含变更的部分(update description),而不包含完整的文档。这可以减少从 MongoDB 到连接器的网络流量,但下游处理会变得复杂,因为需要基于变更描述来重构记录。
    • 场景三:兼容旧版 MongoDB (3.6 之前)
      • 处理方式:设置为 oplog。这是通过直接读取 oplog.rs 集合来捕获变更的传统方式。但从 MongoDB 3.6 开始,官方推荐使用 Change Streams。
  • 调参建议
    • 强烈建议保持默认值 change_streams_update_full,除非您面临极端的网络带宽瓶颈,或者下游系统有特殊处理能力。获取完整的更新后镜像是保证数据一致性和简化下游逻辑的最佳实践。
    • 仅在连接非常老旧的 MongoDB 版本时才考虑 oplog 模式。
  • 参数联动
    • capture.mode 的选择影响了 Debezium 输出事件中 after 字段的内容,进而影响到 BitSail 侧 CdcMultiplexTableRecordafter 镜像。
  • 总结说明capture.mode 决定了 CDC 任务捕获到的数据变更“有多详细”。保持默认的 change_streams_update_full,您就能拿到每次数据更新后的完整记录,让下游处理简单明了。这就像拍照时选择保存高清原图,而不是只留下一张修改痕迹的草稿。

initial.sync.max.threads

  • 参数概述
    • 参数名initial.sync.max.threads
    • 适用范围:Source (通过 job.reader.debezium 透传)
    • 默认值1
    • 核心作用:在 snapshot.mode=initial 时,指定执行全量快照的最大并发线程数。
  • 适用场景
    • 场景一:加速大批量集合的全量同步
      • 典型现象:您需要同步的数据库中包含成百上千个集合,使用单线程快照非常缓慢。
      • 处理方式:适当调大此参数,例如设置为 816。Debezium 会启动一个线程池,以集合为单位并行地执行快照。
  • 调参建议
    • 并行粒度:此参数的并行是以集合(Collection)​为单位的。如果您的任务只同步一个或少数几个大集合,调高此参数不会带来性能提升。它主要适用于多集合的场景。
    • 资源权衡:增加线程数会显著增加对源端 MongoDB 的读取压力和客户端的 CPU 消耗。请根据 MongoDB 服务器的承载能力和同步任务的资源配给来合理设置。
  • 注意事项
    • 这不是银弹。对于单个巨大的集合,此参数无法将其拆分并行处理。单个大集合的快照性能受限于 job.reader.snapshot_chunk_size 和源端数据库的查询性能。
    • 该参数仅在全量快照阶段生效,对增量捕获阶段没有影响。
  • 总结说明initial.sync.max.threads 就像在搬家时增加了搬运工的数量。当您需要一次性同步大量(数十上百个)集合时,适当调高此参数可以让“搬家”(全量快照)过程更快完成。但如果只搬一个超大件(单个大集合),增加再多工人也无法同时抬,此时该参数无效。

数据格式与元数据

本节参数定义了 CDC 数据记录的输出格式,以及是否需要在记录中附加额外的元数据信息。

job.reader.use_multi_table_record

  • 参数概述
    • 参数名job.reader.use_multi_table_record
    • 类型:布尔型 (Boolean)
    • 默认值false
    • 核心作用:控制 CDC Source 输出的数据记录格式。当设置为 true 时,输出为 CdcMultiplexTableRecord(多表复用记录格式);当设置为 false 时,输出为 debezium-json 格式的普通 Row
  • 适用场景
    • 场景一:需要区分数据来源表,并进行结构化处理(强烈推荐)
      • 典型场景:一个 CDC 作业同时监听了多个集合,下游需要根据数据的来源(例如,来自哪个库、哪个集合)来进行不同的处理,如写入到不同的目标表或进行动态的 schema 演进。
      • 处理方式:必须将 job.reader.use_multi_table_record 设置为 true
      • 数据格式:此时输出的 CdcMultiplexTableRecord 是一种结构化对象,内部封装了 tableId(包含库、表信息)、rowKind(INSERT/UPDATE/DELETE)、before(变更前镜像)、after(变更后镜像)以及其他元数据。下游算子可以直接访问这些信息,轻松实现动态分发。
    • 场景二:作为简单的 JSON 日志流处理
      • 典型场景:您希望将 MongoDB 的变更事件作为一份份完整的 JSON 日志,直接投递到消息队列(如 Kafka)供其他非 BitSail 系统消费,不关心其内部结构。
      • 处理方式:将 job.reader.use_multi_table_record 设置为 false
      • 数据格式:此时输出的是一个包含单列的 Row,该列的内容是 Debezium 事件的完整 JSON 字符串,遵循 Debezium Connect 的标准格式。
  • 调参建议
    • BitSail 内部链路:对于大多数在 BitSail 体系内进行端到端数据同步的场景,强烈建议设置为 true。这是 BitSail CDC 体系实现多表同步、Schema Evolution 等高级功能的基础。
    • 对接外部系统:当且仅当您的目标是直接输出 Debezium 的原始 JSON 格式给外部系统消费时,才应设置为 false
  • 参数联动
    • job.reader.format_type: 当 use_multi_table_record=true 时,format_type 参数被忽略。当 use_multi_table_record=false 时,format_type 必须为 debezium_json
    • 下游算子 (Transforms/Sinks): true 模式要求下游能处理 CdcMultiplexTableRecordfalse 模式则要求下游能处理单列的 JSON 字符串 Row
  • 总结说明job.reader.use_multi_table_record 是决定数据“包装方式”的关键开关。设置为 true(推荐),数据会被打包成 BitSail 的“标准快递”,内含清晰的来源、类型等标签,便于后续处理;设置为 false,则数据会以原始的“大 JSON包裹”形式输出,适合直接转发给能处理这种包裹的外部系统。

job.reader.format_type

  • 参数概述
    • 参数名job.reader.format_type
    • 类型:字符串
    • 默认值debezium_json
    • 核心作用:在 job.reader.use_multi_table_record=false 的情况下,指定输出的 Row 中那条字符串的格式。
  • 适用场景
    • 唯一受支持的场景:当 job.reader.use_multi_table_record 设置为 false 时,此参数必须为 debezium_json。此时,连接器会使用 JsonLogChangeDataDeserializationSchema 来生成 Debezium 事件的 JSON 字符串。
  • 调参建议
    • 保持默认:在 MongoDB CDC 中,此参数几乎没有调整空间。如果您选择不使用多表复用记录,那么 format_type 必须是 debezium_json。任何其他值都可能导致作业失败。
  • 参数联动
    • 此参数的生效完全依赖于 job.reader.use_multi_table_record 被设置为 false
  • 总结说明job.reader.format_type 在 MongoDB CDC 场景下是一个“只读”参数。当您选择输出原始 JSON 流时,它的值有且仅有 debezium_json 这一个选项。

job.reader.src_meta_info_column_enabled & job.reader.src_meta_info_column_name

  • 参数概述
    • 参数名job.reader.src_meta_info_column_enabled / job.reader.src_meta_info_column_name
    • 类型:布尔型 (Boolean) / 字符串
    • 默认值false / __src_meta_info__
    • 核心作用:控制是否在输出的数据记录中,额外附加一列包含源端元信息的 JSON 字符串。
  • 适用场景
    • 场景一:下游需要追溯数据来源
      • 典型现象:在多表混合的流中,下游的某个处理环节需要知道每条记录最原始的库名和表名,用于日志记录、监控或动态路由。
      • 处理方式:将 src_meta_info_column_enabled 设置为 true
    • 场景二:自定义元数据列名
      • 处理方式:通过 src_meta_info_column_name 参数为您希望的元数据列指定一个名称,以避免与业务字段冲突。
  • 调参建议
    • 按需开启:仅在下游确实需要这些原始元信息时才开启,以避免不必要的数据冗余。
    • 数据格式:附加列的内容是一个 JSON 字符串,例如:{"db":"my_db","table":"my_collection"}
  • 参数联动
    • job.common.cdc_multiplex_record_case_insensitive:当此参数为 true 时,src_meta_info_column_name 的值以及元信息 JSON 中的 key(db/table)可能会被转换为小写。
  • 总结说明这对参数允许您为每一条数据记录贴上一张“来源标签”。当您在下游处理数据时需要知道它“从哪里来”(具体哪个库、哪个集合),只需将 src_meta_info_column_enabled 设为 true,就会自动多出一列包含这些信息的元数据。

性能与资源

本节参数帮助您在数据吞吐量、处理延迟和资源消耗之间找到平衡。

job.reader.poll_interval_ms

  • 参数概述
    • 参数名job.reader.poll_interval_ms
    • 类型:长整型 (Long)
    • 默认值500
    • 核心作用:定义了 Debezium 的 ChangeEventQueue 在内部队列为空时,进行下一次轮询前的等待时间(单位:毫秒)。
  • 适用场景
    • 场景一:源端数据流量较低
      • 典型现象:MongoDB 的变更非常稀疏,大部分时间内没有新数据产生。
      • 处理方式:可以适当调大此参数,例如 10002000。这可以减少连接器在没有数据时对内部队列的空轮询,降低不必要的 CPU 消耗。
    • 场景二:对数据延迟极度敏感
      • 处理方式:可以适当调小此参数,例如 100200。这样可以更快地发现并处理新到达的事件,但会增加 CPU 在空闲时的开销。
  • 调参建议
    • 通用推荐:保持默认值 500 在大多数场景下是一个合理的折中。
    • CPU 敏感型环境:如果您的作业部署在资源紧张或 CPU 成本敏感的环境,且源端流量不高,建议调大此值。
  • 注意事项
    • 这不是数据从 MongoDB 到达连接器的延迟,而是连接器内部事件队列的处理延迟。对于外部感知的端到端延迟,此参数影响相对较小。
  • 总结说明job.reader.poll_interval_ms 控制了 CDC 任务在“空闲时”的打盹频率。调大此值让任务在没活干的时候多睡一会儿,节省体力(CPU);调小则让它更警觉,能更快地处理新任务,但空闲时也更耗神。

job.reader.max_batch_size

  • 参数概述
    • 参数名job.reader.max_batch_size
    • 类型:整型 (Integer)
    • 默认值2048
    • 核心作用:定义了 Debezium ChangeEventQueue 每次从内部队列中批量取出的最大记录数。
  • 适用场景
    • 场景一:提升吞吐量
      • 典型现象:源端数据变更非常密集,希望提升单次处理的数据量,减少与下游交互的次数。
      • 处理方式:适当调大此值,例如 40968192。这可以有效提升数据处理的吞吐量。
    • 场景二:内存资源受限
      • 处理方式:适当调小此值,例如 1024。减小批次大小可以降低单次处理对内存的峰值需求。
  • 调参建议
    • 吞吐量优先:在内存允许的情况下,调大此值是提升性能的有效手段。
    • 延迟敏感:调小此值可能(但不绝对)有助于降低小批量数据的处理延迟,因为数据可以更快地被送往下游。
  • 参数联动
    • 下游 Sink 的 batch_size:为了达到最佳性能,Source 的 max_batch_size 最好与下游 Sink(如 Hudi, Iceberg)的写入批次大小相协调。
  • 总结说明job.reader.max_batch_size 决定了 CDC 任务每次“打包”多少条数据进行处理。调大此值就像用更大的箱子打包,单次效率高,总吞吐量大;调小则像用小号快递袋,灵活轻便,适合内存有限或对单条数据延迟敏感的场景。

job.reader.snapshot_chunk_size

  • 参数概述
    • 参数名job.reader.snapshot_chunk_size
    • 类型:整型 (Integer)
    • 默认值1024
    • 核心作用:在全量快照(snapshot.mode=initial)阶段,每次从 MongoDB 数据库 find() 拉取数据的批次大小。
  • 适用场景
    • 场景一:快照阶段 OOM 或超时
      • 典型现象:在对一个包含非常大的文档的集合进行快照时,作业因内存溢出(OOM)或网络超时而失败。
      • 处理方式:适当调小此值,例如 256512。这可以减少单次拉取的数据量,降低对内存和网络的瞬时压力。
    • 场景二:优化快照性能
      • 处理方式:如果网络状况良好且文档体积适中,可以尝试调大此值,例如 20484096,以减少与数据库的交互次数,可能提升快照总时长。
  • 调参建议
    • 稳定性优先:默认值 1024 是一个相对安全的选择。如果遇到问题,首先尝试调小它。
    • 性能调优:在保证稳定性的前提下,逐步增大此值并观察快照性能和资源消耗,找到最佳平衡点。
  • 总结说明job.reader.snapshot_chunk_size 控制了在“全量复制”阶段,每次从 MongoDB 数据库搬运多少条记录。如果记录本身很大(比如包含大段文本或二进制内容),调小此值就像分多次、小批量搬运,虽然次数多了,但更稳妥,不易因单次负载过重而失败。

容错与健壮性

本节参数用于增强 CDC 任务在面对网络抖动、数据异常等问题时的自我恢复能力。

job.reader.cdc_retry_times

  • 参数概述
    • 参数名job.reader.cdc_retry_times
    • 类型:整型 (Integer)
    • 默认值3
    • 核心作用:这是 BitSail 框架在 Source 层面提供的一个顶层重试机制。当 CDC 源(如此处的 MongoDB CDC)在拉取数据过程中遇到可重试的异常时,框架会尝试重新拉取的最大次数。
  • 适用场景
    • 场景一:网络环境不稳定
      • 典型现象:作业偶尔因短暂的网络抖动或数据库连接池问题而失败,日志中出现 MongoSocketReadException, MongoTimeoutException 等。
      • 处理方式:可以适当调大此值,例如 510,给任务更多的机会在无需人工干预的情况下自动恢复。
  • 调参建议
    • 生产环境:建议设置为一个比默认值稍大的数,例如 5,以增强任务的健壮性。
    • 无限重试:设置为 -1 表示无限次重试,直到成功。请谨慎使用此配置,因为它可能导致任务在遇到持久性问题时一直卡住,消耗资源且无法告警。
  • 参数联动
    • Debezium 内部重试:此参数与 Debezium 内部的连接重试(如 connect.max.attempts)是两个不同层面的机制。Debezium 重试主要处理初始连接和连接中断后的恢复,而 cdc_retry_times 则是在 Debezium 成功建立连接并开始拉数据后,对拉取批次时发生的异常进行上层包裹重试。
  • 注意事项
    • 此重试机制仅对被识别为“可重试”的异常生效。对于数据格式错误、权限问题等“硬性”错误,任务会直接失败。
  • 总结说明job.reader.cdc_retry_times 为您的 CDC 任务提供了“再试一次”的勇气。当遇到网络抖动等暂时性问题时,它会尝试重新拉取数据,避免任务因小波折而中断。适当调高此值可以使您的同步任务在不稳定的环境中表现得更加顽强。

job.reader.skip_error_record

  • 参数概述
    • 参数名job.reader.skip_error_record
    • 类型:布尔型 (Boolean)
    • 默认值false
    • 核心作用:当数据在反序列化阶段(即将 Debezium 事件转换为 BitSail 内部格式)发生错误时,决定是跳过这条有问题的记录继续运行,还是直接让任务失败。
  • 适用场景
    • 场景一:数据质量优先,不容忍任何错误
      • 处理方式:保持默认值 false。一旦遇到无法解析的“脏数据”,任务会立即失败并报错,强制您去关注和解决源头的数据质量问题。这是保证数据一致性的最安全做法。
    • 场景二:业务连续性优先,可容忍少量数据丢失
      • 典型现象:源端数据偶尔会混入一些格式不规范的记录,您不希望因为这几条脏数据导致整个同步任务中断。
      • 处理方式:设置为 true。此时,当遇到解析失败的记录时,系统会打印一条错误日志,丢弃该记录,然后继续处理后续数据。
  • 调参建议
    • 生产环境:强烈建议保持默认值 false,并通过告警和日志来监控数据质量问题。
    • 临时排障或数据探查:在调试或分析包含已知脏数据的源时,可以临时设置为 true,以便任务能够跑通并处理大部分有效数据。
  • 注意事项
    • 开启 skip_error_record 意味着您接受了数据可能会丢失的风险。请务必确保有相应的监控措施来统计被跳过的记录数量,并定期排查原因。
  • 总结说明job.reader.skip_error_record 是处理“坏苹果”的策略开关。保持默认 false,遇到一个坏苹果(脏数据),整箱水果(任务)就停止检查,确保万无一失;设置为 true,则是把坏苹果挑出来扔掉,继续处理剩下的好苹果,保证流程不断,但可能会丢失少量数据。

总体使用建议与排障流程小结

1. 核心配置模板 (推荐)

对于一个典型的 MongoDB 到数据仓库/数据湖的实时同步任务,我们推荐以下核心参数组合:

{
    "job": {
        "reader": {
            "connections": [
                {"master": {"hosts": ["mongo1:27017", "mongo2:27017"]}}
            ],
            "user_name": "your_user",
            "password": "your_password",
            "use_multi_table_record": true,
            "job.reader.debezium": {
                "snapshot.mode": "initial",
                "capture.mode": "change_streams_update_full"
            }
        }
    }
}
  • connections: 提供至少2个副本集成员,保证高可用。
  • use_multi_table_record: true: 启用结构化输出,是 BitSail 高级功能的基础。
  • snapshot.mode: initial: 保证全量+增量,数据完整。
  • capture.mode: change_streams_update_full: 获取完整的更新后文档,简化下游逻辑。

2. 常见问题排障指引

问题一:作业启动失败,连接不上 MongoDB

  • 日志关键字Connection refused, Timed out, Authentication failed, MongoServerSelectionException
  • 排查步骤
    1. 检查 job.reader.connections:确认地址和端口是否正确,作业环境是否能访问到这些地址。
    2. 检查 job.reader.user_namejob.reader.password:确认用户名密码是否正确,以及该用户是否有足够权限(特别是访问 local.oplog.rs 和目标集合的 changeStream 权限)。
    3. 检查认证数据库:如果用户不在 admin 库,请通过 job.reader.debezium 透传 mongodb.auth.source
    4. 检查 SSL/TLS 配置:如果源端启用了 SSL,请透传 mongodb.ssl.enabled: "true"

问题二:作业启动后长时间没有数据流出

  • 排查步骤
    1. 检查 snapshot.mode:如果是 initial 模式,且源端数据量巨大,作业可能正处于漫长的全量快照阶段。观察日志中是否有快照相关的进度信息。
    2. 检查 initial_offset_type:如果是 latest,请确认在作业启动后源端是否有新的数据变更。
    3. 检查过滤条件:确认 database_include_listcollection_include_list 等参数是否正确配置,没有意外地将目标库表过滤掉。
    4. 检查 Oplog/Change Stream:确认源端 MongoDB 的 Oplog 或 Change Stream 是否正常产生数据。

问题三:作业因“脏数据”或反序列化错误而失败

  • 日志关键字Deserialization error, JsonParseException, Unrecognized field
  • 排查步骤
    1. 分析错误日志:定位导致失败的具体是哪条记录,检查其内容是否符合预期格式。
    2. 临时解决方案:如果希望任务继续运行,可临时将 job.reader.skip_error_record 设置为 true,但这会导致数据丢失。
    3. 根治方案:从源头治理数据质量,或在 BitSail 的 transform 算子中增加对异常数据的清洗和转换逻辑。
最近更新时间:2026.05.07 14:11:24
这个页面对您有帮助吗?
有用
有用
无用
无用