You need to enable JavaScript to run this app.
导航

通过 Kafka 消费 Canal Proto 格式的订阅数据

最近更新时间2024.04.25 10:08:17

首次发布时间2023.08.29 09:48:43

数据库传输服务 DTS 的数据订阅服务支持使用 Kafka 客户端消费 Canal Proto 格式的订阅数据。本文以订阅云数据库 MySQL 版实例为例,介绍如何使用 Go、Java 和 Python 语言消费 Canal Proto 格式的数据。

前提条件

  • 已注册火山引擎账号并完成实名认证。账号的创建方法和实名认证,请参见如何进行账号注册实名认证

  • 用于订阅消费数据的客户端需要指定服务端 Kafka 版本号,版本号需为 2.2.x(例如 2.2.2)。您可以在示例代码中指定 Kafka 版本号,具体参数如下表所示。

    运行语言说明
    Go通过代码示例中参数 config.Version 指定服务端 Kafka 版本号。
    Python通过示例代码中参数 api_version 指定服务端 Kafka 版本号。
    Java通过 maven pom.xml 文件中参数 version 指定服务端 Kafka 版本号。
  • 按需安装运行语言环境。

    运行语言说明
    Go安装 Go,需使用 Go 1.13 或以上版本。您可以执行 go version 查看 Go 的版本。

    Python

    1. 安装 Python,需使用 Python 2.7 或以上版本。您可以执行 python --version 查看 Python 的版本。
    2. 依次执行以下命令,安装 pip 依赖。
      pip install kafka-python
      
      pip install protobuf
      
      pip install python-snappy
      

    Java

    1. 安装 Java,需使用 Java 1.8 或以上版本。您可以执行 java -version 查看 Java 版本。
    2. 安装 maven,需使用 Maven 3.8 或以上版本。 您可以执行 mvn -version 查看 Maven 版本。
    3. maven pom.xml 文件中添加以下依赖,本示例以 Kafka 2.2.2 版本为例 。
      <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.2.2</version>
      </dependency>
      

操作步骤

下载和编译 ProtoBuf

在运行对应语言的 demo 时,需要先根据以下操作步骤完成 Protocol Buffers(也称 ProtoBuf)文件的下载及编译。

说明

本文以火山引擎定义的 ProtoBuf 为例。

  1. 下载 ProtoBuf 文件。

  2. 将下载的 canal.proto 文件编译成对应语言的代码。编译方法如下所示:

    • Go 语言
      由于数据库传输服务的开发人员已经帮助您将 ProtoBuf 文件编译成 Go 语言的代码,您无需再编译。关于编译的更多信息,请参见 Protocol Buffer Basics: Go
    • Python 语言
      当您的语言是 Python 时,请执行以下命令将 ProtoBuf 文件编译成 Python 语言的代码,获取 canal_pb2 文件。关于编译的更多信息,请参见 Protocol Buffer Basics: Python
      protoc -I=. --python_out=. canal.proto
      
    • Java 语言
      当您的语言是 Java 时,请执行以下命令将 ProtoBuf 文件编译成 Java 语言的代码,获取 canal.Canal 文件。关于编译的更多信息,请参见 Protocol Buffer Basics: Java
      protoc -I=. --java_out=. canal.proto
      

关联 Kafka 和订阅任务

本文以 macOS 操作系统为例,介绍如何关联 Kafka 和订阅任务。

  1. 登录 DTS 控制台,创建并配置数据订阅通道。详细信息,请参见订阅方案概览

  2. 在目标数据订阅通道中新增消费组。详细信息,请参见新建消费组

  3. 编辑 .zshrc 文件,配置以下环境变量信息,并完成认证,即可调用 SDK 来消费消息数据。

    参数说明示例值
    GROUP消费组名称。285fef6b91754d0bbaab32e4976c****:test_dtssdk
    USERKafka 用户名。test_user
    PASSWORDKafka 用户密码。Test@Pwd
    TOPIC目标 DTS 数据订阅通道的 Topic。d73e98e7fa9340faa3a0d4ccfa10****
    BROKERS目标 DTS 数据订阅通道的私网地址。kafka-cndvhw9ves******.kafka.ivolces.com:9092
  4. 请按需选择以下 demo 示例。

package main

import (
   "context"
   "fmt"
   "log"
   "os"
   "strings"
   "sync"

   "github.com/Shopify/sarama"
   "github.com/volcengine/volc-sdk-golang/example/dts/data-subscription-demo/canal"
   protobuf "google.golang.org/protobuf/proto"
)

type Handler struct {
   topic          string
   partitionCount map[int32]int
   totalCount     int
   mu             sync.Mutex
}

type Config struct {
   username string
   password string
   topic    string
   group    string
   addr     string
}

var (
   c Config
)

func init() {
   c.addr = os.Getenv("BROKERS")
   c.topic = os.Getenv("TOPIC")
   c.group = os.Getenv("GROUP")
   c.username = os.Getenv("USER")
   c.password = os.Getenv("PASSWORD")
}

func (h *Handler) Setup(session sarama.ConsumerGroupSession) error {
   fmt.Println("setup")
   return nil
}

func (h *Handler) Cleanup(sarama.ConsumerGroupSession) error {
   fmt.Println("clean up")
   return nil
}

func (h *Handler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
   fmt.Println("ConsumeClaim")
   for m := range claim.Messages() {
      h.handleCanalMsg(m)
      session.MarkMessage(m, "")
      session.Commit()
   }
   return nil
}
func (h *Handler) handleCanalMsg(msg *sarama.ConsumerMessage) {
   h.mu.Lock()
   defer h.mu.Unlock()
   h.totalCount++
   h.partitionCount[msg.Partition]++

   entry := &canal.Entry{}
   if err := protobuf.Unmarshal(msg.Value, entry); err != nil {
      panic(err)
   }

   fmt.Println("-------------- handle message --------------")
   fmt.Printf("ServerID:%v\n", entry.GetHeader().GetServerId())
   fmt.Printf("Timestamp:%v\n", entry.GetHeader().GetExecuteTime())
   fmt.Printf("Database:%v\n", entry.GetHeader().GetSchemaName())
   fmt.Printf("Table:%v\n", entry.GetHeader().GetTableName())
   if entry.GetEntryType() != canal.EntryType_TRANSACTIONBEGIN &&
      entry.GetEntryType() != canal.EntryType_TRANSACTIONEND {
      rowChange := &canal.RowChange{}
      if err := protobuf.Unmarshal(entry.GetStoreValue(), rowChange); err != nil {
         panic(err)
      }

      fmt.Printf("EventType:%v\n", rowChange.GetEventType().String())
      if rowChange.GetIsDdl() {
         fmt.Printf("DDL:%s\n", rowChange.GetSql())
      } else {
         for _, row := range rowChange.GetRowDatas() {
            var before, after []string
            for _, col := range row.BeforeColumns {
               before = append(before, fmt.Sprintf("%s[%s(%s)]", col.GetName(), col.GetValue(), col.GetMysqlType()))
            }
            for _, col := range row.AfterColumns {
               after = append(after, fmt.Sprintf("%s[%s(%s)]", col.GetName(), col.GetValue(), col.GetMysqlType()))
            }
            fmt.Printf("RowDatas:before=%v after=%v\n", before, after)
         }
      }

   }
}

func main() {
   fmt.Printf("config: %+v", c)
   config := sarama.NewConfig()
   config.Net.SASL.User = c.username
   config.Net.SASL.Password = c.password
   config.Net.SASL.Enable = true
   config.Consumer.Offsets.Initial = sarama.OffsetNewest
   config.Version = sarama.V2_2_2_0
   topic := c.topic
   group := c.group
   addr := strings.Split(c.addr, ",")
   cons, err := sarama.NewConsumerGroup(addr, group, config)
   if err != nil {
      panic(err)
   }
   defer cons.Close()
   handler := &Handler{
      topic:          topic,
      partitionCount: make(map[int32]int),
   }
   for {
      err = cons.Consume(context.Background(), []string{handler.topic}, handler)
      if err != nil {
         log.Fatalln(err)
      }
   }
}

运行测试

说明

  • 本文以数据库 test,表格 demo 为例。
  • 根据目标语言选择合适的 JSON 数据。
  1. 在源数据库中,执行以下命令创建一张名为 demo 的表。

    CREATE TABLE demo (id_t INT);
    

    预期输出如下所示:

    sourceType:MYSQL
    	entryType:ROWDATA
    	executeTime:1692793679
    	schemaName:test
    	tableName:demo
    	EventType:CREATE
    	DDL:CREATE TABLE demo (id_t INT)
    	
  2. 执行以下命令,在 demo 表中插入一条数据:

    INSERT INTO demo (id_t) VALUES (1);
    

    预期输出:

    sourceType:MYSQL
    	entryType:ROWDATA
    	executeTime:1692793748
    	schemaName:test
    	tableName:demo
    	EventType:INSERT
    	RowDatas:before=[] after=[id_t[1(int)]]