Kafka Connect 2.4中Override Policy配置失效问题咨询
你的核心问题在于手动调用put()方法传递SinkRecord完全绕过了Kafka Connect的消费者逻辑,导致你配置的auto.offset.reset覆盖策略根本没有生效。
为什么当前配置不生效?
你自己创建了消费者从Topic拉取消息,转成List<SinkRecord>后直接传给两个Sink Task的put()方法。这种情况下,Kafka Connect内置的消费者(包括你配置的offset重置规则、消费者覆盖策略)完全没有运行——数据是你主动塞进去的,不是Connect自己从Topic消费来的,自然两个Task都会收到所有消息。
正确的实现方式
要让Kafka Connect的Override Policy生效,你需要让每个Sink Connector独立管理自己的消费流程,而不是手动传递数据:
确保Connector配置正确应用消费者覆盖策略
Kafka Connect 2.3的ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX(即connector.client.consumer.override.前缀)确实可以用来覆盖消费者配置,你的参数设置是对的,但需要让Connector自己去消费Topic才能触发这个逻辑。不要手动传递
SinkRecord,让Connect框架接管消费
你需要分别启动两个独立的FileStreamSinkConnector实例,它们会根据各自的配置创建消费者,自动从Topic拉取消息,并处理offset逻辑:- 配置为
latest的Connector会从Topic的最新offset开始消费,不会接收历史数据 - 配置为
earliest的Connector会从Topic的起始offset开始消费,接收所有历史数据 - 后续新生产的消息,两个Connector都会正常接收
- 配置为
修正后的代码示例(简化版)
// 1. 配置并启动latest的FileSink Connector Map<String, String> latestSinkProps = new HashMap<>(); latestSinkProps.put(FileStreamSinkConnector.TOPICS_CONFIG, ConstantSettingsBehavior.SINGLE_TOPIC); latestSinkProps.put(FileStreamSinkConnector.FILE_CONFIG, ConstantSettingsBehavior.FILE_OUT_LATEST); latestSinkProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, "5000"); latestSinkProps.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); FileStreamSinkConnector latestConnector = new FileStreamSinkConnector(); latestConnector.start(latestSinkProps); // 初始化并启动Task(Connect框架通常会自动处理Task的生命周期,这里模拟) SinkTask latestTask = latestConnector.taskClass().newInstance(); SinkTaskContext latestContext = createMock(SinkTaskContext.class); latestTask.initialize(latestContext); latestTask.start(latestConnector.taskConfigs(1).get(0)); // 2. 配置并启动earliest的FileSink Connector Map<String, String> earliestSinkProps = new HashMap<>(); earliestSinkProps.put(FileStreamSinkConnector.TOPICS_CONFIG, ConstantSettingsBehavior.SINGLE_TOPIC); earliestSinkProps.put(FileStreamSinkConnector.FILE_CONFIG, ConstantSettingsBehavior.FILE_OUT_EARLY); earliestSinkProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, "5000"); earliestSinkProps.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); FileStreamSinkConnector earliestConnector = new FileStreamSinkConnector(); earliestConnector.start(earliestSinkProps); SinkTask earliestTask = earliestConnector.taskClass().newInstance(); SinkTaskContext earliestContext = createMock(SinkTaskContext.class); earliestTask.initialize(earliestContext); earliestTask.start(earliestConnector.taskConfigs(1).get(0));
测试场景的特殊处理(如果必须手动传递数据)
如果你是在做单元测试,需要手动模拟消息传递,那你需要自己实现offset过滤逻辑:
- 对于
latest的Task,只传递在Connector启动之后生产的消息 - 对于
earliest的Task,传递所有消息
但这种方式完全脱离了Kafka Connect的原生消费逻辑,只适合测试,不适合生产环境。
内容的提问来源于stack exchange,提问作者Schmidt96u




