这个问题是由于Kafka代理缺少所需的授权或权限配置引起的。解决此问题需要在Kafka代理的配置文件中添加必要的授权和权限信息,以确保代理能够正确访问ZooKeeper,并获取所需的Brokers列表。
例如,在Kafka的server.properties配置文件中添加以下SASL相关的配置:
# SASL认证配置
listeners=SASL_PLAINTEXT://:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
# ZooKeeper配置
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
zookeeper.set.acl=true
zookeeper.sasl.client=false
您还需要为代理创建一个Kafka超级用户,并将其添加到授权列表中,以便针对代理进行SASL身份验证。
例如,在您的Kafka归档文件中,您可以使用以下命令基于用户名和密码创建一个Kafka超级用户:
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=<您的密码>]]' --entity-type users --entity-name <您的用户名>
然后,您可以使用以下命令将用户作为超级用户添加到ACL列表中:
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:<您的用户名> --operation ALL --cluster
完成这些步骤后,您的Kafka代理应该可以恢复与ZooKeeper的连接,您的应用程序也应该能够顺利与Kafka代理进行通信。