You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

如何在Confluent Kafka Python脚本中配置SASL PLAIN(API)与GSSAPI(Kerberos)认证及信任库

Solution for Adding SASL Authentication & Truststore Support to Confluent Kafka Python Producer

Hey there! Let's get your Confluent Kafka Python producer configured with SASL PLAIN (API key/password) and GSSAPI (Kerberos) authentication, plus support for private data center truststores. We'll use environment variables to keep the script flexible—so you can switch between auth methods without modifying code directly.

Modified Python Script

Here's the updated code with all required configurations integrated:

import json
import os
import string
import random
import socket
import uuid
import re
from datetime import datetime
import time
import hashlib
import math
import sys
from functools import cache
from confluent_kafka import Producer, KafkaError, KafkaException

topic_name = os.environ['TOPIC_NAME']
partition_count = int(os.environ['PARTITION_COUNT'])
message_key_template = json.loads(os.environ['KEY_TEMPLATE'])
message_value_template = json.loads(os.environ['VALUE_TEMPLATE'])
message_header_template = json.loads(os.environ['HEADER_TEMPLATE'])
bootstrap_servers = os.environ['BOOTSTRAP_SERVERS']
perf_counter_batch_size = int(os.environ.get('PERF_COUNTER_BATCH_SIZE', 100))
messages_per_aggregate = int(os.environ.get('MESSAGES_PER_AGGREGATE', 1))
max_message_count = int(os.environ.get('MAX_MESSAGE_COUNT', sys.maxsize))

def error_cb(err):
    """ The error callback is used for generic client errors. These errors are generally to be considered informational as the client will automatically try to recover from all errors, and no extra action is typically required by the application. For this example however, we terminate the application if the client is unable to connect to any broker (_ALL_BROKERS_DOWN) and on authentication errors (_AUTHENTICATION). """
    print("Client error: {}".format(err))
    if err.code() == KafkaError._ALL_BROKERS_DOWN or \
       err.code() == KafkaError._AUTHENTICATION:
        # Any exception raised from this callback will be re-raised from the
        # triggering flush() or poll() call.
        raise KafkaException(err)

def acked(err, msg):
    if err is not None:
        print("Failed to send message: %s: %s" % (str(msg), str(err)))

producer_configs = {
    'bootstrap.servers': bootstrap_servers,
    'client.id': socket.gethostname(),
    'error_cb': error_cb
}

# --------------------------
# Add SSL/Truststore Configuration for Private DC
# --------------------------
ssl_enabled = os.environ.get('SSL_ENABLED', 'false').lower() == 'true'
if ssl_enabled:
    # Base SSL config
    ssl_config = {
        'ssl.endpoint.identification.algorithm': 'https',
        'security.protocol': 'SSL'  # Default to SSL if no SASL
    }
    
    # Add truststore details (PEM or JKS)
    if os.environ.get('SSL_CA_LOCATION'):
        # Use PEM-formatted CA cert (most common for Python clients)
        ssl_config['ssl.ca.location'] = os.environ['SSL_CA_LOCATION']
    elif os.environ.get('SSL_TRUSTSTORE_LOCATION'):
        # Use JKS truststore (if you have one from Java environments)
        ssl_config['ssl.truststore.location'] = os.environ['SSL_TRUSTSTORE_LOCATION']
        ssl_config['ssl.truststore.password'] = os.environ['SSL_TRUSTSTORE_PASSWORD']
    
    producer_configs.update(ssl_config)

# --------------------------
# Add SASL Authentication Configuration
# --------------------------
sasl_mechanism = os.environ.get('SASL_MECHANISM')
if sasl_mechanism:
    # Update security protocol to SASL_SSL if SSL is enabled
    if ssl_enabled:
        producer_configs['security.protocol'] = 'SASL_SSL'
    else:
        # Not recommended for production, but allows SASL_PLAINTEXT if needed
        producer_configs['security.protocol'] = 'SASL_PLAINTEXT'
    
    producer_configs['sasl.mechanism'] = sasl_mechanism
    
    if sasl_mechanism == 'PLAIN':
        # Validate required PLAIN auth env vars
        required_vars = ['SASL_USERNAME', 'SASL_PASSWORD']
        missing_vars = [var for var in required_vars if not os.environ.get(var)]
        if missing_vars:
            raise ValueError(f"Missing required environment variables for SASL PLAIN: {', '.join(missing_vars)}")
        
        producer_configs.update({
            'sasl.username': os.environ['SASL_USERNAME'],
            'sasl.password': os.environ['SASL_PASSWORD']
        })
    
    elif sasl_mechanism == 'GSSAPI':
        # Base GSSAPI config (Kafka service name is usually 'kafka')
        gssapi_config = {
            'sasl.kerberos.service.name': os.environ.get('SASL_KERBEROS_SERVICE_NAME', 'kafka')
        }
        
        # Optional: Add keytab/principal if using non-interactive auth
        if os.environ.get('SASL_KERBEROS_KEYTAB') and os.environ.get('SASL_KERBEROS_PRINCIPAL'):
            gssapi_config.update({
                'sasl.kerberos.keytab': os.environ['SASL_KERBEROS_KEYTAB'],
                'sasl.kerberos.principal': os.environ['SASL_KERBEROS_PRINCIPAL']
            })
        
        producer_configs.update(gssapi_config)

# Initialize producer with all configs
producer = Producer(producer_configs)

# generates a random value if it is not cached in the template_values dictionary
def get_templated_value(term, template_values):
    if not term in template_values:
        template_values[term] = str(uuid.uuid4())
    return template_values[term]

def fill_template_value(value, template_values):
    str_value = str(value)
    template_regex = '{{(.+?)}}'
    templated_terms = re.findall(template_regex, str_value)
    for term in templated_terms:
        str_value = str_value.replace(f"{{{{{term}}}}}", get_templated_value(term, template_values))
    return str_value

def fill_template(template, templated_terms):
    # TODO: Need to address metadata field, as it's treated as a string instead of a nested object.
    return {field: fill_template_value(value, templated_terms) for field, value in template.items()}

@cache
def get_partition(lock_id):
    bits = 128
    bucket_size = 2**bits / partition_count
    partition = (int(hashlib.md5(lock_id.encode('utf-8')).hexdigest(), 16) / bucket_size)
    return math.floor(partition)

sequence_number = int(time.time() * 1000)
sequence_number = 0
message_count = 0
producing = True
start_time = time.perf_counter()
aggregate_message_counter = 0
# cache for templated term values so that they match across the different templates
templated_values = {}

try:
    while producing:
        sequence_number += 1
        aggregate_message_counter += 1
        message_count += 1

        if aggregate_message_counter % messages_per_aggregate == 0:
            # reset templated values
            templated_values = {}
        else:
            for term in list(templated_values):
                if term not in ['aggregateId', 'tenantId']:
                    del(templated_values[term])

        # Fill in templated field values
        message_key = fill_template(message_key_template, templated_values)
        message_value = fill_template(message_value_template, templated_values)
        message_header = fill_template(message_header_template, templated_values)

        ts = datetime.utcnow().isoformat()[:-3]+'Z'
        message_header['timestamp'] = ts
        message_header['sequence_number'] = str(sequence_number)
        message_value['timestamp'] = ts
        message_value['sequenceNumber'] = sequence_number

        lock_id = message_header['lock_id']
        partition = get_partition(lock_id)
        # partition by lock_id, since key could be random, but a given aggregate_id should ALWAYS resolve to the same partition, regardless of key.

        # Send message
        producer.produce(topic_name, partition=partition, key=json.dumps(message_key), value=json.dumps(message_value), headers=message_header, callback=acked)

        if sequence_number % perf_counter_batch_size == 0:
            producer.flush()
            end_time = time.perf_counter()
            total_duration = end_time - start_time
            messages_per_second=(perf_counter_batch_size/total_duration)
            print(f'{messages_per_second} messages/second')
            # reset start time
            start_time = time.perf_counter()

        if message_count >= max_message_count:
            break
except Exception as e:
    print(f'ERROR: %s' % e)
    sys.exit(1)
finally:
    producer.flush()

How to Use the Configurations

1. SASL PLAIN Authentication (API Key/Password)

Set these environment variables before running the script:

  • SASL_MECHANISM=PLAIN
  • SASL_USERNAME=your-api-username
  • SASL_PASSWORD=your-api-password
  • SSL_ENABLED=true
  • SSL_CA_LOCATION=/path/to/private-dc-ca.pem (path to your private DC's root CA cert)

2. GSSAPI (Kerberos) Authentication

For Kerberos, you can use either interactive auth (uses your current ticket) or non-interactive (keytab):

  • SASL_MECHANISM=GSSAPI
  • SASL_KERBEROS_SERVICE_NAME=kafka (match your Kafka cluster's service principal name)
  • SSL_ENABLED=true
  • SSL_CA_LOCATION=/path/to/private-dc-ca.pem

Optional (non-interactive auth):

  • SASL_KERBEROS_KEYTAB=/path/to/your.keytab
  • SASL_KERBEROS_PRINCIPAL=your-principal@YOUR-REALM.COM

3. Truststore Only (No SASL)

If you just need to connect to a private DC with SSL but no SASL:

  • SSL_ENABLED=true
  • SSL_CA_LOCATION=/path/to/private-dc-ca.pem

Notes

  • Always use SASL_SSL (not SASL_PLAINTEXT) in production to encrypt traffic.
  • For JKS truststores (common in Java environments), use SSL_TRUSTSTORE_LOCATION and SSL_TRUSTSTORE_PASSWORD instead of SSL_CA_LOCATION.
  • The script includes validation for required PLAIN auth variables to catch missing configs early.

内容的提问来源于stack exchange,提问作者sparsh bhardwaj

火山引擎 最新活动