本文提供使用 Java SDK 收发事务消息的示例代码供您参考。
发送事务消息需要在控制台申请事务类型的 Topic。在发送 half 消息后,用户可以根据自身业务逻辑对消息选择提交(commit)或回滚(rollback)。
另外,事务消息需要自定义 checker,以便服务端在未收到事务确认(提交或回滚)时对该消息进行回查。
public class RocketMQTransactionProducer { private static final Logger log = LoggerFactory.getLogger(RocketMQTransactionProducer.class); private RocketMQTransactionProducer() { } public static void main(String[] args) throws ClientException, IOException { final ClientServiceProvider provider = ClientServiceProvider.loadService(); /* * 设置为您从火山引擎消息队列 RocketMQ版控制台获取的接入点信息,类似“http://{INSTANCE_ID}.rocketmq.ivolces.com.com:8080”。 * 设置RocketMQ实例的AccessKey ID和AccessKey Secret。 */ String endpoints = "rocketmq-xxx.rocketmq.ivolces.com:8080"; String accessKey = "xxxx"; String secretKey = "xxxx"; SessionCredentialsProvider sessionCredentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey); ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() .setEndpoints(endpoints) // 暂时不支持ssl模式 .enableSsl(false) .setCredentialProvider(sessionCredentialsProvider) .build(); // Define your message body. String topic = "topic-transaction"; // 控制台申请的事务类型topic final Producer producer = provider.newProducerBuilder() .setClientConfiguration(clientConfiguration) .setTopics(topic) // 定义checker .setTransactionChecker(messageView -> { log.info("Receive transactional message check, message={}", messageView); // Return the transaction resolution according to your business logic. return TransactionResolution.COMMIT; }) .build(); final Transaction transaction = producer.beginTransaction(); byte[] body = "This is a transaction message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8); String tag = "*"; final Message message = provider.newMessageBuilder() // Set topic for the current message. .setTopic(topic) // Message secondary classifier of message besides topic. .setTag(tag) // Key(s) of the message, another way to mark message besides message id. 
.setKeys("yourMessageKey-1ff69ada8e0e") .setBody(body) .build(); try { final SendReceipt sendReceipt = producer.send(message, transaction); log.info("Send message successfully, messageId={}", sendReceipt.getMessageId()); } catch (Throwable t) { log.error("Failed to send message", t); } // Commit the transaction. transaction.commit(); // Or rollback the transaction. // transaction.rollback(); // Close the producer when you don't need it anymore. // You could close it manually or add this into the JVM shutdown hook. producer.close(); } }
事务消息的订阅方式与普通消息一致,示例代码如下所示。
public class RocketMQTransactionConsumer { private static final Logger log = LoggerFactory.getLogger(RocketMQTransactionConsumer.class); private RocketMQTransactionConsumer() { } public static void main(String[] args) throws ClientException, InterruptedException, IOException { final ClientServiceProvider provider = ClientServiceProvider.loadService(); /* * 设置为您从火山引擎消息队列 RocketMQ版控制台获取的接入点信息,类似“http://{INSTANCE_ID}.rocketmq.ivolces.com.com:8080”。 * 设置RocketMQ实例的AccessKey ID和AccessKey Secret。 */ String endpoints = "rocketmq-xxx.rocketmq.ivolces.com:8080"; String accessKey = "xxxx"; String secretKey = "xxxx"; SessionCredentialsProvider sessionCredentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey); ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() .setEndpoints(endpoints) // 暂时不支持ssl模式 .enableSsl(false) .setCredentialProvider(sessionCredentialsProvider) .build(); String tag = "yourMessageTagA"; FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG); String consumerGroup = "yourConsumerGroup"; String topic = "yourTopic" // In most case, you don't need to create too many consumers, singleton pattern is recommended. PushConsumer pushConsumer = provider.newPushConsumerBuilder() .setClientConfiguration(clientConfiguration) // Set the consumer group name. .setConsumerGroup(consumerGroup) // Set the subscription for the consumer. .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) .setMessageListener(messageView -> { // Handle the received message and return consume result. log.info("Consume message={}", messageView); return ConsumeResult.SUCCESS; }) .build(); // Block the main thread, no need for production environment. Thread.sleep(Long.MAX_VALUE); // Close the push consumer when you don't need it anymore. // You could close it manually or add this into the JVM shutdown hook. pushConsumer.close(); } }