You need to enable JavaScript to run this app.
文档中心
云原生消息引擎

云原生消息引擎

复制全文
下载 pdf
SDK 参考
Python SDK
复制全文
下载 pdf
Python SDK

本文介绍如何通过 Python SDK 接入云原生消息引擎 BMQ 并收发消息。

前提条件

  • 创建资源实例,并获取接入点地址,请参见管理资源池
  • 您需要提前在实例所属安全组中放开 9092 端口。具体操作,请参见添加安全组访问规则
  • (可选)您如果需要通过 SASL 用户名和密码进行鉴权,还需提前创建用户并获取密码。具体操作,请参见创建 SASL 用户

安装依赖

pip install kafka-python

设置Debug日志

import logging
import sys
logger = logging.getLogger('kafka')
logger.addHandler(logging.StreamHandler(sys.stdout))
logger.setLevel(logging.DEBUG)

发送消息

创建并编写producer.py发送消息。

PLAINTEXT

使用PLAINTEXT协议接入点地址连接 BMQ 实例时,无需鉴权。

from kafka import KafkaProducer
producer = KafkaProducer(
    bootstrap_servers='your broker list',
    api_version=(0, 10, 2),
)
for _ in range(100):
    result = producer.send('your topic', b'some_message_bytes').get()
    print("send message: partition " + str(result.partition) + " offset " + str(result.offset))

SASL_PLAINTEXT

通过 SASL 用户名和密码进行鉴权。您需要获取有权限用户的名称和密码,如何获取请参见查看 SASL 用户密码

from kafka import KafkaProducer
producer = KafkaProducer(
    bootstrap_servers="your broker list",
    security_protocol="SASL_PLAINTEXT",
    sasl_mechanism="PLAIN",
    sasl_plain_username="用户名",
    sasl_plain_password="密码",
    api_version=(0, 10, 2),
)
for _ in range(100):
    result = producer.send('your topic', b'some_message_bytes').get()
    print("send message: partition " + str(result.partition) + " offset " + str(result.offset))

SASL_SSL

通过 SASL 用户名和密码进行鉴权。您需要获取有权限用户的名称和密码,如何获取请参见查看 SASL 用户密码

from kafka import KafkaProducer
producer = KafkaProducer(
    bootstrap_servers="your broker list",
    security_protocol="SASL_SSL",
    sasl_mechanism="PLAIN",
    sasl_plain_username="用户名",
    sasl_plain_password="密码",
    ssl_check_hostname=False,
    api_version=(0, 10, 2),
)
for _ in range(100):
    result = producer.send('your topic', b'some_message_bytes').get()
    print("send message: partition " + str(result.partition) + " offset " + str(result.offset))

消费消息

创建并编写consumer.py接收消息。

PLAINTEXT

使用PLAINTEXT协议接入点地址连接 BMQ 实例时,无需鉴权。

from kafka import KafkaConsumer
consumer = KafkaConsumer('your topic', bootstrap_servers="your broker list", group_id="your consumer group",api_version=(0, 10, 2))
for msg in consumer:
    print(msg)

SASL_PLAINTEXT

通过 SASL 用户名和密码进行鉴权。您需要获取有权限用户的名称和密码,如何获取请参见查看 SASL 用户密码

from kafka import KafkaConsumer
consumer = KafkaConsumer(
    "your topic",
    bootstrap_servers="your broker list",
    group_id="your consumer group",
    security_protocol="SASL_PLAINTEXT",
    sasl_mechanism="PLAIN",
    sasl_plain_username="用户名",
    sasl_plain_password="密码",
    api_version=(0, 10, 2),
)
for msg in consumer:
    print(msg)

SASL_SSL

通过 SASL 用户名和密码进行鉴权。您需要获取有权限用户的名称和密码,如何获取请参见查看 SASL 用户密码

from kafka import KafkaConsumer
consumer = KafkaConsumer(
    "your topic",
    bootstrap_servers="your broker list",
    group_id="your consumer group",
    security_protocol="SASL_SSL",
    sasl_mechanism="PLAIN",
    sasl_plain_username="用户名",
    sasl_plain_password="密码",
    ssl_check_hostname=False,
    api_version=(0, 10, 2),
)
for msg in consumer:
    print(msg)
最近更新时间:2024.11.08 10:57:30
这个页面对您有帮助吗?
有用
有用
无用
无用