You need to enable JavaScript to run this app.
导航

事务消息

最近更新时间2023.04.21 14:05:14

首次发布时间2022.01.28 15:16:40

本文提供使用 Java SDK 收发事务消息的示例代码供您参考。

前提条件

您已完成准备工作

发送事务消息

消息发送流程

发送事务消息包含以下两个步骤:

  1. 发送半事务消息(Half Message)及执行本地事务。
    示例代码如下。
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class RocketMQTransactionProducer {

    //设置为您在火山引擎消息队列 RocketMQ版控制台上创建的GID, 以及替换RocketMQ实例的AccessKey ID和AccessKey Secret。
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("YOUR ACCESS KEY", "YOUR SECRET KEY"));
    }

    public static void main(String[] args) throws MQClientException {
        /**
        * 创建事务消息Producer
        * 设置为您从火山引擎消息队列 RocketMQ版控制台获取的接入点信息,类似“http://{INSTANCE_ID}.rocketmq.volces.com.com:9876”
        */
        TransactionMQProducer transactionMQProducer = new TransactionMQProducer("YOUR TRANSACTION GROUP ID", getAclRPCHook());
        
        transactionMQProducer.setNamesrvAddr("YOUR ACCESS POINT");
        //如果 SSL 认证策略设置为仅SSL连接,则必须设置setUseTLS(true)。
        //transactionMQProducer.setUseTLS(true)
        transactionMQProducer.setTransactionCheckListener(new LocalTransactionCheckerImpl());
        transactionMQProducer.start();

        for (int i = 0; i < 10; i++) {
            try {
                Message message = new Message("YOUR TRANSACTION TOPIC",
                    "YOUR MESSAGE TAG",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = transactionMQProducer.sendMessageInTransaction(message, new LocalTransactionExecuter() {
                    @Override public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
                        System.out.println("开始执行本地事务: " + msg);
                        return LocalTransactionState.UNKNOW;
                    }
                }, null);
                assert sendResult != null;
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
  1. 提交事务消息状态。
    当业务代码中的处理事务完成之后,需要通知 RocketMQ 服务端执行处理事务的结果。以便确认消息队列中的消息是否需要回滚或者提交。
    • SDK 中已经包含了这部分逻辑,只需要在回调中增加业务相关的查询逻辑即可。
    • 回调会多次执行,在本地事务执行后会调用回调,如果这次回调没有明确的提交状态,则 RocketMQ 服务端后续会多次重试,触发回调确认事务的执行状态。
      返回的事务状态包括以下三种:
    • LocalTransactionState.COMMIT_MESSAGE:提交事务。事务提交之后,之前暂存的消息对消费者可见。
    • LocalTransactionState.ROLLBACK_MESSAGE:回滚事务。事务回滚,之前暂存的消息会被标记为已经回滚。
    • LocalTransactionState.UNKNOW:无法判断事务状态。业务逻辑暂时无法判断是否需要提交之前暂存的消息状态,希望 RocketMQ 服务端可以稍后触发回调的逻辑进行重试。
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionCheckListener;
import org.apache.rocketmq.common.message.MessageExt;

/**
* RocketMQ发送事务消息本地Check接口实现类
*/
public class LocalTransactionCheckerImpl implements TransactionCheckListener {
    /**
    * 本地事务Checker
    */
    @Override public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
        System.out.println("收到事务消息的回查请求, MsgId: " + msg.getMsgId());
        return LocalTransactionState.COMMIT_MESSAGE;
    }

}

事务回查流程

事务回查的整体流程使用两阶段提交的方式。

  1. 首先使用 SDK 发送半事务消息到 RocketMQ 中。
    这时消息临时保存在 RocketMQ 中,后续需要执行本地事务逻辑。当回调函数返回了 LocalTransactionState.UNKNOW ,或者业务逻辑中的事务触发了回滚逻辑时,需要将事务结果告知 RocketMQ 服务端。因此服务端需要知道之前投递的半事务消息是否提交或者回滚,定期触发事务结果回调函数来获取这部分的状态信息。
  2. 当 RocketMQ 服务端触发事务回查回调函数时,需要通过消息中保存的信 息来查询和业务逻辑处理事务状态相关的信息,并根据查询事务状态返回对应的事务消息状态信息。

订阅事务消息

事务消息的订阅方式与普通消息一致,示例代码如下所示。

import java.util.List;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

public class RocketMQPushConsumer {
    //设置为您在火山引擎消息队列 RocketMQ版控制台上创建的GID,以及替换为RocketMQ实例的AccessKey ID和AccessKey Secret。
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("YOUR ACCESS KEY", "YOUR SECRET KEY"));
    }
    public static void main(String[] args) throws MQClientException {
        // 设置为您在火山引擎消息队列 RocketMQ版控制台上创建的GID, 以及替换为您RocketMQ实例的AccessKey ID和AccessKey Secret。
        // 设置为火山引擎消息队列 RocketMQ版实例的接入点。
        // 设置为您在火山引擎消息队列 RocketMQ版控制台上创建的Topic。
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new AllocateMessageQueueAveragely());
        consumer.setNamesrvAddr("http://{INSTANCE_ID}.rocketmq.volces.com.com:9876");
        consumer.subscribe("YOUR TOPIC", "*");
        //如果 SSL 认证策略设置为仅SSL连接,则必须设置setUseTLS(true)
        //consumer.setUseTLS(true);
        
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("Receive New Messages: %s %n", msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}