You need to enable JavaScript to run this app.
导航
普通消息
最近更新时间:2025.05.09 10:28:39首次发布时间:2025.05.09 10:28:39
我的收藏
有用
有用
无用
无用

火山引擎消息队列 RocketMQ 5.x版本提供同步发送、异步发送两种方式来发送普通消息。本文介绍如何通过不同方式发送普通消息。发送普通消息前请在控制台创建普通消息类型topic

同步发送消息

同步发送是指消息发送方发出一条消息后,会在收到服务端返回响应之后才发下一条消息的通讯方式。一般用于较为重要的消息发送场景。
同步发送方式发送普通消息的示例代码如下

package main

import (
        "context"
        "fmt"
        "log"
        "os"
        "strconv"
        "time"

        rmq_client "github.com/apache/rocketmq-clients/golang/v5"
        "github.com/apache/rocketmq-clients/golang/v5/credentials"
)

/*
     * 设置为您从火山引擎消息队列 RocketMQ版控制台获取的接入点信息,类似“http://{INSTANCE_ID}.rocketmq.ivolces.com.com:8080”。
     * 设置RocketMQ实例的AccessKey ID和AccessKey Secret。
    */
const (
        Topic     = "xxxx"
        Endpoint  = "rocketmq-xxxx:8080"
        AccessKey = "xxxx"
        SecretKey = "xxxx"
)

func main() {
        os.Setenv("mq.consoleAppender.enabled", "true")
        rmq_client.ResetLogger()
        // In most case, you don't need to create many producers, singleton pattern is more recommended.
        producer, err := rmq_client.NewProducer(&rmq_client.Config{
                Endpoint: Endpoint,
                Credentials: &credentials.SessionCredentials{
                        AccessKey:    AccessKey,
                        AccessSecret: SecretKey,
                },
        },
                rmq_client.WithTopics(Topic),
        )
        if err != nil {
                log.Fatal(err)
        }
        // start producer
        err = producer.Start()
        if err != nil {
                log.Fatal(err)
        }
        // graceful stop producer
        defer producer.GracefulStop()

        for i := 0; i < 10; i++ {
                // new a message
                msg := &rmq_client.Message{
                        Topic: Topic,
                        Body:  []byte("this is a message : " + strconv.Itoa(i)),
                }
                // set keys and tag
                msg.SetKeys("a", "b")
                msg.SetTag("ab")
                // send message in sync
                resp, err := producer.Send(context.TODO(), msg)
                if err != nil {
                        log.Fatal(err)
                }
                for i := 0; i < len(resp); i++ {
                        fmt.Printf("%#v\n", resp[i])
                }
                // wait a moment
                time.Sleep(time.Millisecond * 1)
        }
}

异步发送

异步发送是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。异步发送可以避免线程阻塞,允许程序继续执行其他任务,从而提高系统的吞吐量和性能。
异步发送方式发送普通消息的示例代码如下。

package main

import (
        "context"
        "fmt"
        "log"
        "os"
        "strconv"
        "time"

        rmq_client "github.com/apache/rocketmq-clients/golang/v5"
        "github.com/apache/rocketmq-clients/golang/v5/credentials"
)

/*
     * 设置为您从火山引擎消息队列 RocketMQ版控制台获取的接入点信息,类似“http://{INSTANCE_ID}.rocketmq.ivolces.com.com:8080”。
     * 设置RocketMQ实例的AccessKey ID和AccessKey Secret。
    */
const (
        Topic     = "xxxx"
        Endpoint  = "rocketmq-xxxx:8080"
        AccessKey = "xxxx"
        SecretKey = "xxxx"
)

func main() {
        // log to console
        os.Setenv("mq.consoleAppender.enabled", "true")
        rmq_client.ResetLogger()
        // In most case, you don't need to create many producers, singleton pattern is more recommended.
        producer, err := rmq_client.NewProducer(&rmq_client.Config{
                Endpoint: Endpoint,
                Credentials: &credentials.SessionCredentials{
                        AccessKey:    AccessKey,
                        AccessSecret: SecretKey,
                },
        },
                rmq_client.WithTopics(Topic),
        )
        if err != nil {
                log.Fatal(err)
        }
        // start producer
        err = producer.Start()
        if err != nil {
                log.Fatal(err)
        }
        // graceful stop producer
        defer producer.GracefulStop()
        for i := 0; i < 10; i++ {
                // new a message
                msg := &rmq_client.Message{
                        Topic: Topic,
                        Body:  []byte("this is a message : " + strconv.Itoa(i)),
                }
                // set keys and tag
                msg.SetKeys("a", "b")
                msg.SetTag("ab")
                // send message in async
                producer.SendAsync(context.TODO(), msg, func(ctx context.Context, resp []*rmq_client.SendReceipt, err error) {
                        if err != nil {
                                log.Fatal(err)
                        }
                        for i := 0; i < len(resp); i++ {
                                fmt.Printf("%#v\n", resp[i])
                        }
                })
                // wait a moment
                time.Sleep(time.Second * 1)
        }
}

订阅普通消息

5.x RocketMQ支持两种消费模式,分别为 Push Consumer 和 Simple Consumer。前者为服务端推送消息,后者为主动拉取消息,GO 5.x SDK仅支持Simple Consumer
Simple Consumer消费示例代码如下:

package main

import (
        "context"
        "fmt"
        "log"
        "os"
        "time"

        rmq_client "github.com/apache/rocketmq-clients/golang/v5"
        "github.com/apache/rocketmq-clients/golang/v5/credentials"
)

/*
     * 设置为您从火山引擎消息队列 RocketMQ版控制台获取的接入点信息,类似“http://{INSTANCE_ID}.rocketmq.ivolces.com.com:8080”。
     * 设置RocketMQ实例的AccessKey ID和AccessKey Secret。
    */
const (
        Topic     = "xxxx"
        ConsumerGroup = "xxxx"
        Endpoint  = "rocketmq-xxxx:8080"
        AccessKey = "xxxx"
        SecretKey = "xxxx"
)

var (
        // maximum waiting time for receive func
        awaitDuration = time.Second * 5
        // maximum number of messages received at one time
        maxMessageNum int32 = 16
        // invisibleDuration should > 20s
        invisibleDuration = time.Second * 20
        // receive messages in a loop
)

func main() {
        // log to console
        os.Setenv("mq.consoleAppender.enabled", "true")
        rmq_client.ResetLogger()
        // In most case, you don't need to create many consumers, singleton pattern is more recommended.
        simpleConsumer, err := rmq_client.NewSimpleConsumer(&rmq_client.Config{
                Endpoint:      Endpoint,
                ConsumerGroup: ConsumerGroup,
                Credentials: &credentials.SessionCredentials{
                        AccessKey:    AccessKey,
                        AccessSecret: SecretKey,
                },
        },
                rmq_client.WithAwaitDuration(awaitDuration),
                rmq_client.WithSubscriptionExpressions(map[string]*rmq_client.FilterExpression{
                        Topic: rmq_client.SUB_ALL,
                }),
        )
        if err != nil {
                log.Fatal(err)
        }
        // start simpleConsumer
        err = simpleConsumer.Start()
        if err != nil {
                log.Fatal(err)
        }
                
        // graceful stop simpleConsumer
        defer func() {
                if r := recover(); r != nil {
                        fmt.Println(r)
                }
                _ = simpleConsumer.GracefulStop()
        }()

        /*
                using multiple goroutines to receive message can avoid one brokerGroup which has no messages
                but blocking consumer receives message from other brokerGroup, which can help reducing message lag
        */
        ch := make(chan struct{})
        wg := &sync.WaitGroup{}
        for i := 0; i < receiveConcurrency; i++ {
                wg.Add(1)
                go func() {
                        defer wg.Done()
                        for {
                                select {
                                case <-ch:
                                        return
                                default:
                                        mvs, err := simpleConsumer.Receive(context.TODO(), maxMessageNum, invisibleDuration)
                                        if err != nil {
                                                //fmt.Println("receive message error: " + err.Error())
                                        }
                                        // ack message
                                        for _, mv := range mvs {
                                                fmt.Println(mv)
                                                if err := simpleConsumer.Ack(context.TODO(), mv); err != nil {
                                                        //fmt.Println("ack message error: " + err.Error())
                                                }
                                        }
                                }
                        }
                }()
        }

        exit := make(chan os.Signal, 1)
        signal.Notify(exit, syscall.SIGINT, syscall.SIGTERM)
        // wait for exit signal
        <-exit
        close(ch)
        wg.Wait()
}