Kafka Producer failover across two data centers: seeking solutions

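I've handled similar failover requirements when deploying Kafka across data centers. Here are a few practical approaches, tools you can use, and code examples that should help: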

I. Core failover patterns

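  • Active-passive (primary/standby): this matches your requirement exactly. Messages go to the DC1 primary cluster by default; when DC1 becomes unavailable the producer switches to the DC2 standby cluster automatically, and it switches back once DC1 recovers. This mode fits workloads with stricter message-consistency requirements, where the standby cluster carries no business traffic in normal operation.
  • Active-active (alternative): if your business can tolerate a small number of duplicate messages, you can also send every message to both clusters. This is closer to redundant backup than to failover, so it is probably not your core requirement, but it can serve as a fallback.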
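For the active-active option, here is a minimal sketch of the dual-write acknowledgement rule. It assumes each cluster's send is exposed as a `CompletableFuture` (with a `KafkaProducer` you would complete one from the `send(record, callback)` callback); the class name `DualWriter` is hypothetical. A write counts as successful as soon as either cluster acknowledges it, and fails only if both sends fail:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class DualWriter {
    private DualWriter() {}

    // Starts both sends. The returned future succeeds on the first acknowledgement
    // and fails only after both sends have failed.
    static CompletableFuture<Void> sendToBoth(Supplier<CompletableFuture<Void>> dc1Send,
                                              Supplier<CompletableFuture<Void>> dc2Send) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        AtomicInteger failures = new AtomicInteger();
        for (CompletableFuture<Void> attempt : List.of(dc1Send.get(), dc2Send.get())) {
            attempt.whenComplete((ignored, ex) -> {
                if (ex == null) {
                    // First success wins; completing an already-completed future is a no-op
                    result.complete(null);
                } else if (failures.incrementAndGet() == 2) {
                    // Both clusters failed
                    result.completeExceptionally(ex);
                }
            });
        }
        return result;
    }
}
```

Every successful message normally lands in both clusters, so anything that reads from both must tolerate duplicates, which is why this mode only fits the duplicate-tolerant case described above.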

II. Ready-made libraries and components

If you'd rather not build everything from scratch, these are worth a look:

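  • Kafka Producer built-ins plus custom logic: bootstrap.servers accepts a list of brokers, but they must all belong to one cluster; the producer only uses them to discover that cluster. Mixing brokers from DC1 and DC2 in one list does not give you failover, and the producer never switches clusters on its own. You need your own failure detection (for example in a ProducerInterceptor) plus one producer instance per cluster.
  • Spring Kafka dynamic routing: if you use the Spring ecosystem, one ProducerFactory per cluster plus a dynamic KafkaTemplate gives a clean way to switch clusters, and Spring takes care of much of the configuration and lifecycle management.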
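  • Confluent commercial features: Confluent Platform offers Multi-Region Clusters (a single cluster stretched across data centers, so clients have nothing to switch) and Cluster Linking (cross-cluster replication). If you have a Confluent license, evaluate these first, since they can remove much of the client-side failover logic.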
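  • Apache Kafka MirrorMaker 2.0: primarily a cluster replication tool. It keeps DC2 in sync with DC1 so data is available on DC2 after a failover, but it does not switch producers for you. Replication is asynchronous, so messages not yet replicated when DC1 fails can be missing on DC2 (and lost outright if DC1's data is lost). Combine it with producer-side routing logic.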
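One MirrorMaker 2 detail matters at failover time: with its default replication policy (`DefaultReplicationPolicy`), topics replicated from DC1 appear on DC2 prefixed with the source cluster alias and a `.` separator, so `test-topic` becomes `dc1.test-topic`, while a producer that has failed over writes to DC2's own `test-topic`. The plain-Java sketch below only reproduces that naming rule to show which topics a DC2 consumer would need; the alias `dc1` and the class name are assumptions. Recent Kafka versions also ship an `IdentityReplicationPolicy` if you want topic names kept unchanged.

```java
import java.util.List;

final class Mm2TopicNames {
    private Mm2TopicNames() {}

    // Same rule as MM2's DefaultReplicationPolicy: "<source alias><separator><topic>",
    // where the separator defaults to "." (configurable via replication.policy.separator).
    static String remoteTopic(String sourceAlias, String topic, String separator) {
        return sourceAlias + separator + topic;
    }

    // After failover, DC2 holds replicated history under the prefixed name and new writes
    // under the original name, so a consumer on DC2 needs both to see the full stream.
    static List<String> dc2TopicsFor(String topic, String dc1Alias) {
        return List.of(remoteTopic(dc1Alias, topic, "."), topic);
    }
}
```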

III. Implementation approaches

1. Plain Java with a custom interceptor (no framework dependency)

This approach needs no extra framework and relies only on the native Kafka client API:

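  • Step 1: Create one KafkaProducer per cluster, each configured with only that cluster's bootstrap.servers, and register a custom ProducerInterceptor on the DC1 producer to handle failure detection.
  • Step 2: In the interceptor, start a scheduled health-check task that periodically checks DC1's availability (for example, by connecting with AdminClient and listing topics), and mark DC1 unavailable as soon as a send fails (in onAcknowledgement).
  • Step 3: While DC1 is marked unavailable, route sends to the DC2 producer, and route them back once DC1 recovers. An interceptor cannot redirect a record to another cluster, and a producer's bootstrap.servers cannot be changed after it is created, so the switch has to happen in the code that picks the producer. metadata.max.age.ms only controls how often a producer refreshes metadata from the cluster it is already connected to; it does not help with switching clusters.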
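Because the switch has to happen between two producer instances rather than inside one, a small holder can own both clients and decide which one is active. This is a minimal plain-Java sketch: `FailoverPair` is a hypothetical name, `P` would be `KafkaProducer<K, V>` in practice (it implements `Closeable`), and `markPrimaryDown`/`markPrimaryUp` would be called by your health check or send callbacks.

```java
import java.util.concurrent.atomic.AtomicBoolean;

final class FailoverPair<P extends AutoCloseable> implements AutoCloseable {
    private final P primary;    // e.g. a KafkaProducer built with DC1's bootstrap.servers
    private final P secondary;  // e.g. a KafkaProducer built with DC2's bootstrap.servers
    private final AtomicBoolean primaryUp = new AtomicBoolean(true);

    FailoverPair(P primary, P secondary) {
        this.primary = primary;
        this.secondary = secondary;
    }

    // Client to use for the next send: DC1 while it is healthy, otherwise DC2.
    P active() {
        return primaryUp.get() ? primary : secondary;
    }

    void markPrimaryDown() {
        primaryUp.set(false);
    }

    void markPrimaryUp() {
        primaryUp.set(true);
    }

    // Closes both clients; a failure closing one does not prevent closing the other.
    @Override
    public void close() {
        closeQuietly(primary);
        closeQuietly(secondary);
    }

    private static void closeQuietly(AutoCloseable client) {
        try {
            client.close();
        } catch (Exception e) {
            System.err.println("Failed to close client: " + e.getMessage());
        }
    }
}
```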

2. Spring Kafka dynamic switching

If you use Spring, the code is more concise and easier to maintain:

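  • Step 1: Define two ProducerFactory beans, one for the DC1 cluster and one for DC2.
  • Step 2: Implement a dynamic KafkaTemplate that picks the underlying ProducerFactory according to the current cluster state.
  • Step 3: Write a health-check component that periodically checks DC1's availability and triggers the KafkaTemplate's cluster switch.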
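A health check that switches on a single failed probe can flap between clusters during a brief network blip. One common refinement, not part of the steps above, is to require several consecutive failures before failing over and several consecutive successes before failing back. A plain-Java sketch; `FailoverPolicy` and the thresholds are assumptions:

```java
final class FailoverPolicy {
    private final int failuresToFailover;
    private final int successesToFailback;
    private int consecutiveFailures;
    private int consecutiveSuccesses;
    private boolean useDc1 = true;

    FailoverPolicy(int failuresToFailover, int successesToFailback) {
        this.failuresToFailover = failuresToFailover;
        this.successesToFailback = successesToFailback;
    }

    // Feed one health-check result; returns true if traffic should go to DC1.
    synchronized boolean record(boolean dc1Healthy) {
        if (dc1Healthy) {
            consecutiveFailures = 0;
            consecutiveSuccesses++;
            if (!useDc1 && consecutiveSuccesses >= successesToFailback) {
                useDc1 = true;  // DC1 has been stable long enough: fail back
            }
        } else {
            consecutiveSuccesses = 0;
            consecutiveFailures++;
            if (useDc1 && consecutiveFailures >= failuresToFailover) {
                useDc1 = false; // DC1 has failed repeatedly: fail over to DC2
            }
        }
        return useDc1;
    }
}
```

In the ClusterHealthChecker shown later, checkDc1Health could call `policy.record(isHealthy)` and then `switchToDc1()` or `switchToDc2()` accordingly; both switch methods already do nothing when the state is unchanged.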

3. Configuration-center driven switching

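Store the cluster configuration in a configuration center such as Nacos or Consul, and have the producer listen for configuration changes to switch clusters: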

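  • Step 1: Keep the bootstrap.servers address of the currently active cluster in the configuration center.
  • Step 2: At startup the producer reads this setting and subscribes to change events; when the value changes, it rebuilds the Producer instance against the new address (a running producer's bootstrap.servers cannot be changed in place).
  • Step 3: Pair this with a health-check component that sets the active cluster to DC2 in the configuration center when DC1 fails, and back to DC1 once it recovers.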
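The rebuild in step 2 can be sketched independently of any particular configuration center. In this plain-Java sketch, `onActiveClusterChanged` is what your Nacos or Consul change listener would call with the new bootstrap.servers value; the class name and the `factory` (which would build a `KafkaProducer` from a bootstrap string) are hypothetical. It creates the new client before closing the old one, so a client is always available:

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

final class HotSwappableClient<C extends AutoCloseable> implements AutoCloseable {
    private final Function<String, C> factory; // builds a client for a bootstrap.servers string
    private final AtomicReference<C> current = new AtomicReference<>();
    private String activeBootstrap;

    HotSwappableClient(Function<String, C> factory, String initialBootstrap) {
        this.factory = factory;
        this.activeBootstrap = initialBootstrap;
        this.current.set(factory.apply(initialBootstrap));
    }

    // Client for the next send. A caller racing with a swap may still get the old,
    // soon-to-be-closed client, so senders should retry on a closed-client error.
    C get() {
        return current.get();
    }

    synchronized String activeBootstrap() {
        return activeBootstrap;
    }

    // Called by the config-center change listener; updates with the same address are ignored.
    synchronized void onActiveClusterChanged(String newBootstrap) {
        if (Objects.equals(newBootstrap, activeBootstrap)) {
            return;
        }
        C old = current.getAndSet(factory.apply(newBootstrap));
        activeBootstrap = newBootstrap;
        // For a KafkaProducer, close() waits for previously sent records;
        // against a dead cluster, close(Duration) bounds that wait.
        closeQuietly(old);
    }

    @Override
    public synchronized void close() {
        closeQuietly(current.get());
    }

    private static void closeQuietly(AutoCloseable client) {
        try {
            client.close();
        } catch (Exception e) {
            System.err.println("Failed to close client: " + e.getMessage());
        }
    }
}
```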

IV. Code examples

1. Plain Java interceptor implementation

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Register this only on the DC1 producer. An interceptor cannot send a record to a different
// cluster; it only tracks DC1's health so the application can choose which producer to use.
public class DcFailoverInterceptor<K, V> implements ProducerInterceptor<K, V> {

    // Static because KafkaProducer instantiates interceptors via reflection,
    // so application code never holds a reference to this instance.
    private static final AtomicBoolean DC1_AVAILABLE = new AtomicBoolean(true);

    private String dc1Bootstrap;
    private ScheduledExecutorService healthChecker;

    public static boolean isDc1Available() {
        return DC1_AVAILABLE.get();
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // Custom keys set on the producer are passed through here (Kafka logs them as unknown configs)
        dc1Bootstrap = (String) configs.get("dc1.bootstrap.servers");
        // Check DC1 health every 10 seconds on a daemon thread so it never blocks JVM shutdown
        healthChecker = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "dc1-health-check");
            t.setDaemon(true);
            return t;
        });
        healthChecker.scheduleAtFixedRate(this::checkDc1Health, 0, 10, TimeUnit.SECONDS);
    }

    private void checkDc1Health() {
        DC1_AVAILABLE.set(checkClusterHealth(dc1Bootstrap));
    }

    private boolean checkClusterHealth(String bootstrapServers) {
        try (AdminClient adminClient = AdminClient.create(
                Map.<String, Object>of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers))) {
            // Try to list topics with a 5-second timeout
            adminClient.listTopics().names().get(5, TimeUnit.SECONDS);
            return true;
        } catch (Exception e) {
            System.err.println("Cluster " + bootstrapServers + " is unhealthy: " + e.getMessage());
            return false;
        }
    }

    @Override
    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
        // A ProducerRecord carries no cluster address, and bootstrap.servers cannot be changed
        // on a running producer, so the record passes through unchanged.
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // A failed send to DC1 marks DC1 unavailable immediately;
        // the next successful health check marks it available again.
        if (exception != null && DC1_AVAILABLE.compareAndSet(true, false)) {
            System.err.println("DC1 send failed, switching to DC2: " + exception.getMessage());
        }
    }

    @Override
    public void close() {
        healthChecker.shutdownNow();
    }
}

Producer initialization code:

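import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class FailoverProducerDemo {

    private static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // send() blocks up to max.block.ms (default 60s) waiting for metadata when a cluster is down
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "5000");
        return props;
    }

    public static void main(String[] args) {
        // One producer per cluster, because bootstrap.servers cannot be changed on a live producer
        Properties dc1Props = producerProps("dc1-kafka:9092");
        // Register the interceptor on the DC1 producer only, so DC2 failures don't mark DC1 as down
        dc1Props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, DcFailoverInterceptor.class.getName());
        dc1Props.put("dc1.bootstrap.servers", "dc1-kafka:9092");
        // Refresh DC1 metadata every 30s (default 5 min); this does not switch clusters
        dc1Props.put(ProducerConfig.METADATA_MAX_AGE_MS_CONFIG, "30000");

        try (KafkaProducer<String, String> dc1Producer = new KafkaProducer<>(dc1Props);
             KafkaProducer<String, String> dc2Producer = new KafkaProducer<>(producerProps("dc2-kafka:9092"))) {
            // Simulate sending one message per second
            while (!Thread.currentThread().isInterrupted()) {
                // Pick the producer from the health flag maintained by the interceptor.
                // Records already buffered in dc1Producer are NOT resent to DC2 automatically.
                KafkaProducer<String, String> target =
                        DcFailoverInterceptor.isDc1Available() ? dc1Producer : dc2Producer;
                target.send(new ProducerRecord<>("test-topic", "hello from failover producer"));
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}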
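In the producer code above, records already handed to the DC1 producer are not resent anywhere if DC1 fails. One way to cover them, sketched here as an assumption rather than part of the original design, is to resend a record on DC2 when its DC1 send fails. The `AsyncSender` interface is hypothetical and stands in for `producer.send(record, callback)`; with a real producer you could adapt it as `(r, done) -> producer.send(r, (metadata, ex) -> done.accept(ex))`. Kafka runs send callbacks on the producer's I/O thread, so in production hand the resend to an executor instead of calling the second producer inline. A record that timed out may still have reached DC1, so resending can create duplicates and does not preserve ordering.

```java
import java.util.function.Consumer;

// Hypothetical abstraction over "send(record, callback)": onComplete receives null on success,
// or the exception that made the send fail.
interface AsyncSender<R> {
    void send(R record, Consumer<Exception> onComplete);
}

final class ResendingSender<R> {
    private final AsyncSender<R> primary;    // DC1
    private final AsyncSender<R> fallback;   // DC2
    private final Runnable onPrimaryFailure; // e.g. mark DC1 down so new sends go to DC2

    ResendingSender(AsyncSender<R> primary, AsyncSender<R> fallback, Runnable onPrimaryFailure) {
        this.primary = primary;
        this.fallback = fallback;
        this.onPrimaryFailure = onPrimaryFailure;
    }

    // Tries DC1 first; if that send fails, reports the failure and retries the same record once on DC2.
    void send(R record, Consumer<Exception> onComplete) {
        primary.send(record, ex -> {
            if (ex == null) {
                onComplete.accept(null);
                return;
            }
            onPrimaryFailure.run();
            fallback.send(record, onComplete);
        });
    }
}
```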

2. Spring Kafka dynamic switching example

Configuration class

import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

import static org.apache.kafka.clients.producer.ProducerConfig.*;

@Configuration
public class KafkaClusterConfig {

    @Value("${kafka.dc1.bootstrap-servers}")
    private String dc1Bootstrap;

    @Value("${kafka.dc2.bootstrap-servers}")
    private String dc2Bootstrap;

    // Producer settings shared by both clusters; only bootstrap.servers differs
    private ProducerFactory<String, String> producerFactory(String bootstrapServers) {
        Map<String, Object> config = new HashMap<>();
        config.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean("dc1ProducerFactory")
    public ProducerFactory<String, String> dc1ProducerFactory() {
        return producerFactory(dc1Bootstrap);
    }

    @Bean("dc2ProducerFactory")
    public ProducerFactory<String, String> dc2ProducerFactory() {
        return producerFactory(dc2Bootstrap);
    }

    @Bean
    public DynamicKafkaTemplate dynamicKafkaTemplate(
            @Qualifier("dc1ProducerFactory") ProducerFactory<String, String> dc1Factory,
            @Qualifier("dc2ProducerFactory") ProducerFactory<String, String> dc2Factory) {
        return new DynamicKafkaTemplate(dc1Factory, dc2Factory);
    }

    @Bean
    public ClusterHealthChecker clusterHealthChecker(DynamicKafkaTemplate dynamicTemplate) {
        return new ClusterHealthChecker(dynamicTemplate, dc1Bootstrap);
    }
}

Dynamic KafkaTemplate

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.concurrent.atomic.AtomicBoolean;

public class DynamicKafkaTemplate extends KafkaTemplate<String, String> {

    private final ProducerFactory<String, String> dc1Factory;
    private final ProducerFactory<String, String> dc2Factory;
    private final AtomicBoolean useDc1 = new AtomicBoolean(true);

    public DynamicKafkaTemplate(ProducerFactory<String, String> dc1Factory, ProducerFactory<String, String> dc2Factory) {
        super(dc1Factory);
        this.dc1Factory = dc1Factory;
        this.dc2Factory = dc2Factory;
    }

    // KafkaTemplate has no setter for its producer factory. Instead, override the per-topic lookup
    // (the same hook RoutingKafkaTemplate uses, Spring Kafka 2.5+) so every send picks the active
    // cluster. Assumes non-transactional producer factories.
    @Override
    public ProducerFactory<String, String> getProducerFactory(String topic) {
        return useDc1.get() ? dc1Factory : dc2Factory;
    }

    public void switchToDc2() {
        if (useDc1.compareAndSet(true, false)) {
            System.out.println("Switched to DC2 Kafka cluster");
        }
    }

    public void switchToDc1() {
        if (useDc1.compareAndSet(false, true)) {
            System.out.println("Switched back to DC1 Kafka cluster");
        }
    }

    public boolean isUsingDc1() {
        return useDc1.get();
    }
}

Health-check component

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

import javax.annotation.PreDestroy; // on Spring Boot 3 / Spring Framework 6 use jakarta.annotation.PreDestroy
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Created by the clusterHealthChecker() @Bean method in KafkaClusterConfig. Don't also annotate it
// with @Component: that would clash with the @Bean definition, and component scanning has no
// String bean to pass as dc1Bootstrap.
public class ClusterHealthChecker {

    private final DynamicKafkaTemplate dynamicTemplate;
    private final String dc1Bootstrap;
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    public ClusterHealthChecker(DynamicKafkaTemplate dynamicTemplate, String dc1Bootstrap) {
        this.dynamicTemplate = dynamicTemplate;
        this.dc1Bootstrap = dc1Bootstrap;
        // Check every 10 seconds
        scheduler.scheduleAtFixedRate(this::checkDc1Health, 0, 10, TimeUnit.SECONDS);
    }

    private void checkDc1Health() {
        boolean isHealthy = checkClusterHealth(dc1Bootstrap);
        if (isHealthy && !dynamicTemplate.isUsingDc1()) {
            dynamicTemplate.switchToDc1();
        } else if (!isHealthy && dynamicTemplate.isUsingDc1()) {
            dynamicTemplate.switchToDc2();
        }
    }

    private boolean checkClusterHealth(String bootstrapServers) {
        try (AdminClient adminClient = AdminClient.create(
                Map.<String, Object>of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers))) {
            adminClient.listTopics().names().get(5, TimeUnit.SECONDS);
            return true;
        } catch (Exception e) {
            System.err.println("DC1 cluster is unhealthy: " + e.getMessage());
            return false;
        }
    }

    @PreDestroy
    public void shutdown() {
        scheduler.shutdown();
    }
}

Usage example

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class MessageSender {

    @Autowired
    private DynamicKafkaTemplate dynamicKafkaTemplate;

    public void sendMessage(String topic, String message) {
        dynamicKafkaTemplate.send(topic, message);
    }
}

This question was originally posted on Stack Exchange by andrii.
