You need to enable JavaScript to run this app.
导航

通过 Kafka 消费火山引擎 Proto 格式的订阅数据

最近更新时间2024.03.19 10:52:30

首次发布时间2021.12.23 21:51:18

数据库传输服务 DTS 的数据订阅服务支持使用 Kafka 客户端消费火山引擎 Proto 格式的订阅数据。本文以订阅云数据库 MySQL 版实例为例,介绍如何使用 Go、Java 和 Python 语言消费 Canal 格式的数据。

前提条件

  • 已注册火山引擎账号并完成实名认证。账号的创建方法和实名认证,请参见如何进行账号注册实名认证

  • 已安装 protoc,建议使用 protoc 3.18 或以上版本。

    说明

    您可以执行 protoc -version 查看 protoc 版本。

  • 用于订阅消费数据的客户端需要指定服务端 Kafka 版本号,版本号需为 2.2.x(例如 2.2.2)。您可以在示例代码中指定 Kafka 版本号,具体参数如下表所示。

    运行语言说明
    Go通过代码示例中参数 config.Version 指定服务端 Kafka 版本号。
    Python通过示例代码中参数 api_version 指定服务端 Kafka 版本号。
    Java通过 maven pom.xml 文件中参数 version 指定服务端 Kafka 版本号。
  • 按需安装运行语言环境。

    运行语言说明
    Go安装 Go,需使用 Go 1.13 或以上版本。您可以执行 go version 查看 Go 的版本。

    Python

    1. 安装 Python,需使用 Python 2.7 或以上版本。您可以执行 python --version 查看 Python 的版本。

    2. 依次执行以下命令,安装 pip 依赖。

      pip install kafka-python
      
      pip install protobuf
      
      pip install python-snappy
      

    Java

    1. 安装 Java,需使用 Java 1.8 或以上版本。您可以执行 java -version 查看 Java 版本。

    2. 安装 maven,需使用 Maven 3.8 或以上版本。 您可以执行 mvn -version 查看 Maven 版本。

    3. 在 IDEA 软件,单击 Create New Project 创建一个 Project。

    4. 在新建的 Project 中的项目对象模型文件 pom.xml 中添加以下依赖,本示例以 Kafka 2.2.2 版本为例。同时,您也可以将 pom.xml 文件中 kafka-clients 的版本修改为其他版本 。

      <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.2.2</version>
      </dependency>
      <dependency>
        <groupId>com.github.daniel-shuy</groupId>
        <artifactId>kafka-protobuf-serde</artifactId>
        <version>2.2.0</version>
      </dependency>
      <dependency>
        <groupId>org.xerial.snappy</groupId>
        <artifactId>snappy-java</artifactId>
        <version>1.1.8.4</version>
      </dependency>
      <dependency>
        <groupId>com.google.protobuf</groupId>
        <artifactId>protobuf-java</artifactId>
        <version>3.22.2</version>
      </dependency>
      

关联 Kafka 和订阅任务

本文以 macOS 操作系统为例,介绍如何关联 Kafka 和订阅任务。

  1. 登录 DTS 控制台,创建并配置数据订阅通道。详细信息,请参见订阅方案概览

  2. 在目标数据订阅通道中新增消费组。详细信息,请参见新建消费组

  3. 按需选择 Java 消费示例Python 消费示例,Python 语言和 Java 语言各消费示例的目录如下所示:

    .
    	├── dts_kafka_consumer_demo.py    # 消费 Demo 文件
    	├── volc.proto                    # 火山引擎格式文件
    	└── volc_pb2.py                   # 编译 Volc.proto 后的生成的 Python 文件
    	  

    说明

    Go 语言中仅包含一个 Demo 文件。

  4. 按需修改 Demo 文件,具体代码如下所示。

    参数说明示例值
    GROUP消费组名称。285fef6b91754d0bbaab32e4976c****:test_dtssdk
    USERKafka 用户名。test_user
    PASSWORDKafka 用户密码。Test@Pwd
    TOPIC目标 DTS 数据订阅通道的 Topic。d73e98e7fa9340faa3a0d4ccfa10****
    BROKERS目标 DTS 数据订阅通道的私网地址。kafka-cndvhw9ves******.kafka.ivolces.com:9092
    package main
    	
    	import (
    	   "context"
    	   "fmt"
    	   "log"
    	   "strings"
    	   "sync"
    	
    	   "github.com/Shopify/sarama"
    	   proto "github.com/volcengine/volc-sdk-golang/example/dts/data-subscription-demo/proto"
    	   protobuf "google.golang.org/protobuf/proto"
    	)
    	
    	type Handler struct {
    	   topic          string
    	   partitionCount map[int32]int
    	   totalCount     int
    	   mu             sync.Mutex
    	}
    	
    	type Config struct {
    	   username string
    	   password string
    	   topic    string
    	   group    string
    	   brokers  string
    	}
    	
    	var (
    	   c Config
    	)
    	
    	func init() {
    	   c.brokers = "your brokers addrress"
    	   c.topic = "your topic"
    	   c.group = "your group"
    	   c.username = "your username"
    	   c.password = "your password"
    	}
    	
    	func (h *Handler) Setup(session sarama.ConsumerGroupSession) error {
    	   fmt.Println("setup")
    	   return nil
    	}
    	
    	func (h *Handler) Cleanup(sarama.ConsumerGroupSession) error {
    	   fmt.Println("clean up")
    	   return nil
    	}
    	
    	func (h *Handler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    	   fmt.Println("ConsumeClaim")
    	   for m := range claim.Messages() {
    	      h.handleMsg(m)
    	      session.MarkMessage(m, "")
    	      session.Commit()
    	   }
    	   return nil
    	}
    	
    	func (h *Handler) handleMsg(msg *sarama.ConsumerMessage) {
    	   h.mu.Lock()
    	   defer h.mu.Unlock()
    	   h.totalCount++
    	   h.partitionCount[msg.Partition]++
    	   entry := &proto.Entry{}
    	   if err := protobuf.Unmarshal(msg.Value, entry); err != nil {
    	      panic(err)
    	   }
    	   fmt.Println("-------------- handle message --------------")
    	   fmt.Printf("get message EventType:%v
    ", entry.EntryType.String())
    	   switch entry.GetEntryType() {
    	   case proto.EntryType_DDL:
    	      event := entry.GetDdlEvent()
    	      fmt.Printf("ddl %v
    ", event.Sql)
    	   case proto.EntryType_DML:
    	      event := entry.GetDmlEvent()
    	      cols := event.ColumnDefs
    	      for _, row := range event.Rows {
    	         var before, after []string
    	         for i, col := range row.BeforeCols {
    	            before = append(before, fmt.Sprintf("%+v[%+v]", cols[i].GetName(), col.GetValue()))
    	         }
    	         for i, col := range row.AfterCols {
    	            after = append(after, fmt.Sprintf("%+v[%+v]", cols[i].GetName(), col.GetValue()))
    	         }
    	         fmt.Printf("get row before=%v after=%v
    ", before, after)
    	      }
    	   }
    	   fmt.Printf("fetch message partition=%v key=%v
    ", msg.Partition, string(msg.Key))
    	   fmt.Printf("count partition-count=%v total-count=%v
    ", h.partitionCount, h.totalCount)
    	}
    	
    	func main() {
    	   fmt.Printf("config: %+v", c)
    	   config := sarama.NewConfig()
    	   config.Net.SASL.User = c.username
    	   config.Net.SASL.Password = c.password
    	   config.Net.SASL.Enable = true
    	   config.Consumer.Offsets.Initial = sarama.OffsetNewest
    	   config.Version = sarama.V2_2_2_0
    	   topic := c.topic
    	   group := c.group
    	   addr := strings.Split(c.brokers, ",")
    	   cons, err := sarama.NewConsumerGroup(addr, group, config)
    	   if err != nil {
    	      panic(err)
    	   }
    	   defer cons.Close()
    	   handler := &Handler{
    	      topic:          topic,
    	      partitionCount: make(map[int32]int),
    	   }
    	   for {
    	      err = cons.Consume(context.Background(), []string{handler.topic}, handler)
    	      if err != nil {
    	         log.Fatalln(err)
    	      }
    	   }
    	}
    	

运行测试

说明

  • 本文以数据库 test,表格 demo 为例。
  • 根据目标语言选择合适的 JSON 数据。
  1. 在源数据库中,执行以下命令创建一张名为 demo 的表。

    CREATE TABLE demo (id_t INT);
    

    预期输出:

    src_type:MySQL
    	entry_type:DDL
    	timestamp:1639057424
    	server_id:"105198****"
    	database:"test"
    	table:"demo"
    	ddl_event:{sql:"create table demo (id_t int)"}
  2. 执行以下命令,在 demo 表中插入一条数据:

    INSERT INTO demo (id_t) VALUES (1);
    

    预期输出:

    src_type:MySQL
    	entry_type:DML
    	timestamp:1639057434
    	server_id:"105198****"
    	database:"test"
    	table:"demo"
    	dml_event:{
    			type:INSERT
    			table_id:"148"
    			column_defs:{
    					index:1
    					type:INTEGER
    					OriginType:"int"
    					name:"id_t"
    					is_nullable:true
    					is_unsigned:false
    			}
    			rows:{
    					after_cols:{
    							is_null:false
    							int64_value:1
    					}
    			}
    	}