You need to enable JavaScript to run this app.
导航

Python SDK

最近更新时间2024.04.28 10:53:15

首次发布时间2024.03.27 14:56:19

本文介绍如何通过 Python SDK 接入云原生消息引擎 BMQ 并收发消息。

前提条件

创建资源实例,并获取接入点地址,请参见管理资源池

安装依赖

pip install kafka-python

设置Debug日志

import logging
import sys
logger = logging.getLogger('kafka')
logger.addHandler(logging.StreamHandler(sys.stdout))
logger.setLevel(logging.DEBUG)

发送消息

创建并编写producer.py发送消息。

PLAINTEXT

from kafka import KafkaProducer
producer = KafkaProducer(
    bootstrap_servers='your broker list',
    api_version=(0, 10, 2),
)
for _ in range(100):
    result = producer.send('your topic', b'some_message_bytes').get()
    print("send message: partition " + str(result.partition) + " offset " + str(result.offset))

SASL_PLAINTEXT

通过 SASL 用户名和密码进行鉴权。您需要获取有权限用户的名称和密码,如何获取请参见获取(重置)用户密码

from kafka import KafkaProducer
producer = KafkaProducer(
    bootstrap_servers="your broker list",
    security_protocol="SASL_PLAINTEXT",
    sasl_mechanism="PLAIN",
    sasl_plain_username="用户名",
    sasl_plain_password="密码",
    api_version=(0, 10, 2),
)
for _ in range(100):
    result = producer.send('your topic', b'some_message_bytes').get()
    print("send message: partition " + str(result.partition) + " offset " + str(result.offset))

SASL_SSL

通过 SASL 用户名和密码进行鉴权。您需要获取有权限用户的名称和密码,如何获取请参见获取(重置)用户密码

from kafka import KafkaProducer
producer = KafkaProducer(
    bootstrap_servers="your broker list",
    security_protocol="SASL_SSL",
    sasl_mechanism="PLAIN",
    sasl_plain_username="用户名",
    sasl_plain_password="密码",
    api_version=(0, 10, 2),
)
for _ in range(100):
    result = producer.send('your topic', b'some_message_bytes').get()
    print("send message: partition " + str(result.partition) + " offset " + str(result.offset))

消费消息

创建并编写consumer.py发送消息。

PLAINTEXT

from kafka import KafkaConsumer
consumer = KafkaConsumer('your topic', bootstrap_servers="your broker list", group_id="your consumer group",api_version=(0, 10, 2))
for msg in consumer:
    print(msg)

SASL_PLAINTEXT

通过 SASL 用户名和密码进行鉴权。您需要获取有权限用户的名称和密码,如何获取请参见获取(重置)用户密码

from kafka import KafkaConsumer
consumer = KafkaConsumer(
    "your topic",
    bootstrap_servers="your broker list",
    group_id="your consumer group",
    security_protocol="SASL_PLAINTEXT",
    sasl_mechanism="PLAIN",
    sasl_plain_username="用户名",
    sasl_plain_password="密码",
    api_version=(0, 10, 2),
)
for msg in consumer:
    print(msg)

SASL_SSL

通过 SASL 用户名和密码进行鉴权。您需要获取有权限用户的名称和密码,如何获取请参见获取(重置)用户密码

from kafka import KafkaConsumer
consumer = KafkaConsumer(
    "your topic",
    bootstrap_servers="your broker list",
    group_id="your consumer group",
    security_protocol="SASL_SSL",
    sasl_mechanism="PLAIN",
    sasl_plain_username="用户名",
    sasl_plain_password="密码",
    api_version=(0, 10, 2),
)
for msg in consumer:
    print(msg)