此错误通常表示生产者未能将消息成功序列化。要解决此问题,请确保使用正确的序列化程序,或使用默认序列化程序。例如,对于Avro序列化程序,您可以使用适当的Avro模式来配置生产者。以下是一个示例:
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
# 配置Avro模式
avro_schema = {
"type": "record",
"name": "test",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"}
]
}
# 使用AvroProducer发送消息
avroProducer = AvroProducer({
'bootstrap.servers': 'localhost:9092',
'schema.registry.url': 'http://localhost:8081'
}, default_value_schema=avro.loads(json.dumps(avro_schema)))
avroProducer.produce(topic='test_topic', value={"id": 1, "name": "test"})
avroProducer.flush()