How to Add Health Checks (Liveness/Readiness) to a Kafka Consumer K8s Pod in a Django Application

Hey there! Since you're new to infrastructure and need to set up health checks for your Kafka Consumer Pod in a Django app, let's break this down step by step, with as little jargon as possible.

1. First, Understand the Difference Between Liveness & Readiness Probes

Before diving into code, let's clarify what each probe does:

  • Liveness Probe: Checks if the Pod is still "alive". If it fails, Kubernetes will restart the Pod. Use this for critical failures that require a fresh start (like a frozen consumer process).
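  • Readiness Probe: Checks if the Pod is ready to do its job (i.e., consume Kafka messages). If it fails, Kubernetes marks the Pod as not ready and removes it from Service endpoints, but does not restart it. Readiness has no effect on Kafka itself: partition assignment is decided by the consumer group, not by Kubernetes. For a consumer, the readiness probe mainly matters for rolling updates (a new Pod must become ready before the rollout continues) and for visibility in kubectl get pods.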
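
To make the distinction concrete, here is a minimal sketch of how a consumer loop could track the two states separately. ConsumerStatus, mark_polled, and the 30-second threshold are hypothetical names and values chosen for illustration; they are not part of Kubernetes or of any Kafka library.

```python
import time
from dataclasses import dataclass, field


@dataclass
class ConsumerStatus:
    """In-memory state that a consumer loop updates and a health endpoint reads."""

    max_poll_gap_seconds: float = 30.0  # assumed threshold; tune for your workload
    last_poll_at: float = field(default_factory=time.monotonic)
    has_assignment: bool = False

    def mark_polled(self, now=None):
        # Call on every poll-loop iteration, even when poll() returns no message.
        self.last_poll_at = time.monotonic() if now is None else now

    def is_alive(self, now=None):
        # Liveness: the loop is still turning. A failure here justifies a restart.
        now = time.monotonic() if now is None else now
        return (now - self.last_poll_at) <= self.max_poll_gap_seconds

    def is_ready(self, now=None):
        # Readiness: alive and holding at least one partition.
        return self.is_alive(now) and self.has_assignment


status = ConsumerStatus()
status.mark_polled(now=100.0)
print(status.is_alive(now=110.0), status.is_ready(now=110.0))  # True False
status.has_assignment = True
print(status.is_ready(now=110.0))  # True
print(status.is_alive(now=200.0))  # False
```

With this split, a stalled loop fails liveness and gets restarted, while a consumer that is merely waiting for a rebalance only fails readiness.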

2. Key Metrics to Validate for Your Kafka Consumer

Your health checks need to verify these core things:

  • The consumer is successfully connected to the Kafka cluster
  • The consumer is actively processing messages (its offset advances whenever messages are waiting; an idle topic is not a failure)
  • No persistent errors are blocking consumption
  • The underlying Django app is healthy (e.g., database connections work if your consumer uses them)
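
The "offset is advancing" check can be sketched as a pure function. The version below flags a partition only when its position has not moved and there are still messages behind the high watermark, so a quiet topic does not count as a failure. find_stuck_partitions and the "topic-partition" string keys are hypothetical; the offsets are assumed to have been collected into plain dicts beforehand.

```python
def find_stuck_partitions(previous, current, high_watermarks):
    """Return partitions whose position did not advance while messages were waiting.

    All three arguments map a partition key (e.g. "orders-0") to an integer offset.
    """
    stuck = []
    for key, position in current.items():
        if key not in previous:
            continue  # first observation of this partition; nothing to compare yet
        lag = high_watermarks.get(key, position) - position
        if position <= previous[key] and lag > 0:
            stuck.append(key)
    return sorted(stuck)


previous = {"orders-0": 120, "orders-1": 75}
current = {"orders-0": 120, "orders-1": 75}
high_watermarks = {"orders-0": 120, "orders-1": 90}
print(find_stuck_partitions(previous, current, high_watermarks))  # ['orders-1']
```

Here orders-0 is idle (no lag), so only orders-1, which has 15 unread messages and no progress, is reported.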

3. Implement Health Check Endpoints in Django

First, add a custom endpoint in your Django app that checks the consumer's status. Let's assume you're using confluent-kafka (adjust if you're using kafka-python or another library).

Step 3.1: Create a Health Check View

Add this to your app's views.py:

import logging

from confluent_kafka import KafkaException
from django.core.cache import cache
from django.http import JsonResponse
from django.views.decorators.http import require_GET

from myapp.kafka_client import get_kafka_consumer  # Your consumer instance/initializer

logger = logging.getLogger(__name__)

OFFSETS_CACHE_KEY = "kafka_consumer_last_offsets"
# Keep the TTL longer than the probe's periodSeconds x failureThreshold. If the
# entry expires mid-outage, the next check has nothing to compare against and passes.
OFFSETS_CACHE_TTL = 300


def _unhealthy(reason):
    return JsonResponse({"status": "unhealthy", "reason": reason}, status=503)


@require_GET
def kafka_health_check(request):
    try:
        consumer = get_kafka_consumer()

        # Check that the brokers are reachable. confluent-kafka has no
        # connected() method, so request cluster metadata with a short timeout;
        # list_topics() raises KafkaException if no broker answers in time.
        try:
            consumer.list_topics(timeout=2.0)
        except KafkaException as exc:
            logger.warning(f"Kafka cluster unreachable: {exc}")
            return _unhealthy("Not connected to Kafka")

        # position() returns a list of TopicPartition objects, not a dict.
        # Negative offsets are sentinels meaning no position is known yet.
        positions = [
            tp for tp in consumer.position(consumer.assignment()) if tp.offset >= 0
        ]
        current_offsets = {f"{tp.topic}-{tp.partition}": tp.offset for tp in positions}
        last_offsets = cache.get(OFFSETS_CACHE_KEY) or {}

        # A partition is stuck only if its offset has not moved since the last
        # check AND messages are waiting. An idle topic is not a failure.
        for tp in positions:
            key = f"{tp.topic}-{tp.partition}"
            if key not in last_offsets or tp.offset > last_offsets[key]:
                continue
            watermarks = consumer.get_watermark_offsets(tp, timeout=1.0)
            if watermarks is not None and tp.offset < watermarks[1]:
                logger.warning(f"Kafka partition {key} offset not advancing")
                return _unhealthy(f"Partition {key} stuck")

        # Update cache with current offsets
        cache.set(OFFSETS_CACHE_KEY, current_offsets, OFFSETS_CACHE_TTL)

        # last_fatal_error is not a confluent-kafka attribute. This check only
        # fires if your own consumer wrapper records fatal errors under that name.
        last_error = getattr(consumer, "last_fatal_error", None)
        if last_error:
            logger.error(f"Kafka consumer fatal error: {last_error}")
            return _unhealthy(str(last_error))

        # All checks passed
        return JsonResponse({"status": "healthy"}, status=200)

    except Exception:
        # logger.exception records the full traceback for later debugging
        logger.exception("Health check failed with exception")
        return _unhealthy("Unexpected error in health check")

Step 3.2: Wire Up the URL

Add this to your project's urls.py:

from django.urls import path
from myapp.views import kafka_health_check

urlpatterns = [
    # ... your other URLs
    path("health/kafka/", kafka_health_check, name="kafka-health"),
]

4. Configure Kubernetes Probes

Now update your Deployment YAML to add the liveness and readiness probes pointing to your new endpoint.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: django-kafka-consumer
spec:
  replicas: 2
  selector:
    matchLabels:
      app: django-kafka-consumer
  template:
    metadata:
      labels:
        app: django-kafka-consumer
    spec:
      containers:
      - name: consumer-container
        image: your-django-image:latest
        ports:
        - containerPort: 8000  # Match your Django app's port
        # Readiness Probe: Wait for consumer to initialize before marking ready
        readinessProbe:
          httpGet:
            path: /health/kafka/
            port: 8000
          initialDelaySeconds: 30  # Give time to connect to Kafka
          periodSeconds: 10        # Check every 10s
          timeoutSeconds: 5        # The check talks to Kafka; the 1s default is tight
          failureThreshold: 3      # Mark unready after 3 consecutive failures
        # Liveness Probe: Restart if consumer is truly stuck
        livenessProbe:
          httpGet:
            path: /health/kafka/
            port: 8000
          initialDelaySeconds: 60  # Longer delay for full initialization
          periodSeconds: 15        # Check less frequently than readiness
          timeoutSeconds: 5        # The check talks to Kafka; the 1s default is tight
          failureThreshold: 5      # Tolerate minor blips before restarting
        env:
        - name: KAFKA_BOOTSTRAP_SERVERS
          value: "kafka-cluster:9092"
        # Add your other Django/Kafka env vars here
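
When tuning these numbers, it helps to estimate how long a broken consumer can keep running before Kubernetes reacts. The sketch below is a rough estimate only: seconds_to_detect is a hypothetical helper, and it ignores the probe timeout and kubelet scheduling jitter, so add the probe's timeoutSeconds for a slightly more conservative bound.

```python
def seconds_to_detect(period_seconds, failure_threshold):
    """Approximate time between a running Pod breaking and the probe tripping.

    Kubernetes acts only after failure_threshold consecutive failed probes,
    and probes run once every period_seconds.
    """
    return period_seconds * failure_threshold


# periodSeconds and failureThreshold taken from the Deployment above.
print(seconds_to_detect(10, 3))  # 30 -> readiness marks the Pod as not ready
print(seconds_to_detect(15, 5))  # 75 -> liveness restarts the container
```

If 75 seconds of stalled consumption is too long for your workload, lower periodSeconds or failureThreshold on the liveness probe, at the cost of more restarts on transient blips.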

5. Pro Tips for Newbies
  • Test Locally: Run your Django app and use curl http://localhost:8000/health/kafka/ to verify the endpoint works. Try disconnecting Kafka to see if it returns a 503.
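  • Process Safety: If you're using a multi-process server like Gunicorn, each worker has its own memory, so the worker that answers the probe may not be the one holding the consumer. Make sure the consumer is initialized per process, and track offsets in a shared cache like Redis rather than Django's default local-memory cache, which is per process.
  • Keep Checks Light: Health checks should run quickly. Avoid heavy computations or long-running queries here: a check that exceeds the probe's timeoutSeconds (1 second by default) counts as a failure, which can restart a perfectly healthy Pod.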
  • Log Everything: Add detailed logs to your health check view so you can debug why a probe failed later.
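
On the multi-process tip: if the consumer runs in a different process from the one answering the probe, in-memory state is not shared between them. One low-tech option is a heartbeat file that the consumer touches and the health check reads. This is a sketch under assumptions: HEARTBEAT_FILE, touch_heartbeat, heartbeat_is_fresh, and the 30-second threshold are hypothetical, and both processes are assumed to run in the same container so they see the same filesystem.

```python
import tempfile
import time
from pathlib import Path

# Assumed location; any path both processes can read and write works.
HEARTBEAT_FILE = Path(tempfile.gettempdir()) / "kafka_consumer_heartbeat"


def touch_heartbeat(path=HEARTBEAT_FILE):
    """Called from the consumer loop after each successful poll."""
    path.touch(exist_ok=True)  # creates the file or refreshes its mtime


def heartbeat_is_fresh(path=HEARTBEAT_FILE, max_age_seconds=30.0, now=None):
    """Called from the health view (or a probe script) in any process."""
    try:
        modified_at = path.stat().st_mtime
    except FileNotFoundError:
        return False  # the consumer has never reported in
    now = time.time() if now is None else now
    return (now - modified_at) <= max_age_seconds


touch_heartbeat()
print(heartbeat_is_fresh())  # True
print(heartbeat_is_fresh(now=time.time() + 120))  # False
```

The health view can then return 503 whenever heartbeat_is_fresh() is False, without needing direct access to the consumer object.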

This question comes from Stack Exchange; asked by Dev.
