You need to enable JavaScript to run this app.
数据库传输服务

数据库传输服务

复制全文
数据订阅最佳实践
通过 Kafka 消费火山引擎 Proto 格式的订阅数据
复制全文
通过 Kafka 消费火山引擎 Proto 格式的订阅数据

数据库传输服务 DTS 的数据订阅服务支持使用 Kafka 客户端消费火山引擎 Proto 格式的订阅数据。本文以订阅云数据库 MySQL 版实例为例,介绍如何使用 Go、Java 和 Python 语言消费 Canal 格式的数据。

前提条件

  • 已注册火山引擎账号并完成实名认证。账号的创建方法和实名认证,请参见如何进行账号注册实名认证

  • 已安装 protoc,建议使用 protoc 3.18 或以上版本。

    说明

    您可以执行 protoc -version 查看 protoc 版本。

  • 用于订阅消费数据的客户端需要指定服务端 Kafka 版本号,版本号需为 2.2.x(例如 2.2.2)。您可以在示例代码中指定 Kafka 版本号,具体参数如下表所示。

    运行语言说明
    Go通过代码示例中参数 config.Version 指定服务端 Kafka 版本号。
    Python通过示例代码中参数 api_version 指定服务端 Kafka 版本号。
    Java通过 maven pom.xml 文件中参数 version 指定服务端 Kafka 版本号。
  • 按需安装运行语言环境。

    运行语言说明
    Go安装 Go,需使用 Go 1.13 或以上版本。您可以执行 go version 查看 Go 的版本。

    Python

    1. 安装 Python,需使用 Python 2.7 或以上版本。您可以执行 python --version 查看 Python 的版本。

    2. 依次执行以下命令,安装 pip 依赖。

      pip install kafka-python
      
      pip install protobuf
      
      pip install python-snappy
      

    Java

    1. 安装 Java,需使用 Java 1.8 或以上版本。您可以执行 java -version 查看 Java 版本。

    2. 安装 maven,需使用 Maven 3.8 或以上版本。 您可以执行 mvn -version 查看 Maven 版本。

    3. 在 IDEA 软件,单击 Create New Project 创建一个 Project。

    4. 在新建的 Project 中的项目对象模型文件 pom.xml 中添加以下依赖,本示例以 Kafka 2.2.2 版本为例。同时,您也可以将 pom.xml 文件中 kafka-clients 的版本修改为其他版本 。

      <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.2.2</version>
      </dependency>
      <dependency>
        <groupId>com.github.daniel-shuy</groupId>
        <artifactId>kafka-protobuf-serde</artifactId>
        <version>2.2.0</version>
      </dependency>
      <dependency>
        <groupId>org.xerial.snappy</groupId>
        <artifactId>snappy-java</artifactId>
        <version>1.1.8.4</version>
      </dependency>
      <dependency>
        <groupId>com.google.protobuf</groupId>
        <artifactId>protobuf-java</artifactId>
        <version>3.22.2</version>
      </dependency>
      

关联 Kafka 和订阅任务

本文以 macOS 操作系统为例,介绍如何关联 Kafka 和订阅任务。

  1. 登录 DTS 控制台,创建并配置数据订阅通道。详细信息,请参见订阅方案概览

  2. 在目标数据订阅通道中新增消费组。详细信息,请参见新建消费组

  3. 按需选择 Java 消费示例Python 消费示例,Python 语言和 Java 语言各消费示例的目录如下所示:

    .
    	├── dts_kafka_consumer_demo.py    # 消费 Demo 文件
    	├── volc.proto                    # 火山引擎格式文件
    	└── volc_pb2.py                   # 编译 Volc.proto 后的生成的 Python 文件
    	  

    说明

    Go 语言中仅包含一个 Demo 文件。

  4. 按需修改 Demo 文件,具体代码如下所示。

    参数说明示例值
    GROUP消费组名称。285fef6b91754d0bbaab32e4976c****:test_dtssdk
    USERKafka 用户名。test_user
    PASSWORDKafka 用户密码。Test@Pwd
    TOPIC目标 DTS 数据订阅通道的 Topic。d73e98e7fa9340faa3a0d4ccfa10****
    BROKERS目标 DTS 数据订阅通道的私网地址。kafka-cndvhw9ves******.kafka.ivolces.com:9092
    package main
    	
    	import (
    	   "context"
    	   "fmt"
    	   "log"
    	   "strings"
    	   "sync"
    	
    	   "github.com/Shopify/sarama"
    	   proto "github.com/volcengine/volc-sdk-golang/example/dts/data-subscription-demo/proto"
    	   protobuf "google.golang.org/protobuf/proto"
    	)
    	
    	type Handler struct {
    	   topic          string
    	   partitionCount map[int32]int
    	   totalCount     int
    	   mu             sync.Mutex
    	}
    	
    	type Config struct {
    	   username string
    	   password string
    	   topic    string
    	   group    string
    	   brokers  string
    	}
    	
    	var (
    	   c Config
    	)
    	
    	func init() {
    	   c.brokers = "your brokers addrress"
    	   c.topic = "your topic"
    	   c.group = "your group"
    	   c.username = "your username"
    	   c.password = "your password"
    	}
    	
    	func (h *Handler) Setup(session sarama.ConsumerGroupSession) error {
    	   fmt.Println("setup")
    	   return nil
    	}
    	
    	func (h *Handler) Cleanup(sarama.ConsumerGroupSession) error {
    	   fmt.Println("clean up")
    	   return nil
    	}
    	
    	func (h *Handler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    	   fmt.Println("ConsumeClaim")
    	   for m := range claim.Messages() {
    	      h.handleMsg(m)
    	      session.MarkMessage(m, "")
    	      session.Commit()
    	   }
    	   return nil
    	}
    	
    	func (h *Handler) handleMsg(msg *sarama.ConsumerMessage) {
    	   h.mu.Lock()
    	   defer h.mu.Unlock()
    	   h.totalCount++
    	   h.partitionCount[msg.Partition]++
    	   entry := &proto.Entry{}
    	   if err := protobuf.Unmarshal(msg.Value, entry); err != nil {
    	      panic(err)
    	   }
    	   fmt.Println("-------------- handle message --------------")
    	   fmt.Printf("get message EventType:%v
    ", entry.EntryType.String())
    	   switch entry.GetEntryType() {
    	   case proto.EntryType_DDL:
    	      event := entry.GetDdlEvent()
    	      fmt.Printf("ddl %v
    ", event.Sql)
    	   case proto.EntryType_DML:
    	      event := entry.GetDmlEvent()
    	      cols := event.ColumnDefs
    	      for _, row := range event.Rows {
    	         var before, after []string
    	         for i, col := range row.BeforeCols {
    	            before = append(before, fmt.Sprintf("%+v[%+v]", cols[i].GetName(), col.GetValue()))
    	         }
    	         for i, col := range row.AfterCols {
    	            after = append(after, fmt.Sprintf("%+v[%+v]", cols[i].GetName(), col.GetValue()))
    	         }
    	         fmt.Printf("get row before=%v after=%v
    ", before, after)
    	      }
    	   }
    	   fmt.Printf("fetch message partition=%v key=%v
    ", msg.Partition, string(msg.Key))
    	   fmt.Printf("count partition-count=%v total-count=%v
    ", h.partitionCount, h.totalCount)
    	}
    	
    	func main() {
    	   fmt.Printf("config: %+v", c)
    	   config := sarama.NewConfig()
    	   config.Net.SASL.User = c.username
    	   config.Net.SASL.Password = c.password
    	   config.Net.SASL.Enable = true
    	   config.Consumer.Offsets.Initial = sarama.OffsetNewest
    	   config.Version = sarama.V2_2_2_0
    	   topic := c.topic
    	   group := c.group
    	   addr := strings.Split(c.brokers, ",")
    	   cons, err := sarama.NewConsumerGroup(addr, group, config)
    	   if err != nil {
    	      panic(err)
    	   }
    	   defer cons.Close()
    	   handler := &Handler{
    	      topic:          topic,
    	      partitionCount: make(map[int32]int),
    	   }
    	   for {
    	      err = cons.Consume(context.Background(), []string{handler.topic}, handler)
    	      if err != nil {
    	         log.Fatalln(err)
    	      }
    	   }
    	}
    	

运行测试

说明

  • 本文以数据库 test,表格 demo 为例。
  • 根据目标语言选择合适的 JSON 数据。
  1. 在源数据库中,执行以下命令创建一张名为 demo 的表。

    CREATE TABLE demo (id_t INT);
    

    预期输出:

    src_type:MySQL
    	entry_type:DDL
    	timestamp:1639057424
    	server_id:"105198****"
    	database:"test"
    	table:"demo"
    	ddl_event:{sql:"create table demo (id_t int)"}
  2. 执行以下命令,在 demo 表中插入一条数据:

    INSERT INTO demo (id_t) VALUES (1);
    

    预期输出:

    src_type:MySQL
    	entry_type:DML
    	timestamp:1639057434
    	server_id:"105198****"
    	database:"test"
    	table:"demo"
    	dml_event:{
    			type:INSERT
    			table_id:"148"
    			column_defs:{
    					index:1
    					type:INTEGER
    					OriginType:"int"
    					name:"id_t"
    					is_nullable:true
    					is_unsigned:false
    			}
    			rows:{
    					after_cols:{
    							is_null:false
    							int64_value:1
    					}
    			}
    	}
最近更新时间:2025.07.29 20:46:09
这个页面对您有帮助吗?
有用
有用
无用
无用