You need to enable JavaScript to run this app.
导航

通过 RocketMQ 消费 Canal Proto 格式的订阅数据

最近更新时间2023.10.08 11:45:20

首次发布时间2023.09.01 10:59:59

数据库传输服务 DTS 的数据订阅服务支持使用 RocketMQ 客户端消费 Canal Proto 格式的订阅数据。本文以订阅云数据库 MySQL 版实例为例,介绍如何使用 Go 和 Java 语言消费 Canal Proto 格式的数据。

前提条件

运行环境说明

Go 语言

安装 Go,需使用 Go 1.13 或以上版本。

说明

您可以执行 go -version 查看 Go 的版本。

Java 语言

  1. 安装 Java,需使用 Java 1.8 或以上版本。您可以执行 java --version 查看 Java 版本。

  2. 安装 Maven,需使用 Maven 3.8 或以上版本。

  3. maven pom.xml 文件中添加以下依赖。

    <dependency>
       <groupId>com.google.protobuf</groupId>
       <artifactId>protobuf-java</artifactId>
       <version>3.16.0</version>
    </dependency>
    <dependency>
       <groupId>org.apache.rocketmq</groupId>
       <artifactId>rocketmq-client</artifactId>
       <version>4.9.4</version>
    </dependency>
    <dependency>
       <groupId>org.apache.rocketmq</groupId>
       <artifactId>rocketmq-acl</artifactId>
       <version>4.9.4</version>
    </dependency>
    

操作步骤

下载和编译 ProtoBuf

在运行对应语言的 demo 时,需要先根据以下操作步骤完成 Protocol Buffers(也称 ProtoBuf)文件的下载及编译:

说明

本文以火山引擎定义的 ProtoBuf 为例。

  1. 下载 ProtoBuf 文件。

  2. ProtoBuf 文件编译成对应语言的代码。编译方法如下所示:

    • Go 语言
      由于数据库传输服务的开发人员已经帮助您将 ProtoBuf 文件编译成 Go 语言的代码,您无需再编译。关于编译的更多信息,请参见 Protocol Buffer Basics: Go

    • Java 语言
      当您的语言是 Java 时,请执行以下命令将 ProtoBuf 文件编译成 Java 语言的代码,获取 canal.Canal 文件。关于编译的更多信息,请参见 Protocol Buffer Basics: Java

      protoc -I=. --java_out=. volc.proto
      

关联 RocketMQ 和订阅任务

本文以 macOS 操作系统为例,介绍如何关联 RocketMQ 和订阅任务。

  1. 登录 DTS 控制台,创建并配置数据订阅通道。详细信息,请参见订阅方案概览

  2. 在目标数据订阅通道中新增消费组。详细信息,请参见新建消费组

  3. 编辑 .zshrc 文件,配置以下环境变量信息,并完成认证,即可调用 SDK 来消费消息数据。

    参数说明示例值
    GROUP消费组名称。285fef6b91754d0bbaab32e4976c****:test_dtssdk
    ACCESSKEYRocketMQ 用户名。7ZrfJmOpVks5wvurbUgr****
    SECRETKEYRocketMQ 用户密码。q70s5SrCgsePZPh3fb0d****
    TOPIC目标 DTS 数据订阅通道的 Topic。d73e98e7fa9340faa3a0d4ccfa10****
    NAMESRV_ADDR目标 DTS 数据订阅通道的私网地址。http://rocketmq-cndv83ef3******.rocketmq.ivolces.com:9876

    说明

    关于以上表格的详细信息,请参见查看 RocketMQ 实例详情

  4. 按需选择以下 demo 示例。

    package main
    	
    	import (
    	   "context"
    	   "datasubscription/proto/canal"
    	   "fmt"
    	   "os"
    	   "strings"
    	   "time"
    	
    	   "github.com/apache/rocketmq-client-go/v2"
    	   "github.com/apache/rocketmq-client-go/v2/consumer"
    	   "github.com/apache/rocketmq-client-go/v2/primitive"
    	   "github.com/sirupsen/logrus"
    	   protobuf "google.golang.org/protobuf/proto"
    	)
    	
    	type Config struct {
    	   accessKey   string
    	   secretKey   string
    	   topic       string
    	   group       string
    	   namesrvAddr string
    	}
    	
    	var c Config
    	
    	func init() {
    	   c.namesrvAddr = os.Getenv("NAMESRV_ADDR")
    	   c.topic = os.Getenv("TOPIC")
    	   c.group = os.Getenv("GROUP")
    	   c.accessKey = os.Getenv("ACCESSKEY")
    	   c.secretKey = os.Getenv("SECRETKEY")
    	}
    	
    	func main() {
    	   logrus.Infof("config: %+v", c)
    	
    	   cli, _ := rocketmq.NewPushConsumer(
    	      consumer.WithGroupName(c.group),
    	      consumer.WithNsResolver(primitive.NewPassthroughResolver(strings.Split(c.namesrvAddr, ","))),
    	      consumer.WithConsumerModel(consumer.Clustering),
    	      consumer.WithConsumerOrder(true),
    	      consumer.WithCredentials(primitive.Credentials{
    	         AccessKey: c.accessKey,
    	         SecretKey: c.secretKey,
    	      }),
    	   )
    	
    	   err := cli.Subscribe(c.topic, consumer.MessageSelector{}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
    	      for _, msg := range msgs {
    	         handleCanalMsg(msg)
    	      }
    	      return consumer.ConsumeSuccess, nil
    	   })
    	   if err != nil {
    	      fmt.Println(err.Error())
    	   }
    	
    	   err = cli.Start()
    	   if err != nil {
    	      fmt.Println(err.Error())
    	      os.Exit(-1)
    	   }
    	
    	   time.Sleep(time.Hour)
    	   cli.Shutdown()
    	}
    	
    	func handleCanalMsg(msg *primitive.MessageExt) {
    	   entry := &canal.Entry{}
    	   if err := protobuf.Unmarshal(msg.Body, entry); err != nil {
    	      panic(err)
    	   }
    	
    	   logrus.WithField("EntryType", entry.GetEntryType().String()).Info("get message")
    	   if entry.GetEntryType() != canal.EntryType_TRANSACTIONBEGIN &&
    	      entry.GetEntryType() != canal.EntryType_TRANSACTIONEND {
    	      rowChange := &canal.RowChange{}
    	      if err := protobuf.Unmarshal(entry.GetStoreValue(), rowChange); err != nil {
    	         panic(err)
    	      }
    	
    	      if rowChange.GetIsDdl() {
    	         logrus.Infof("ddl: %s", rowChange.GetSql())
    	      } else {
    	         for _, row := range rowChange.GetRowDatas() {
    	            var before, after []string
    	            for _, col := range row.BeforeColumns {
    	               before = append(before, fmt.Sprintf("%s[%s(%s)]", col.GetName(), col.GetValue(), col.GetMysqlType()))
    	            }
    	            for _, col := range row.AfterColumns {
    	               after = append(after, fmt.Sprintf("%s[%s(%s)]", col.GetName(), col.GetValue(), col.GetMysqlType()))
    	            }
    	            logrus.WithField("after", after).WithField("before", before).Info("get row")
    	         }
    	      }
    	
    	      logrus.WithField("queueId", msg.Queue.QueueId).WithField("key", msg.GetKeys()).Info("fetch message")
    	   }
    	}
    	

运行测试

说明

  • 本文以数据库 test,表格 demo 为例。
  • 根据目标语言选择合适的 JSON 数据。
  1. 在源数据库中,执行以下命令创建一张名为 demo 的表。

    CREATE TABLE demo (id_t INT);
    

    预期输出:

    INFO[0009] get message                                   EntryType=ROWDATA
    		INFO[0009] ddl: CREATE TABLE demo (id_t INT) 
    		INFO[0009] fetch message                                 key=test.demo queueId=1
    	
  2. 执行以下命令,在 demo 表中插入一条数据:

    INSERT INTO demo (id_t) VALUES (1);
    

    预期输出:

    INFO[0101] get message                                   EntryType=ROWDATA
    	INFO[0101] get row                                       after="[id_t[1(int)]]" before="[]"
    	INFO[0101] fetch message                                 key=test.demo queueId=1