You need to enable JavaScript to run this app.
导航

通过 Kafka 消费火山引擎 Proto 格式的订阅数据

最近更新时间2023.10.08 11:45:19

首次发布时间2021.12.23 21:51:18

数据库传输服务 DTS 的数据订阅服务支持使用 Kafka 客户端消费火山引擎 Proto 格式的订阅数据。本文以订阅云数据库 MySQL 版实例为例,介绍如何使用 Go、Java 和 Python 语言消费 Canal 格式的数据。

前提条件

  • 已注册火山引擎账号并完成实名认证。账号的创建方法和实名认证,请参见如何进行账号注册实名认证

  • 用于订阅消费数据的客户端需要指定服务端 Kafka 版本号,版本号需为 2.2.x(例如 2.2.2)。您可以在示例代码中指定 Kafka 版本号,具体参数如下表所示。

    运行语言说明
    Go通过代码示例中参数 config.Version 指定服务端 Kafka 版本号。
    Python通过示例代码中参数 api_version 指定服务端 Kafka 版本号。
    Java通过 maven pom.xml 文件中参数 version 指定服务端 Kafka 版本号。
  • 按需安装运行语言环境。

    运行语言说明
    Go安装 Go,需使用 Go 1.13 或以上版本。您可以执行 go version 查看 Go 的版本。

    Python

    1. 安装 Python,需使用 Python 2.7 或以上版本。您可以执行 python --version 查看 Python 的版本。
    2. 依次执行以下命令,安装 pip 依赖。
      pip install kafka-python
      
      pip install protobuf
      
      pip install python-snappy
      

    Java

    1. 安装 Java,需使用 Java 1.8 或以上版本。您可以执行 java -version 查看 Java 版本。
    2. 安装 maven,需使用 Maven 3.8 或以上版本。 您可以执行 mvn -version 查看 Maven 版本。
    3. maven pom.xml 文件中添加以下依赖,本示例以 Kafka 2.2.2 版本为例 。
      <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.2.2</version>
      </dependency>
      

操作步骤

下载和编译 ProtoBuf

在运行对应语言的 demo 时,需要先根据以下操作步骤完成 Protocol Buffers(也称 ProtoBuf)文件的下载及编译:

说明

本文以火山引擎定义的 ProtoBuf 为例。

  1. 下载 ProtoBuf 文件。

  2. ProtoBuf 文件编译成对应语言的代码。编译方法如下所示:

    • Go 语言
      由于数据库传输服务的开发人员已经帮助您将 ProtoBuf 文件编译成 Go 语言的代码,您无需再编译。关于编译的更多信息,请参见 Protocol Buffer Basics: Go
    • Python 语言
      当您的语言是 Python 时,请执行以下命令将 ProtoBuf 文件编译成 Python 语言的代码,获取 volc_pb2 文件。关于编译的更多信息,请参见 Protocol Buffer Basics: Python
      protoc -I=. --python_out=. volc.proto
      
    • Java 语言
      当您的语言是 Java 时,请执行以下命令将 ProtoBuf 文件编译成 Java 语言的代码,获取 volc.Volc 文件。关于编译的更多信息,请参见 Protocol Buffer Basics: Java
      protoc -I=. --java_out=. volc.proto
      

关联 Kafka 和订阅任务

本文以 macOS 操作系统为例,介绍如何关联 Kafka 和订阅任务。

  1. 登录 DTS 控制台,创建并配置数据订阅通道。详细信息,请参见订阅方案概览

  2. 在目标数据订阅通道中新增消费组。详细信息,请参见新建消费组

  3. 编辑 .zshrc 文件,配置以下环境变量信息,并完成认证,即可调用 SDK 来消费消息数据。

    参数说明示例值
    GROUP消费组名称。285fef6b91754d0bbaab32e4976c****:test_dtssdk
    USERKafka 用户名。test_user
    PASSWORDKafka 用户密码。Test@Pwd
    TOPIC目标 DTS 数据订阅通道的 Topic。d73e98e7fa9340faa3a0d4ccfa10****
    BROKERS目标 DTS 数据订阅通道的私网地址。kafka-cndvhw9ves******.kafka.ivolces.com:9092
  4. 按需选择以下 demo 示例。

package main


import (
	"context"
	"fmt"
	"log"
	"os"
	"strings"
	"sync"


	"github.com/Shopify/sarama"
	proto "github.com/volcengine/volc-sdk-golang/example/dts/data-subscription-demo/proto"
	protobuf "google.golang.org/protobuf/proto"
)


type Handler struct {
	topic          string
	partitionCount map[int32]int
	totalCount     int
	mu             sync.Mutex
}


type Config struct {
	username string
	password string
	topic    string
	group    string
	addr     string
}


var (
	c Config
)


func init() {
	c.addr = os.Getenv("BROKERS")
	c.topic = os.Getenv("TOPIC")
	c.group = os.Getenv("GROUP")
	c.username = os.Getenv("USER")
	c.password = os.Getenv("PASSWORD")
}


func (h *Handler) Setup(session sarama.ConsumerGroupSession) error {
	fmt.Println("setup")
	return nil
}


func (h *Handler) Cleanup(sarama.ConsumerGroupSession) error {
	fmt.Println("clean up")
	return nil
}


func (h *Handler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	fmt.Println("ConsumeClaim")
	for m := range claim.Messages() {
		h.handleMsg(m)
		session.MarkMessage(m, "")
		session.Commit()
	}
	return nil
}


func (h *Handler) handleMsg(msg *sarama.ConsumerMessage) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.totalCount++
	h.partitionCount[msg.Partition]++
	entry := &proto.Entry{}
	if err := protobuf.Unmarshal(msg.Value, entry); err != nil {
		panic(err)
	}
	fmt.Println("-------------- handle message --------------")
	fmt.Printf("get message EventType:%v\n", entry.EntryType.String())
	switch entry.GetEntryType() {
	case proto.EntryType_DDL:
		event := entry.GetDdlEvent()
		fmt.Printf("ddl %v\n", event.Sql)
	case proto.EntryType_DML:
		event := entry.GetDmlEvent()
		cols := event.ColumnDefs
		for _, row := range event.Rows {
			var before, after []string
			for i, col := range row.BeforeCols {
				before = append(before, fmt.Sprintf("%+v[%+v]", cols[i].GetName(), col.GetValue()))
			}
			for i, col := range row.AfterCols {
				after = append(after, fmt.Sprintf("%+v[%+v]", cols[i].GetName(), col.GetValue()))
			}
			fmt.Printf("get row before=%v after=%v\n", before, after)
		}
	}
	fmt.Printf("fetch message partition=%v key=%v\n", msg.Partition, string(msg.Key))
	fmt.Printf("count partition-count=%v total-count=%v\n", h.partitionCount, h.totalCount)
}


func main() {
	fmt.Printf("config: %+v", c)
	config := sarama.NewConfig()
	config.Net.SASL.User = c.username
	config.Net.SASL.Password = c.password
	config.Net.SASL.Enable = true
	config.Consumer.Offsets.Initial = sarama.OffsetNewest
	config.Version = sarama.V2_2_2_0
	topic := c.topic
	group := c.group
	addr := strings.Split(c.addr, ",")
	cons, err := sarama.NewConsumerGroup(addr, group, config)
	if err != nil {
		panic(err)
	}
	defer cons.Close()
	handler := &Handler{
		topic:          topic,
		partitionCount: make(map[int32]int),
	}
	for {
		err = cons.Consume(context.Background(), []string{handler.topic}, handler)
		if err != nil {
			log.Fatalln(err)
		}
	}
}

运行测试

说明

  • 本文以数据库 test,表格 demo 为例。
  • 根据目标语言选择合适的 JSON 数据。
  1. 在源数据库中,执行以下命令创建一张名为 demo 的表。

    CREATE TABLE demo (id_t INT);
    

    预期输出:

    src_type:MySQL
    	entry_type:DDL
    	timestamp:1639057424
    	server_id:"105198****"
    	database:"test"
    	table:"demo"
    	ddl_event:{sql:"create table demo (id_t int)"}
  2. 执行以下命令,在 demo 表中插入一条数据:

    INSERT INTO demo (id_t) VALUES (1);
    

    预期输出:

    src_type:MySQL
    	entry_type:DML
    	timestamp:1639057434
    	server_id:"105198****"
    	database:"test"
    	table:"demo"
    	dml_event:{
    			type:INSERT
    			table_id:"148"
    			column_defs:{
    					index:1
    					type:INTEGER
    					OriginType:"int"
    					name:"id_t"
    					is_nullable:true
    					is_unsigned:false
    			}
    			rows:{
    					after_cols:{
    							is_null:false
    							int64_value:1
    					}
    			}
    	}