You need to enable JavaScript to run this app.
导航

事务消息

最近更新时间2023.10.31 11:56:20

首次发布时间2023.05.12 15:10:40

本文提供使用Python SDK收发事务消息的示例代码供您参考。

前提条件

发送事务消息

通过以下步骤发送事务消息。

  1. 业务侧通过 send_message_in_transaction 发送消息到 RocketMQ 服务端。
  2. 创建 TransactionMQProducer 时注册业务查询事务执行是否成功的接口 transaction_checker_callback。
  3. 发送事务消息时业务侧通过 transaction_local_execute 执行本地事务。

示例代码如下。

import time
from rocketmq.client import TransactionMQProducer, Message, TransactionStatus

producer_group = ""  # 生产者group
name_server_addr = "http://rocketmq-xxxxxxxx.rocketmq.ivolces.com:9876"  # 火山引擎控制台展示的TCP接入点
topic = ""  # 在火山引擎控制台Topic管理页面创建的topic名称
access_key = ""  # RocketMQ实例密钥管理页面获取到的AccessKey ID
access_secret = ""   # RocketMQ实例密钥管理页面获取到的AccessKey Secret
key = ""  # 消息key
tag = ""  # 消息tag

def transaction_checker_callback(msg, user_args):
    return TransactionStatus.COMMIT

def transaction_local_execute(msg, user_args):
    return TransactionStatus.UNKNOWN

# 创建并启动生产者实例
producer = TransactionMQProducer(producer_group, transaction_checker_callback)
producer.set_name_server_address(name_server_addr)
producer.set_session_credentials(access_key, access_secret, "")
producer.start()

# 组装消息
msg = Message(topic)
msg.set_keys(key)
msg.set_tags(tag)
msg.set_body("hello volcano engine")
ret = producer.send_message_in_transaction(msg, transaction_local_execute, None)

print(ret.status, ret.msg_id, ret.offset)

while True:
    time.sleep(3600)

订阅事务消息

事务消息的订阅方式与普通消息一致,示例代码如下所示。

集群模式消费

集群模式消费消息的示例代码如下。

import time

from rocketmq.client import PushConsumer, ConsumeStatus

name_server_addr = "http://rocketmq-xxxxxxxx.rocketmq.ivolces.com:9876"  # 火山引擎控制台展示的TCP接入点
topic = ""  # 在火山引擎控制台Topic管理页面创建的topic名称
group = ""  # 在火山引擎控制台Group管理页面创建的group名称
access_key = ""  # RocketMQ实例密钥管理页面获取到的AccessKey ID
access_secret = ""  # RocketMQ实例密钥管理页面获取到的AccessKey Secret
tags = ""  # 消费消息匹配的tag,多个tag使用||进行分隔

def callback(msg):
    print(msg.id, msg.body)
    return ConsumeStatus.CONSUME_SUCCESS

consumer = PushConsumer(group)
consumer.set_name_server_address(name_server_addr)
consumer.set_session_credentials(access_key, access_secret, "")

consumer.subscribe(topic, callback, tags)
consumer.start()

while True:
    time.sleep(3600)

consumer.shutdown()

广播模式消费

广播模式消费的示例代码如下。

import time

from rocketmq.client import PushConsumer, ConsumeStatus
from rocketmq.ffi import MessageModel

name_server_addr = "http://rocketmq-xxxxxxxx.rocketmq.ivolces.com:9876"  # 火山引擎控制台展示的TCP接入点
topic = ""  # 在火山引擎控制台Topic管理页面创建的topic名称
group = ""  # 在火山引擎控制台Group管理页面创建的group名称
access_key = ""  # RocketMQ实例密钥管理页面获取到的AccessKey ID
access_secret = ""  # RocketMQ实例密钥管理页面获取到的AccessKey Secret
tags = ""  # 消费消息匹配的tag,多个tag使用||进行分隔

def callback(msg):
    print(msg.id, msg.body)
    return ConsumeStatus.CONSUME_SUCCESS

consumer = PushConsumer(group, message_model=MessageModel.BROADCASTING) # 指定消费模式为广播模式
consumer.set_name_server_address(name_server_addr)
consumer.set_session_credentials(access_key, access_secret, "")

consumer.subscribe(topic, callback, tags)
consumer.start()

while True:
    time.sleep(3600)

consumer.shutdown()