You need to enable JavaScript to run this app.
导航

通过 RocketMQ 消费火山引擎 Proto 格式的订阅数据

最近更新时间2023.10.08 11:45:19

首次发布时间2022.09.26 11:27:21

数据库传输服务 DTS 的数据订阅服务支持使用 RocketMQ 客户端消费火山引擎 Proto 格式的订阅数据。本文以订阅云数据库 MySQL 版实例为例,介绍如何使用 Go 和 Java 语言消费 Canal 格式的数据。

前提条件

运行环境说明

Go 语言

安装 Go,需使用 Go 1.13 或以上版本。

说明

您可以执行 go -version 查看 Go 的版本。

Java 语言

  1. 安装 Java,需使用 Java 1.8 或以上版本。您可以执行 java --version 查看 Java 版本。

  2. 安装 Maven,需使用 Maven 3.8 或以上版本。

  3. maven pom.xml 文件中添加以下依赖。

    <dependency>
       <groupId>com.google.protobuf</groupId>
       <artifactId>protobuf-java</artifactId>
       <version>3.16.0</version>
    </dependency>
    <dependency>
       <groupId>org.apache.rocketmq</groupId>
       <artifactId>rocketmq-client</artifactId>
       <version>4.9.4</version>
    </dependency>
    <dependency>
       <groupId>org.apache.rocketmq</groupId>
       <artifactId>rocketmq-acl</artifactId>
       <version>4.9.4</version>
    </dependency>
    

操作步骤

下载和编译 ProtoBuf

在运行对应语言的 demo 时,需要先根据以下操作步骤完成 Protocol Buffers(也称 ProtoBuf)文件的下载及编译:

说明

本文以火山引擎定义的 ProtoBuf 为例。

  1. 下载 ProtoBuf 文件。

  2. ProtoBuf 文件编译成对应语言的代码。编译方法如下所示:

    • Go 语言
      由于数据库传输服务的开发人员已经帮助您将 ProtoBuf 文件编译成 Go 语言的代码,您无需再编译。关于编译的更多信息,请参见 Protocol Buffer Basics: Go

    • Java 语言
      当您的语言是 Java 时,请执行以下命令将 ProtoBuf 文件编译成 Java 语言的代码,获取 volc.Volc 文件。关于编译的更多信息,请参见 Protocol Buffer Basics: Java

      protoc -I=. --java_out=. volc.proto
      

关联 RocketMQ 和订阅任务

本文以 macOS 操作系统为例,介绍如何关联 RocketMQ 和订阅任务。

  1. 登录 DTS 控制台,创建并配置数据订阅通道。详细信息,请参见订阅方案概览

  2. 在目标数据订阅通道中新增消费组。详细信息,请参见新建消费组

  3. 编辑 .zshrc 文件,配置以下环境变量信息,并完成认证,即可调用 SDK 来消费消息数据。

    参数说明示例值
    GROUP消费组名称。285fef6b91754d0bbaab32e4976c****:test_dtssdk
    ACCESSKEYRocketMQ 用户名。7ZrfJmOpVks5wvurbUgr****
    SECRETKEYRocketMQ 用户密码。q70s5SrCgsePZPh3fb0d****
    TOPIC目标 DTS 数据订阅通道的 Topic。d73e98e7fa9340faa3a0d4ccfa10****
    NAMESRV_ADDR目标 DTS 数据订阅通道的私网地址。http://rocketmq-cndv83ef3******.rocketmq.ivolces.com:9876

    说明

    关于以上表格的详细信息,请参见查看 RocketMQ 实例详情

  4. 按需选择以下 demo 示例。

package main

import (
	"context"
	"datasubscription/proto"
	"fmt"
	"os"
	"strings"
	"time"

	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/consumer"
	"github.com/apache/rocketmq-client-go/v2/primitive"
	"github.com/sirupsen/logrus"
	protobuf "google.golang.org/protobuf/proto"
)

type Config struct {
	accessKey   string
	secretKey   string
	topic       string
	group       string
	namesrvAddr string
}

var c Config

func init() {
	c.namesrvAddr = os.Getenv("NAMESRV_ADDR")
	c.topic = os.Getenv("TOPIC")
	c.group = os.Getenv("GROUP")
	c.accessKey = os.Getenv("ACCESSKEY")
	c.secretKey = os.Getenv("SECRETKEY")
}

func main() {
	logrus.Infof("config: %+v", c)

	cli, _ := rocketmq.NewPushConsumer(
		consumer.WithGroupName(c.group),
		consumer.WithNsResolver(primitive.NewPassthroughResolver(strings.Split(c.namesrvAddr, ","))),
		consumer.WithConsumerModel(consumer.Clustering),
		consumer.WithConsumerOrder(true),
		consumer.WithCredentials(primitive.Credentials{
			AccessKey: c.accessKey,
			SecretKey: c.secretKey,
		}),
	)

	err := cli.Subscribe(c.topic, consumer.MessageSelector{}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
		for _, msg := range msgs {
			entry := &proto.Entry{}
			if err := protobuf.Unmarshal(msg.Body, entry); err != nil {
				panic(err)
			}

			logrus.WithField("EntryType", entry.EntryType.String()).Info("get message")
			switch entry.GetEntryType() {
			case proto.EntryType_DDL:
				event := entry.GetDdlEvent()
				logrus.Infof("ddl: %s", event.GetSql())
			case proto.EntryType_DML:
				event := entry.GetDmlEvent()
				cols := event.ColumnDefs
				for _, row := range event.Rows {
					var before, after []string
					for i, col := range row.BeforeCols {
						before = append(before, fmt.Sprintf("%s[%v]", cols[i].GetName(), col.GetValue()))
					}
					for i, col := range row.AfterCols {
						after = append(after, fmt.Sprintf("%s[%v]", cols[i].GetName(), col.GetValue()))
					}
					logrus.WithField("after", after).WithField("before", before).Info("get row")
				}
			}

			logrus.WithField("queueId", msg.Queue.QueueId).WithField("key", msg.GetKeys()).Info("fetch message")
		}

		return consumer.ConsumeSuccess, nil
	})
	if err != nil {
		fmt.Println(err.Error())
	}

	err = cli.Start()
	if err != nil {
		fmt.Println(err.Error())
		os.Exit(-1)
	}

	time.Sleep(time.Hour)
	cli.Shutdown()
}		

运行测试

说明

  • 本文以数据库 test,表格 demo 为例。
  • 根据目标语言选择合适的 JSON 数据。
  1. 在源数据库中,执行以下命令创建一张名为 demo 的表。
    CREATE TABLE demo (id_t INT);
    
    demo 表中打印出数据消费消息的 JSON 数据,如下所示,请按需选择示例代码:
INFO[0010] get message                                   EntryType=DDL
INFO[0010] ddl: CREATE TABLE demo (id_t INT)
INFO[0010] fetch message                                 key="dflow.demo " queueId=8
  1. 执行以下命令,在 demo 表中插入一条数据:
    INSERT INTO demo (id_t) VALUES (1);
    
    demo 表中打印出数据消费消息的 JSON 数据,如下所示,请按需选择示例代码:
INFO[0073] get message                                   EntryType=DML
INFO[0073] get row                                       after="[id_t[&{1}]]" before="[]"
INFO[0073] fetch message                                 key="dflow.demo " queueId=8