如何在Confluent Kafka Python脚本中配置SASL PLAIN(API)与GSSAPI(Kerberos)认证及信任库
Solution for Adding SASL Authentication & Truststore Support to Confluent Kafka Python Producer
Hey there! Let's get your Confluent Kafka Python producer configured with SASL PLAIN (API key/password) and GSSAPI (Kerberos) authentication, plus support for private data center truststores. We'll use environment variables to keep the script flexible—so you can switch between auth methods without modifying code directly.
Modified Python Script
Here's the updated code with all required configurations integrated:
# Load-generating Kafka producer.
#
# Everything is configured through environment variables so that the same
# script can run against plain, SSL-only, SASL PLAIN or SASL GSSAPI (Kerberos)
# clusters without code changes.
import json
import os
import string
import random
import socket
import uuid
import re
from datetime import datetime, timezone
import time
import hashlib
import math
import sys
from functools import cache, lru_cache

from confluent_kafka import Producer, KafkaError, KafkaException

# --------------------------
# Required settings
# --------------------------
topic_name = os.environ['TOPIC_NAME']
partition_count = int(os.environ['PARTITION_COUNT'])
message_key_template = json.loads(os.environ['KEY_TEMPLATE'])
message_value_template = json.loads(os.environ['VALUE_TEMPLATE'])
message_header_template = json.loads(os.environ['HEADER_TEMPLATE'])
bootstrap_servers = os.environ['BOOTSTRAP_SERVERS']

# --------------------------
# Optional tuning settings
# --------------------------
perf_counter_batch_size = int(os.environ.get('PERF_COUNTER_BATCH_SIZE', 100))
messages_per_aggregate = int(os.environ.get('MESSAGES_PER_AGGREGATE', 1))
max_message_count = int(os.environ.get('MAX_MESSAGE_COUNT', sys.maxsize))


def error_cb(err):
    """Handle generic client errors reported by the Kafka client.

    These errors are generally to be considered informational as the client
    will automatically try to recover from all errors, and no extra action is
    typically required by the application.  For this example however, we
    terminate the application if the client is unable to connect to any broker
    (_ALL_BROKERS_DOWN) and on authentication errors (_AUTHENTICATION).

    Raises:
        KafkaException: for the two fatal error codes named above.
    """
    print("Client error: {}".format(err))
    if err.code() == KafkaError._ALL_BROKERS_DOWN or \
            err.code() == KafkaError._AUTHENTICATION:
        # Any exception raised from this callback will be re-raised from the
        # triggering flush() or poll() call.
        raise KafkaException(err)


def acked(err, msg):
    """Delivery callback: report messages that could not be delivered."""
    if err is not None:
        print("Failed to send message: %s: %s" % (str(msg), str(err)))


producer_configs = {
    'bootstrap.servers': bootstrap_servers,
    'client.id': socket.gethostname(),
    'error_cb': error_cb
}

# --------------------------
# Add SSL/Truststore Configuration for Private DC
# --------------------------
ssl_enabled = os.environ.get('SSL_ENABLED', 'false').lower() == 'true'

if ssl_enabled:
    # Base SSL config
    ssl_config = {
        'ssl.endpoint.identification.algorithm': 'https',
        'security.protocol': 'SSL'  # Default to SSL if no SASL
    }

    if os.environ.get('SSL_CA_LOCATION'):
        # PEM-formatted CA certificate (the format librdkafka understands).
        ssl_config['ssl.ca.location'] = os.environ['SSL_CA_LOCATION']
    elif os.environ.get('SSL_TRUSTSTORE_LOCATION'):
        # librdkafka has no `ssl.truststore.*` properties: passing them makes
        # Producer() fail with an opaque configuration error.  Fail early with
        # an actionable message instead.
        raise ValueError(
            "SSL_TRUSTSTORE_LOCATION is set, but Java JKS truststores are not "
            "supported by the librdkafka-based Python client. Export the CA "
            "certificate to a PEM file and set SSL_CA_LOCATION instead."
        )

    producer_configs.update(ssl_config)

# --------------------------
# Add SASL Authentication Configuration
# --------------------------
sasl_mechanism = os.environ.get('SASL_MECHANISM')

if sasl_mechanism:
    if ssl_enabled:
        # SASL over TLS
        producer_configs['security.protocol'] = 'SASL_SSL'
    else:
        # Not recommended for production, but allows SASL_PLAINTEXT if needed
        producer_configs['security.protocol'] = 'SASL_PLAINTEXT'

    producer_configs['sasl.mechanism'] = sasl_mechanism

    if sasl_mechanism == 'PLAIN':
        # Validate required PLAIN auth env vars
        required_vars = ['SASL_USERNAME', 'SASL_PASSWORD']
        missing_vars = [var for var in required_vars if not os.environ.get(var)]
        if missing_vars:
            raise ValueError(f"Missing required environment variables for SASL PLAIN: {', '.join(missing_vars)}")

        producer_configs.update({
            'sasl.username': os.environ['SASL_USERNAME'],
            'sasl.password': os.environ['SASL_PASSWORD']
        })

    elif sasl_mechanism == 'GSSAPI':
        # Base GSSAPI config (Kafka service name is usually 'kafka')
        gssapi_config = {
            'sasl.kerberos.service.name': os.environ.get('SASL_KERBEROS_SERVICE_NAME', 'kafka')
        }

        # Optional: keytab/principal for non-interactive auth.  Both must be
        # provided for either to be applied.
        if os.environ.get('SASL_KERBEROS_KEYTAB') and os.environ.get('SASL_KERBEROS_PRINCIPAL'):
            gssapi_config.update({
                'sasl.kerberos.keytab': os.environ['SASL_KERBEROS_KEYTAB'],
                'sasl.kerberos.principal': os.environ['SASL_KERBEROS_PRINCIPAL']
            })

        producer_configs.update(gssapi_config)

# Initialize producer with all configs
producer = Producer(producer_configs)

# Matches `{{term}}` placeholders inside template values; compiled once
# because it is used for every field of every message.
TEMPLATE_TERM_REGEX = re.compile(r'{{(.+?)}}')


def get_templated_value(term, template_values):
    """Return the value cached for `term`, generating a random UUID if absent."""
    if term not in template_values:
        template_values[term] = str(uuid.uuid4())
    return template_values[term]


def fill_template_value(value, template_values):
    """Return `str(value)` with every `{{term}}` placeholder substituted."""
    str_value = str(value)
    for term in TEMPLATE_TERM_REGEX.findall(str_value):
        str_value = str_value.replace(f"{{{{{term}}}}}", get_templated_value(term, template_values))
    return str_value


def fill_template(template, templated_terms):
    """Return a copy of `template` with every field value filled in."""
    # TODO: Need to address metadata field, as it's treated as a string instead of a nested object.
    return {field: fill_template_value(value, templated_terms) for field, value in template.items()}


# Bounded cache: lock ids may be freshly generated UUIDs and the run may be
# open-ended, so an unbounded cache would grow for as long as the script runs.
@lru_cache(maxsize=65536)
def get_partition(lock_id):
    """Map `lock_id` to a partition in `[0, partition_count)` via its MD5 digest."""
    bits = 128
    bucket_size = 2**bits / partition_count
    digest = int(hashlib.md5(lock_id.encode('utf-8')).hexdigest(), 16)
    # Float rounding can push a digest at the very top of the hash space up to
    # exactly `partition_count`; clamp so the result is always a valid index.
    return min(math.floor(digest / bucket_size), partition_count - 1)


sequence_number = 0
message_count = 0
producing = True
start_time = time.perf_counter()
aggregate_message_counter = 0

# cache for templated term values so that they match across the different templates
templated_values = {}

try:
    while producing:
        sequence_number += 1
        aggregate_message_counter += 1
        message_count += 1

        if aggregate_message_counter % messages_per_aggregate == 0:
            # reset templated values
            templated_values = {}
        else:
            # Keep only the aggregate-level identifiers; everything else is
            # regenerated for each message.
            for term in list(templated_values):
                if term not in ['aggregateId', 'tenantId']:
                    del templated_values[term]

        # Fill in templated field values
        message_key = fill_template(message_key_template, templated_values)
        message_value = fill_template(message_value_template, templated_values)
        message_header = fill_template(message_header_template, templated_values)

        # Millisecond-precision UTC timestamp, e.g. 2024-01-01T12:00:00.123Z.
        # `timespec` keeps the fraction even when the microsecond part is 0.
        ts = datetime.now(timezone.utc).replace(tzinfo=None).isoformat(timespec='milliseconds') + 'Z'
        message_header['timestamp'] = ts
        message_header['sequence_number'] = str(sequence_number)
        message_value['timestamp'] = ts
        message_value['sequenceNumber'] = sequence_number

        lock_id = message_header['lock_id']
        # partition by lock_id, since key could be random, but a given
        # aggregate_id should ALWAYS resolve to the same partition, regardless of key.
        partition = get_partition(lock_id)

        # Send message
        producer.produce(topic_name,
                         partition=partition,
                         key=json.dumps(message_key),
                         value=json.dumps(message_value),
                         headers=message_header,
                         callback=acked)

        if sequence_number % perf_counter_batch_size == 0:
            producer.flush()
            end_time = time.perf_counter()
            total_duration = end_time - start_time
            messages_per_second = perf_counter_batch_size / total_duration
            print(f'{messages_per_second} messages/second')
            # reset start time
            start_time = time.perf_counter()

        if message_count >= max_message_count:
            break

except Exception as e:
    print(f'ERROR: {e}')
    sys.exit(1)
finally:
    producer.flush()
How to Use the Configurations
1. SASL PLAIN Authentication (API Key/Password)
Set these environment variables before running the script:
- `SASL_MECHANISM=PLAIN`
- `SASL_USERNAME=your-api-username`
- `SASL_PASSWORD=your-api-password`
- `SSL_ENABLED=true`
- `SSL_CA_LOCATION=/path/to/private-dc-ca.pem` (path to your private DC's root CA cert)
2. GSSAPI (Kerberos) Authentication
For Kerberos, you can use either interactive auth (uses your current ticket) or non-interactive (keytab):
- `SASL_MECHANISM=GSSAPI`
- `SASL_KERBEROS_SERVICE_NAME=kafka` (match your Kafka cluster's service principal name)
- `SSL_ENABLED=true`
- `SSL_CA_LOCATION=/path/to/private-dc-ca.pem`
Optional (non-interactive auth):
- `SASL_KERBEROS_KEYTAB=/path/to/your.keytab`
- `SASL_KERBEROS_PRINCIPAL=your-principal@YOUR-REALM.COM`
3. Truststore Only (No SASL)
If you just need to connect to a private DC with SSL but no SASL:
- `SSL_ENABLED=true`
- `SSL_CA_LOCATION=/path/to/private-dc-ca.pem`
Notes
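- Always use `SASL_SSL` (not `SASL_PLAINTEXT`) in production to encrypt traffic.
- JKS truststores (common in Java environments) cannot be read by the librdkafka-based Python client, so setting `SSL_TRUSTSTORE_LOCATION` and `SSL_TRUSTSTORE_PASSWORD` instead of `SSL_CA_LOCATION` will not work. Export the CA certificate to PEM (for example with `keytool -exportcert -rfc -alias <alias> -keystore truststore.jks -file ca.pem`) and point `SSL_CA_LOCATION` at that file.
- The script includes validation for required PLAIN auth variables to catch missing configs early.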
内容的提问来源于stack exchange,提问作者sparsh bhardwaj




