You need to enable JavaScript to run this app.
数据库传输服务

数据库传输服务

复制全文
数据订阅最佳实践
通过 Kafka 消费 Canal Proto 格式的订阅数据
复制全文
通过 Kafka 消费 Canal Proto 格式的订阅数据

数据库传输服务 DTS 的数据订阅服务支持使用 Kafka 客户端消费 Canal Proto 格式的订阅数据。本文以订阅云数据库 MySQL 版实例为例,介绍如何使用 Go、Java 和 Python 语言消费 Canal Proto 格式的数据。

前提条件

  • 已注册火山引擎账号并完成实名认证。账号的创建方法和实名认证,请参见如何进行账号注册实名认证

  • 用于订阅消费数据的客户端需要指定服务端 Kafka 版本号,版本号需为 2.2.x(例如 2.2.2)。您可以在示例代码中指定 Kafka 版本号,具体参数如下表所示。

    运行语言说明
    Go通过代码示例中参数 config.Version 指定服务端 Kafka 版本号。
    Python通过示例代码中参数 api_version 指定服务端 Kafka 版本号。
    Java通过 maven pom.xml 文件中参数 version 指定服务端 Kafka 版本号。
  • 按需安装运行语言环境。

    运行语言说明
    Go安装 Go,需使用 Go 1.13 或以上版本。您可以执行 go version 查看 Go 的版本。

    Python

    1. 安装 Python,需使用 Python 2.7 或以上版本。您可以执行 python --version 查看 Python 的版本。
    2. 依次执行以下命令,安装 pip 依赖。
      pip install kafka-python
      
      pip install protobuf
      
      pip install python-snappy
      

    Java

    1. 安装 Java,需使用 Java 1.8 或以上版本。您可以执行 java -version 查看 Java 版本。
    2. 安装 maven,需使用 Maven 3.8 或以上版本。 您可以执行 mvn -version 查看 Maven 版本。
    3. maven pom.xml 文件中添加以下依赖,本示例以 Kafka 2.2.2 版本为例 。
      <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.2.2</version>
      </dependency>
      

操作步骤

下载和编译 ProtoBuf

在运行对应语言的 demo 时,需要先根据以下操作步骤完成 Protocol Buffers(也称 ProtoBuf)文件的下载及编译。

说明

本文以火山引擎定义的 ProtoBuf 为例。

  1. 下载 ProtoBuf 文件。

  2. 将下载的 canal.proto 文件编译成对应语言的代码。编译方法如下所示:

    • Go 语言
      由于数据库传输服务的开发人员已经帮助您将 ProtoBuf 文件编译成 Go 语言的代码,您无需再编译。关于编译的更多信息,请参见 Protocol Buffer Basics: Go
    • Python 语言
      当您的语言是 Python 时,请执行以下命令将 ProtoBuf 文件编译成 Python 语言的代码,获取 canal_pb2 文件。关于编译的更多信息,请参见 Protocol Buffer Basics: Python
      protoc -I=. --python_out=. canal.proto
      
    • Java 语言
      当您的语言是 Java 时,请执行以下命令将 ProtoBuf 文件编译成 Java 语言的代码,获取 canal.Canal 文件。关于编译的更多信息,请参见 Protocol Buffer Basics: Java
      protoc -I=. --java_out=. canal.proto
      

关联 Kafka 和订阅任务

本文以 macOS 操作系统为例,介绍如何关联 Kafka 和订阅任务。

  1. 登录 DTS 控制台,创建并配置数据订阅通道。详细信息,请参见订阅方案概览

  2. 在目标数据订阅通道中新增消费组。详细信息,请参见新建消费组

  3. 编辑 .zshrc 文件,配置以下环境变量信息,并完成认证,即可调用 SDK 来消费消息数据。

    参数说明示例值
    GROUP消费组名称。285fef6b91754d0bbaab32e4976c****:test_dtssdk
    USERKafka 用户名。test_user
    PASSWORDKafka 用户密码。Test@Pwd
    TOPIC目标 DTS 数据订阅通道的 Topic。d73e98e7fa9340faa3a0d4ccfa10****
    BROKERS目标 DTS 数据订阅通道的私网地址。kafka-cndvhw9ves******.kafka.ivolces.com:9092
  4. 请按需选择以下 demo 示例。

package main

import (
   "context"
   "fmt"
   "log"
   "os"
   "strings"
   "sync"

   "github.com/Shopify/sarama"
   "github.com/volcengine/volc-sdk-golang/example/dts/data-subscription-demo/canal"
   protobuf "google.golang.org/protobuf/proto"
)

type Handler struct {
   topic          string
   partitionCount map[int32]int
   totalCount     int
   mu             sync.Mutex
}

type Config struct {
   username string
   password string
   topic    string
   group    string
   addr     string
}

var (
   c Config
)

func init() {
   c.addr = os.Getenv("BROKERS")
   c.topic = os.Getenv("TOPIC")
   c.group = os.Getenv("GROUP")
   c.username = os.Getenv("USER")
   c.password = os.Getenv("PASSWORD")
}

func (h *Handler) Setup(session sarama.ConsumerGroupSession) error {
   fmt.Println("setup")
   return nil
}

func (h *Handler) Cleanup(sarama.ConsumerGroupSession) error {
   fmt.Println("clean up")
   return nil
}

func (h *Handler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
   fmt.Println("ConsumeClaim")
   for m := range claim.Messages() {
      h.handleCanalMsg(m)
      session.MarkMessage(m, "")
      session.Commit()
   }
   return nil
}
func (h *Handler) handleCanalMsg(msg *sarama.ConsumerMessage) {
   h.mu.Lock()
   defer h.mu.Unlock()
   h.totalCount++
   h.partitionCount[msg.Partition]++

   entry := &canal.Entry{}
   if err := protobuf.Unmarshal(msg.Value, entry); err != nil {
      panic(err)
   }

   fmt.Println("-------------- handle message --------------")
   fmt.Printf("ServerID:%v\n", entry.GetHeader().GetServerId())
   fmt.Printf("Timestamp:%v\n", entry.GetHeader().GetExecuteTime())
   fmt.Printf("Database:%v\n", entry.GetHeader().GetSchemaName())
   fmt.Printf("Table:%v\n", entry.GetHeader().GetTableName())
   if entry.GetEntryType() != canal.EntryType_TRANSACTIONBEGIN &&
      entry.GetEntryType() != canal.EntryType_TRANSACTIONEND {
      rowChange := &canal.RowChange{}
      if err := protobuf.Unmarshal(entry.GetStoreValue(), rowChange); err != nil {
         panic(err)
      }

      fmt.Printf("EventType:%v\n", rowChange.GetEventType().String())
      if rowChange.GetIsDdl() {
         fmt.Printf("DDL:%s\n", rowChange.GetSql())
      } else {
         for _, row := range rowChange.GetRowDatas() {
            var before, after []string
            for _, col := range row.BeforeColumns {
               before = append(before, fmt.Sprintf("%s[%s(%s)]", col.GetName(), col.GetValue(), col.GetMysqlType()))
            }
            for _, col := range row.AfterColumns {
               after = append(after, fmt.Sprintf("%s[%s(%s)]", col.GetName(), col.GetValue(), col.GetMysqlType()))
            }
            fmt.Printf("RowDatas:before=%v after=%v\n", before, after)
         }
      }

   }
}

func main() {
   fmt.Printf("config: %+v", c)
   config := sarama.NewConfig()
   config.Net.SASL.User = c.username
   config.Net.SASL.Password = c.password
   config.Net.SASL.Enable = true
   config.Consumer.Offsets.Initial = sarama.OffsetNewest
   config.Version = sarama.V2_2_2_0
   topic := c.topic
   group := c.group
   addr := strings.Split(c.addr, ",")
   cons, err := sarama.NewConsumerGroup(addr, group, config)
   if err != nil {
      panic(err)
   }
   defer cons.Close()
   handler := &Handler{
      topic:          topic,
      partitionCount: make(map[int32]int),
   }
   for {
      err = cons.Consume(context.Background(), []string{handler.topic}, handler)
      if err != nil {
         log.Fatalln(err)
      }
   }
}

运行测试

说明

  • 本文以数据库 test,表格 demo 为例。
  • 根据目标语言选择合适的 JSON 数据。
  1. 在源数据库中,执行以下命令创建一张名为 demo 的表。

    CREATE TABLE demo (id_t INT);
    

    预期输出如下所示:

    sourceType:MYSQL
    	entryType:ROWDATA
    	executeTime:1692793679
    	schemaName:test
    	tableName:demo
    	EventType:CREATE
    	DDL:CREATE TABLE demo (id_t INT)
    	
  2. 执行以下命令,在 demo 表中插入一条数据:

    INSERT INTO demo (id_t) VALUES (1);
    

    预期输出:

    sourceType:MYSQL
    	entryType:ROWDATA
    	executeTime:1692793748
    	schemaName:test
    	tableName:demo
    	EventType:INSERT
    	RowDatas:before=[] after=[id_t[1(int)]]
最近更新时间:2025.07.29 20:46:09
这个页面对您有帮助吗?
有用
有用
无用
无用