You need to enable JavaScript to run this app.
导航

顺序消息

最近更新时间2023.10.31 11:56:19

首次发布时间2022.01.28 15:16:40

消息队列 RocketMQ版提供顺序消息(FIFO消息)供您使用。在顺序消息模型中,您需要严格按照顺序来发布和消费消息。本文提供使用 Go SDK 收发顺序消息的示例代码供您参考。

背景信息

顺序消息分为两类,全局顺序消息和分区顺序消息。区别仅为队列数量不同,代码没有区别。

  • 全局顺序:
    对于指定的一个 Topic,所有消息的生产和消费需要遵循一定的顺序,消息的消费顺序必须和生产顺序一致,即需要严格的先入先出 FIFO(First In First Out)的顺序进行发布和消费。
  • 分区顺序:
    对于指定的一个 Topic,其中每一个分区的消息生产与消费是有序的,同一个队列内的消息按照严格的 FIFO 顺序进行发布和订阅。消息投递到哪一个分区由消息的Sharding Key来进行区分。在 SDK 中可以通过指定 Sharding Key 和
    MessageQueueSelector 回调函数来控制消息投递到哪个分区。

前提条件

发送顺序消息

发送顺序消息的示例代码如下。

import (
   "context"
   "fmt"
   "os"
   "strconv"

   "github.com/apache/rocketmq-client-go/v2"
   "github.com/apache/rocketmq-client-go/v2/primitive"
   "github.com/apache/rocketmq-client-go/v2/producer"
)

func main() {
   // 这里按照业务逻辑来选择每个消息投递到哪个队列当中
   queueSelector := producer.NewHashQueueSelector()
   
   p, _ := rocketmq.NewProducer(
   // 配置RocketMQ实例的接入点。
      producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"http://rocketmq-cnoea09856be****.rocketmq.volces.com:9876"})),
      producer.WithRetry(2),
      // 配置RocketMQ实例的AccessKey ID和AccessKey Secret。
      producer.WithCredentials(primitive.Credentials{
          AccessKey: "ACCCESSKEY",
          SecretKey: "SECRETKEY",
      }),
      // 配置使用的队列选择回调
      producer.WithQueueSelector(queueSelector),
   )
   err := p.Start()
   if err != nil {
      fmt.Printf("start producer error: %s", err.Error())
      os.Exit(1)
   }
   // 配置Topic名称。对于实例 ID 格式为 MQ_INST_xxxx 的实例,此处配置的格式为${实例ID%TopicID},例如 "MQ_INST_****%testTopic"。详细说明请参考https://www.volcengine.com/docs/6410/153010#注意事项。
   topic := "testTopic"

   for i := 0; i < 10; i++ {
      msg := &primitive.Message{
         Topic: topic,
         Body:  []byte("Hello World " + strconv.Itoa(i)),
      }
      
      // msg 指定分区key, 自定义即可
      msg.WithShardingKey("key" + strconv.ItoA(i))
      
      
      res, err := p.SendSync(context.Background(), msg)

      if err != nil {
         fmt.Printf("send message error: %s\n", err)
      } else {
         fmt.Printf("send message success: result=%s\n", res.String())
      }
   }
   err = p.Shutdown()
   if err != nil {
      fmt.Printf("shutdown producer error: %s", err.Error())
   }
}

订阅顺序消息

订阅顺序消息的示例代码如下。

说明

订阅顺序消息之前,需要在创建 RocketMQ Consumer 的时候配置 ConsumerOrdertrue

import (
   "context"
   "fmt"
   "os"
   "time"

   "github.com/apache/rocketmq-client-go/v2"
   "github.com/apache/rocketmq-client-go/v2/consumer"
   "github.com/apache/rocketmq-client-go/v2/primitive"
)

func main() {
   c, _ := rocketmq.NewPushConsumer(
      //配置使用的Group ID。对于实例 ID 格式为 MQ_INST_xxxx 的实例,此处配置的格式为${实例ID%GroupID},例如 "MQ_INST_****%demo-group"。详细说明请参考https://www.volcengine.com/docs/6410/153010#注意事项。
      topic := "testTopic"
      consumer.WithGroupName("demo-group"),
      consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{
         "http://rocketmq-cnoea09856be****.rocketmq.volces.com:9876",
      })),
      // 配置RocketMQ实例的AccessKey ID和AccessKey Secret。
      consumer.WithCredentials(primitive.Credentials{
          AccessKey: "ACCESSKEY",
          SecretKey: "SECRETKEY",
      }),
      consumer.WithConsumerModel(consumer.Clustering),
      
      // 使用顺序消费的模式
      consumer.WithConsumerOrder(true),
   )
   err := c.Subscribe(topic, consumer.MessageSelector{}, func(ctx context.Context,
      msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
      orderlyCtx, _ := primitive.GetOrderlyCtx(ctx)
      fmt.Printf("orderlyCtx: %#v\n", orderlyCtx)
      fmt.Printf("msgs: %v \n", msgs)
      return consumer.ConsumeSuccess, nil
      
      // 如果消费失败或者消费异常
      // return consumer.SuspendCurrentQueueAMoment, err
   })
   if err != nil {
      fmt.Println(err.Error())
   }
   // Note: start after subscribe
   err = c.Start()
   if err != nil {
      fmt.Println(err.Error())
      os.Exit(-1)
   }
   time.Sleep(time.Hour)
   err = c.Shutdown()
   if err != nil {
      fmt.Printf("Shutdown Consumer error: %s", err.Error())
   }
}