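- Make sure the Kafka connection settings (for example, the bootstrap servers address) are configured correctly in the application.
- Confirm that the application creates the Kafka topic correctly and that the consumer is set up correctly.
- Confirm that the Kafka topic name is correct and that the consumer is subscribed to that topic.
- If none of the above resolves the problem, the consumer may not be connecting to the Kafka server correctly. Check the Kafka server logs to confirm whether the connection succeeded, and rule out network failures and similar causes.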
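To help rule out a network problem before digging into the server logs, a plain TCP check against the broker address can be useful. The following is only a minimal sketch: `BrokerReachability` and `isReachable` are hypothetical names, not part of Spring or the Kafka client, and `localhost:9092` is assumed to be the bootstrap address from this answer's configuration. A successful TCP connection only shows that the port is open; it does not prove that the listener is a healthy Kafka broker.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper for illustration; not part of Spring or the Kafka client library.
final class BrokerReachability {

    // Returns true if a TCP connection to host:port can be opened within timeoutMs.
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host not resolvable: treat as unreachable.
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed bootstrap address; replace with the value from your own configuration.
        boolean ok = isReachable("localhost", 9092, 2000);
        System.out.println("Kafka broker reachable on localhost:9092: " + ok);
    }
}
```

If this check fails, the problem is likely at the network or broker-startup level rather than in the consumer configuration.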
- Code example:
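import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaAdmin;

@SpringBootApplication
@EnableKafka
public class EmployeeManagementApplication {
    public static void main(String[] args) {
        SpringApplication.run(EmployeeManagementApplication.class, args);
    }

    // Admin client that lets Spring create declared topics on the broker at startup.
    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        return new KafkaAdmin(configs);
    }

    // Declares the topic with 1 partition and a replication factor of 1.
    @Bean
    public NewTopic kafkaLogPublisherTopic() {
        return new NewTopic("employee-management-1.kafka-log-publisher", 1, (short) 1);
    }

    // Consumer settings: broker address, group id, and String deserializers.
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "employee-management-group");
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(configs);
    }

    // Container factory that backs @KafkaListener methods.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    // The topic name and group id here must match the values configured above.
    @KafkaListener(topics = "employee-management-1.kafka-log-publisher", groupId = "employee-management-group")
    public void consume(String message)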