You need to enable JavaScript to run this app.
导航
SASL_SSL 接入点 SCRAM 机制收发消息
最近更新时间:2023.09.12 15:16:14首次发布时间:2023.02.27 14:15:50

本文以 Python 客户端为例,介绍如何在 VPC 或公网环境下通过 SASL_SSL 接入点 SCRAM 机制接入消息队列 Kafka版,并收发消息。

前提条件

已完成准备工作。详细说明请参考准备工作

1 添加配置文件

创建消息队列 Kafka版配置文件 config.json。 通过 SASL 接入点 SCRAM 机制接入时,配置文件示例如下。配置文件字段的详细说明,请参考SDK 配置说明

说明

  • 请根据注释提示填写相关参数,并删除注释。
  • SCRAM 机制下,应使用具备对应 Topic 访问权限的 SCRAM 用户进行 SASL 认证。获取用户名及密码的方式请参考2 收集连接信息
{
  "bootstrap.servers": "xxxxx", // 修改配置为实例的SASL接入点
  "security.protocol": "SASL_SSL", // 固定为SASL_SSL
  "topic": "xxxx", // 修改配置为待发送的topic名称
  "consumer": {
    "group.id": "xxxx" // 修改为指定消费组的名称
  },
  "sasl": {
    "enabled": true, // 使用SASL接入点时,必须设置为true
    "mechanism": "SCRAM-SHA-256", // 用户类型为Scram时固定为SCRAM-SHA-256
    "username": "xxxx", // SCRAM用户名
    "password": "xxxx" // SCRAM用户密码
  }
}

2 发送消息

实现方法

  1. 创建消息发送程序 producer.py
  2. 编译并运行 producer.py 发送消息。
  3. 查看运行结果。
    运行结果示例如下。
    图片

说明

消息队列 Kafka版提供示例项目供您快速接入,下载并解压缩 Demo 后,可以直接执行以下命令发送并消费消息。

python3 {DemoPath}/bytedance_kafka.py

示例代码

通过 SASL_SSL 接入点 SCRAM 机制生产消息的示例代码如下,您也可以参考 Demo 中的 示例文件{DemoPath}/client/producer.py,实现相关业务逻辑。

from confluent_kafka import Producer


def callback(err, meta):
    """
    py:function:: callback(err, meta)

    Handle the result of message delivery.

    :param confluent_kafka.KafkaError err: error if delivery is failed
    :param confluent_kafka.Message meta: message metadata if delivery is success
    :return: None
    """
    if err is None:
        print('[INFO] Delivered message to topic {} [{}] at offset {}'
              .format(meta.topic(), meta.partition(), meta.offset()))
    else:
        print('[ERROR] Delivery failed: {}'.format(err))


def run_producer(conf):
    # create producer instance
    p = Producer(conf.producer_conf())

    for i in range(10):
        # send a message
        p.produce(conf.topic(), 'demon message {}'.format(i).encode('utf-8'), on_delivery=callback)
        # call poll method to handle deliver result
        p.poll(timeout=0)

    # flush all messages
    p.flush(timeout=5.0)

3 消费消息

实现方法

  1. 创建 Consumer 订阅消息程序 consumer.py
  2. 编译并运行 consumer.py消费消息。
  3. 查看运行结果。

示例代码

通过 SASL_SSL 接入点 SCRAM 机制消费消息的示例代码如下,您也可以参考 Demo 中的示例文件 {DemoPath}/client/consumer.py,实现相关业务逻辑。

from confluent_kafka import Consumer


def handle_message(msg):
    """
    py:function:: handle_message(msg)

    Handle consumed message.

    :param confluent_kafka.Message msg: consumed message meta.
    :return: None
    """
    print('[INFO] Consumed a message from topic {} [{}] at offset {}. value: {}'.
          format(msg.topic(), msg.partition(), msg.offset(), msg.value().decode('utf-8')))


def run_consumer(conf):
    # create a consumer instance
    c = Consumer(conf.consumer_conf())
    # subscribe topic
    c.subscribe([conf.topic()])

    try:
        count = 0
        while count < 10:
            msg = c.poll(timeout=1.0)
            if msg is None:
                # consumer initial and group rebalance may take some time.
                continue
            if msg.error():
                print('[ERROR] Consume msg failed, {}'.format(msg.error()))
                continue
            # handle kafka messages. this
            handle_message(msg)
            count += 1

        c.commit(asynchronous=False)
    finally:
        c.close()