How do I destroy idle Message Listeners and their container objects in Spring Kafka?

Solution: Automatic Cleanup for Idle Kafka Listener Containers

Nice catch on this potential resource leak! When you're spinning up one-off Kafka listener containers that get paused after completing their task, you need a way to track and clean them up automatically. Here's a practical, step-by-step solution tailored to your code:

1. Create a Container Tracking & Cleanup Manager

First, we'll build a singleton manager to keep track of all created containers and their last active timestamps. This manager will run a scheduled task to scan for idle containers and clean them up.

import org.springframework.kafka.listener.KafkaMessageListenerContainer;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class KafkaContainerCleanupManager {
    // Singleton instance
    private static final KafkaContainerCleanupManager INSTANCE = new KafkaContainerCleanupManager();
    // Track containers and their last active timestamp (ms)
    private final Map<KafkaMessageListenerContainer<?, ?>, Long> containerLastActiveMap = new ConcurrentHashMap<>();
    // Scheduler for periodic cleanup checks
    private final ScheduledExecutorService cleanupScheduler = Executors.newSingleThreadScheduledExecutor();
    // Idle threshold (adjust this based on your needs - example: 5 minutes)
    private static final long IDLE_THRESHOLD_MS = TimeUnit.MINUTES.toMillis(5);

    private KafkaContainerCleanupManager() {
        // Start cleanup task: run every 1 minute, first check after 1 minute
        cleanupScheduler.scheduleAtFixedRate(
            this::cleanupIdleContainers,
            1, 1, TimeUnit.MINUTES
        );
    }

    public static KafkaContainerCleanupManager getInstance() {
        return INSTANCE;
    }

    // Register a new container, or refresh the timestamp of one that is already tracked.
    // Spring Kafka has no pause-callback adapter to hook into, so callers refresh the timestamp explicitly.
    public void registerContainer(KafkaMessageListenerContainer<?, ?> container) {
        containerLastActiveMap.put(container, System.currentTimeMillis());
    }

    // Core cleanup logic
    private void cleanupIdleContainers() {
        long currentTime = System.currentTimeMillis();

        containerLastActiveMap.forEach((container, lastActiveTime) -> {
            // Only clean up containers that are running, paused, and idle beyond the threshold.
            // isPauseRequested() is true only after container.pause(); pausing the raw consumer is not visible here.
            boolean idle = (currentTime - lastActiveTime) > IDLE_THRESHOLD_MS;
            // remove(key, value) skips the container if its timestamp was refreshed in the meantime
            if (container.isRunning() && container.isPauseRequested() && idle
                    && containerLastActiveMap.remove(container, lastActiveTime)) {
                try {
                    // Stop the container to release Kafka consumer resources
                    container.stop();
                    System.out.printf("Cleaned up idle Kafka container (group ID: %s)%n", container.getContainerProperties().getGroupId());
                } catch (RuntimeException e) {
                    // An uncaught exception would cancel every future run of the scheduled task
                    System.err.printf("Failed to stop idle Kafka container: %s%n", e);
                }
            }
        });
    }

    // Call this when your application shuts down to clean up the scheduler
    public void shutdown() {
        cleanupScheduler.shutdown();
    }
}
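
The timestamp bookkeeping in the manager can be exercised without a Kafka broker. Here is a minimal sketch using only JDK types, assuming an injected clock so the idle check is deterministic. IdleTracker, touch, and evictIdle are hypothetical names for illustration, not Spring Kafka APIs:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

// Hypothetical helper (not part of Spring Kafka): tracks last-active timestamps for arbitrary keys.
final class IdleTracker<K> {
    private final Map<K, Long> lastActive = new ConcurrentHashMap<>();
    private final long idleThresholdMs;
    private final LongSupplier clock; // injectable so tests can control time

    IdleTracker(long idleThresholdMs, LongSupplier clock) {
        this.idleThresholdMs = idleThresholdMs;
        this.clock = clock;
    }

    // Registers the key, or refreshes its timestamp if it is already tracked.
    void touch(K key) {
        lastActive.put(key, clock.getAsLong());
    }

    // Removes and returns every key that has been idle for longer than the threshold.
    List<K> evictIdle() {
        long now = clock.getAsLong();
        List<K> evicted = new ArrayList<>();
        lastActive.forEach((key, seenAt) -> {
            // remove(key, value) succeeds only if the timestamp was not refreshed concurrently
            if (now - seenAt > idleThresholdMs && lastActive.remove(key, seenAt)) {
                evicted.add(key);
            }
        });
        return evicted;
    }

    int size() {
        return lastActive.size();
    }
}

final class IdleTrackerDemo {
    public static void main(String[] args) {
        long[] now = {0L}; // fake clock, in milliseconds
        IdleTracker<String> tracker = new IdleTracker<>(300_000L, () -> now[0]);
        tracker.touch("container-a");
        now[0] = 200_000L;
        tracker.touch("container-b");
        now[0] = 400_000L;
        System.out.println(tracker.evictIdle()); // prints [container-a]
    }
}
```

In a real cleanup task, each evicted key would be a container to stop. The conditional remove means a container whose timestamp was refreshed during the scan is left alone until the next run.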

2. Update Your Listener to Track Activity

Modify your Listener class to hold a reference to its parent container, so you can update the last active time when the pause condition is triggered:

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.ConsumerAwareMessageListener;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.AbstractConsumerSeekAware;

public class Listener extends AbstractConsumerSeekAware implements ConsumerAwareMessageListener<String, String> {
    private final KafkaMessageListenerContainer<String, String> parentContainer;

    public Listener(KafkaMessageListenerContainer<String, String> parentContainer) {
        this.parentContainer = parentContainer;
    }

    @Override
    public void onMessage(ConsumerRecord<String, String> consumerRecord, Consumer<?, ?> consumer) {
        // Your existing logic to check if we need to pause
        boolean shouldPause = /* Replace with your condition (e.g., consumerRecord.offset() == targetOffset) */;
        
        if (shouldPause) {
            // Pause through the container rather than the raw consumer, so the container itself reports the pause
            parentContainer.pause();
            // Re-registering an already tracked container refreshes its last active time
            KafkaContainerCleanupManager.getInstance().registerContainer(parentContainer);
        }
    }
}

3. Modify Container Creation to Register with the Manager

Update your getContainer method to register each new container with the cleanup manager and pass the container reference to the listener:

import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.support.TopicPartitionOffset;

import java.util.Map;
import java.util.HashMap;
import java.util.UUID;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class KafkaContainerFactory {
    public KafkaMessageListenerContainer<String, String> getContainer(String topic, int partition, long offset) {
        ContainerProperties containerProperties = new ContainerProperties(new TopicPartitionOffset(topic, partition, offset));
        ConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProperties());
        
        KafkaMessageListenerContainer<String, String> container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
        container.getContainerProperties().setGroupId(UUID.randomUUID().toString());
        container.setAutoStartup(false);
        
        // Pass container reference to listener
        Listener listener = new Listener(container);
        container.getContainerProperties().setMessageListener(listener);
        
        // Register container with cleanup manager
        KafkaContainerCleanupManager.getInstance().registerContainer(container);
        
        return container;
    }

    private Map<String, Object> consumerProperties(){
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
}

Key Notes & Best Practices

  • Thread Safety: We use ConcurrentHashMap to track containers, which is safe for multi-threaded access since your API calls and cleanup task will run in different threads.
  • Resource Release: Calling container.stop() is critical - it shuts down the underlying Kafka consumer, releases network connections, and frees up Kafka broker resources.
  • Threshold Tuning: Adjust IDLE_THRESHOLD_MS based on your use case. If your listeners might be paused briefly before being reused (though you said they're useless after pause), set a longer threshold.
  • Shutdown Handling: Call KafkaContainerCleanupManager.getInstance().shutdown() when your application exits. The scheduler created by Executors.newSingleThreadScheduledExecutor() uses a non-daemon thread, so leaving it running can keep the JVM from exiting.
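
For the shutdown note above, here is a minimal sketch of a more defensive stop sequence using only JDK calls. SchedulerShutdown and shutdownGracefully are hypothetical names, and the 5-second timeout is an arbitrary assumption:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: stops a scheduler and reports whether it fully terminated.
final class SchedulerShutdown {
    static boolean shutdownGracefully(ScheduledExecutorService scheduler, long timeout, TimeUnit unit) {
        // Reject new runs; by default this also cancels pending periodic tasks
        scheduler.shutdown();
        try {
            if (!scheduler.awaitTermination(timeout, unit)) {
                // A run is still in flight: interrupt it and wait once more
                scheduler.shutdownNow();
                return scheduler.awaitTermination(timeout, unit);
            }
            return true;
        } catch (InterruptedException e) {
            scheduler.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> { }, 1, 1, TimeUnit.MINUTES);
        boolean stopped = shutdownGracefully(scheduler, 5, TimeUnit.SECONDS);
        System.out.println("terminated: " + stopped);
    }
}
```

In a Spring application, a call like this would typically live in a @PreDestroy method or a DisposableBean.destroy() implementation, so it runs when the context closes.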

The question comes from Stack Exchange; asked by Trayambak Kumar.
