You need to enable JavaScript to run this app.
数据库传输服务

数据库传输服务

复制全文
数据订阅最佳实践
通过 RocketMQ 消费火山引擎 Proto 格式的订阅数据
复制全文
通过 RocketMQ 消费火山引擎 Proto 格式的订阅数据

数据库传输服务 DTS 的数据订阅服务支持使用 RocketMQ 客户端消费火山引擎 Proto 格式的订阅数据。本文以订阅云数据库 MySQL 版实例为例,介绍如何使用 Go 和 Java 语言消费 Canal 格式的数据。

前提条件

运行环境说明

Go 语言

安装 Go,需使用 Go 1.13 或以上版本。

说明

您可以执行 go -version 查看 Go 的版本。

Java 语言

  1. 安装 Java,需使用 Java 1.8 或以上版本。您可以执行 java --version 查看 Java 版本。

  2. 安装 Maven,需使用 Maven 3.8 或以上版本。

  3. maven pom.xml 文件中添加以下依赖。

    <dependency>
       <groupId>com.google.protobuf</groupId>
       <artifactId>protobuf-java</artifactId>
       <version>3.16.0</version>
    </dependency>
    <dependency>
       <groupId>org.apache.rocketmq</groupId>
       <artifactId>rocketmq-client</artifactId>
       <version>4.9.4</version>
    </dependency>
    <dependency>
       <groupId>org.apache.rocketmq</groupId>
       <artifactId>rocketmq-acl</artifactId>
       <version>4.9.4</version>
    </dependency>
    

操作步骤

下载和编译 ProtoBuf

在运行对应语言的 demo 时,需要先根据以下操作步骤完成 Protocol Buffers(也称 ProtoBuf)文件的下载及编译:

说明

本文以火山引擎定义的 ProtoBuf 为例。

  1. 下载 ProtoBuf 文件。

  2. ProtoBuf 文件编译成对应语言的代码。编译方法如下所示:

    • Go 语言
      由于数据库传输服务的开发人员已经帮助您将 ProtoBuf 文件编译成 Go 语言的代码,您无需再编译。关于编译的更多信息,请参见 Protocol Buffer Basics: Go。如果您使用的是已编译好的文件,那么您需要执行以下命令安装依赖。
      go get google.golang.org/protobuf @v1.27.1
      
    • Java 语言
      当您的语言是 Java 时,请执行以下命令将 ProtoBuf 文件编译成 Java 语言的代码,获取 volc.Volc 文件。关于编译的更多信息,请参见 Protocol Buffer Basics: Java
      protoc -I=. --java_out=. volc.proto
      

关联 RocketMQ 和订阅任务

本文以 macOS 操作系统为例,介绍如何关联 RocketMQ 和订阅任务。

  1. 登录 DTS 控制台,创建并配置数据订阅通道。详细信息,请参见订阅方案概览

  2. 在目标数据订阅通道中新增消费组。详细信息,请参见新建消费组

  3. 编辑 .zshrc 文件,配置以下环境变量信息,并完成认证,即可调用 SDK 来消费消息数据。

    参数说明示例值
    GROUP消费组名称。285fef6b91754d0bbaab32e4976c****:test_dtssdk
    ACCESSKEYRocketMQ 用户名。7ZrfJmOpVks5wvurbUgr****
    SECRETKEYRocketMQ 用户密码。q70s5SrCgsePZPh3fb0d****
    TOPIC目标 DTS 数据订阅通道的 Topic。d73e98e7fa9340faa3a0d4ccfa10****
    NAMESRV_ADDR目标 DTS 数据订阅通道的私网地址。http://rocketmq-cndv83ef3******.rocketmq.ivolces.com:9876

    说明

    关于以上表格的详细信息,请参见查看 RocketMQ 实例详情

  4. 按需选择以下 demo 示例。

package main

import (
	"context"
	"github.com/volcengine/volc-sdk-golang/example/dts/data-subscription-demo/canal"
	"fmt"
	"os"
	"strings"
	"time"

	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/consumer"
	"github.com/apache/rocketmq-client-go/v2/primitive"
	"github.com/sirupsen/logrus"
	protobuf "google.golang.org/protobuf/proto"
)

type Config struct {
	accessKey   string
	secretKey   string
	topic       string
	group       string
	namesrvAddr string
}

var c Config

func init() {
	c.namesrvAddr = os.Getenv("NAMESRV_ADDR")
	c.topic = os.Getenv("TOPIC")
	c.group = os.Getenv("GROUP")
	c.accessKey = os.Getenv("ACCESSKEY")
	c.secretKey = os.Getenv("SECRETKEY")
}

func main() {
	logrus.Infof("config: %+v", c)

	cli, _ := rocketmq.NewPushConsumer(
		consumer.WithGroupName(c.group),
		consumer.WithNsResolver(primitive.NewPassthroughResolver(strings.Split(c.namesrvAddr, ","))),
		consumer.WithConsumerModel(consumer.Clustering),
		consumer.WithConsumerOrder(true),
		consumer.WithCredentials(primitive.Credentials{
			AccessKey: c.accessKey,
			SecretKey: c.secretKey,
		}),
	)

	err := cli.Subscribe(c.topic, consumer.MessageSelector{}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
		for _, msg := range msgs {
			entry := &proto.Entry{}
			if err := protobuf.Unmarshal(msg.Body, entry); err != nil {
				panic(err)
			}

			logrus.WithField("EntryType", entry.EntryType.String()).Info("get message")
			switch entry.GetEntryType() {
			case proto.EntryType_DDL:
				event := entry.GetDdlEvent()
				logrus.Infof("ddl: %s", event.GetSql())
			case proto.EntryType_DML:
				event := entry.GetDmlEvent()
				cols := event.ColumnDefs
				for _, row := range event.Rows {
					var before, after []string
					for i, col := range row.BeforeCols {
						before = append(before, fmt.Sprintf("%s[%v]", cols[i].GetName(), col.GetValue()))
					}
					for i, col := range row.AfterCols {
						after = append(after, fmt.Sprintf("%s[%v]", cols[i].GetName(), col.GetValue()))
					}
					logrus.WithField("after", after).WithField("before", before).Info("get row")
				}
			}

			logrus.WithField("queueId", msg.Queue.QueueId).WithField("key", msg.GetKeys()).Info("fetch message")
		}

		return consumer.ConsumeSuccess, nil
	})
	if err != nil {
		fmt.Println(err.Error())
	}

	err = cli.Start()
	if err != nil {
		fmt.Println(err.Error())
		os.Exit(-1)
	}

	time.Sleep(time.Hour)
	cli.Shutdown()
}		

运行测试

说明

  • 本文以数据库 test,表格 demo 为例。
  • 根据目标语言选择合适的 JSON 数据。
  1. 在源数据库中,执行以下命令创建一张名为 demo 的表。
    CREATE TABLE demo (id_t INT);
    
    demo 表中打印出数据消费消息的 JSON 数据,如下所示,请按需选择示例代码:
INFO[0010] get message                                   EntryType=DDL
INFO[0010] ddl: CREATE TABLE demo (id_t INT)
INFO[0010] fetch message                                 key="dflow.demo " queueId=8
  1. 执行以下命令,在 demo 表中插入一条数据:
    INSERT INTO demo (id_t) VALUES (1);
    
    demo 表中打印出数据消费消息的 JSON 数据,如下所示,请按需选择示例代码:
INFO[0073] get message                                   EntryType=DML
INFO[0073] get row                                       after="[id_t[&{1}]]" before="[]"
INFO[0073] fetch message                                 key="dflow.demo " queueId=8
最近更新时间:2025.07.29 20:46:09
这个页面对您有帮助吗?
有用
有用
无用
无用