You need to enable JavaScript to run this app.
导航
通过 Kafka 消费 Avro 格式的订阅数据
最近更新时间:2025.06.30 17:13:55首次发布时间:2025.06.30 15:15:48
我的收藏
有用
有用
无用
无用

数据库传输服务 DTS 的数据订阅服务支持使用 Kafka 客户端消费火山引擎 Proto 格式的订阅数据。本文以订阅云数据库 MySQL 版实例为例,介绍如何使用 Go、Java 和 Python 语言消费 Canal 格式的数据。

前提条件

  • 已注册火山引擎账号并完成实名认证。账号的创建方法和实名认证,请参见如何进行账号注册实名认证

  • 用于订阅消费数据的客户端需要指定服务端 Kafka 版本号,版本号需为 2.2.x(例如 2.2.2)。您可以在示例代码中指定 Kafka 版本号,具体参数如下表所示。

    运行语言说明
    Go通过代码示例中参数 config.Version 指定服务端 Kafka 版本号。
    Python通过示例代码中参数 api_version 指定服务端 Kafka 版本号。
    Java通过 maven pom.xml 文件中参数 version 指定服务端 Kafka 版本号。
  • 按需安装运行语言环境。

    运行语言说明
    Go安装 Go,需使用 Go 1.13 或以上版本。您可以执行 go version 查看 Go 的版本。

    Python

    1. 安装 Python,需使用 Python 2.7 或以上版本。您可以执行 python --version 查看 Python 的版本。

    2. 依次执行以下命令,安装 pip 依赖。

      pip install kafka-python
      pip install avro
      

    Java

    1. 安装 Java,需使用 Java 1.8 或以上版本。您可以执行 java -version 查看 Java 版本。

    2. 安装 maven,需使用 Maven 3.8 或以上版本。 您可以执行 mvn -version 查看 Maven 版本。

    3. 在 IDEA 软件中,单击 Create New Project 创建一个 Project。

    4. 在新建 Project 中的项目对象模型文件 pom.xml 中添加以下依赖,本示例以 Kafka 2.2.2 版本为例。同时,您也可以将 pom.xml 文件中 kafka-clients 的版本修改为其他版本。

      <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.2.2</version>
      </dependency>
      <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>1.8.2</version>
      </dependency>
      

操作步骤

关联 Kafka 和订阅任务

本文以 macOS 操作系统为例,介绍如何关联 Kafka 和订阅任务。

  1. 登录 DTS 控制台,创建并配置数据订阅通道。详细信息,请参见订阅方案概览

  2. 在目标数据订阅通道中新增消费组。详细信息,请参见新建消费组

  3. 按需选择消费示例。

    说明

    Python 语言和 Java 语言消费示例的目录中包含 Demo 文件。

    Python 语言和 Java 语言消费示例的目录如下所示:

    • Python 语言消费示例目录

      .
              ├── dts_kafka_consumer_demo.py    # 消费Kafka消息
              ├── deserializer.py               # 反序列化Avro格式数据
              ├── record_printer.py             # 打印反序列化后的Avro数据
              └── record.avsc                   # schema 文件
      
    • Java 语言消费示例目录

      .
              ├── DTSKafkaConsumerDemo.java     # 消费Kafka消息
              ├── AvroDeserializer.java         # 反序列化Avro格式数据
              ├── AvroRecordPrinter.java        # 打印反序列化后的Avro数据
              ├── record.avsc                   # schema 文件
              └── avro                          # schema 生成的文件
      
    • Go 语言消费示例如下。

      package main
      
      import (
         "bytes"
         "context"
         "fmt"
         "strings"
         "sync"
      
         "github.com/twpayne/go-geom/encoding/wkb"
         "github.com/twpayne/go-geom/encoding/wkt"
         "github.com/Shopify/sarama"
         "your path of code generated by avro compilation/avro"
      )
      
      func main() {
         config := sarama.NewConfig()
         brokers := "your brokers addrress"
         topic := "your topic"
         group := "your group"
         username := "your username"
         password := "your password"
      
         config.Consumer.Offsets.Initial = sarama.OffsetNewest
         config.Net.SASL.Enable = true
         config.Net.SASL.User = username
         config.Net.SASL.Password = password
      
         cons, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), group, config)
         if err != nil {
            panic(err)
         }
         defer cons.Close()
      
         handler := &Handler{
            topic:          topic,
            partitionCount: make(map[int32]int),
         }
         for {
            err = cons.Consume(context.Background(), []string{handler.topic}, handler)
            if err != nil {
               fmt.Printf("consume failde, err=%v", err)
            }
         }
      }
      
      type Handler struct {
         format         string
         topic          string
         partitionCount map[int32]int
         totalCount     int
         mu             sync.Mutex
      }
      
      func (h *Handler) Setup(session sarama.ConsumerGroupSession) error {
         fmt.Println("setup")
         return nil
      }
      
      func (h *Handler) Cleanup(sarama.ConsumerGroupSession) error {
         fmt.Println("clean up")
         return nil
      }
      
      func (h *Handler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
         for msg := range claim.Messages() {
            record, err := avro.DeserializeRecord(bytes.NewReader(msg.Value))
            if err != nil {
               panic(err)
            }
            printRecord(&record)
            session.MarkMessage(msg, "")
            session.Commit()
         }
         return nil
      }
      
      func printRecord(record *avro.Record) {
         fmt.Printf("Operation[%s] ObjectName[%s] SourceTimestamp[%v] Id[%v]\n",
            record.Operation.String(), record.ObjectName.String, record.SourceTimestamp, record.Id)
         switch record.Operation {
         case avro.OperationDDL:
            fmt.Printf("DDL[%s] \n", record.AfterImages.String)
         case avro.OperationINSERT:
            for i := 0; i < len(record.AfterImages.ArrayUnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObject); i++ {
               after, err := getValue(record.AfterImages.ArrayUnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObject[i])
               if err != nil {
                  panic(err)
               }
               fmt.Printf("Field[%s] After[%s]\n", record.Fields.ArrayField[i].Name, after)
            }
         case avro.OperationUPDATE:
            for i := 0; i < len(record.AfterImages.ArrayUnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObject); i++ {
               before, err := getValue(record.BeforeImages.ArrayUnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObject[i])
               if err != nil {
                  panic(err)
               }
               after, err := getValue(record.AfterImages.ArrayUnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObject[i])
               if err != nil {
                  panic(err)
               }
               fmt.Printf("Field[%s] Before[%s] After[%s]\n", record.Fields.ArrayField[i].Name, before, after)
            }
         case avro.OperationDELETE:
            for i := 0; i < len(record.BeforeImages.ArrayUnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObject); i++ {
               before, err := getValue(record.BeforeImages.ArrayUnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObject[i])
               if err != nil {
                  panic(err)
               }
               fmt.Printf("Field[%s] Before[%s]\n", record.Fields.ArrayField[i].Name, before)
            }
         }
      }
      
      func getValue(in *avro.UnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObject) (string, error) {
         if in == nil {
            return "null", nil
         }
      
         switch in.UnionType {
         case avro.UnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObjectTypeEnumInteger:
            return in.Integer.Value, nil
         case avro.UnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObjectTypeEnumCharacter:
            return string(in.Character.Value), nil
         case avro.UnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObjectTypeEnumDecimal:
            return fmt.Sprintf("%s", in.Decimal.Value), nil
         case avro.UnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObjectTypeEnumFloat:
            return fmt.Sprintf("%v", in.Float.Value), nil
         case avro.UnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObjectTypeEnumTimestamp:
            return fmt.Sprintf("%v.%v", in.Timestamp.Timestamp, in.Timestamp.Millis), nil
         case avro.UnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObjectTypeEnumDateTime:
            var year, month, day, hour, minute, second int32
            if in.DateTime.Year != nil {
               year = in.DateTime.Year.Int
            }
            if in.DateTime.Month != nil {
               month = in.DateTime.Month.Int
            }
            if in.DateTime.Day != nil {
               day = in.DateTime.Day.Int
            }
            if in.DateTime.Hour != nil {
               hour = in.DateTime.Hour.Int
            }
            if in.DateTime.Minute != nil {
               minute = in.DateTime.Minute.Int
            }
            if in.DateTime.Second != nil {
               second = in.DateTime.Second.Int
            }
            if year != 0 {
               return fmt.Sprintf("%v-%v-%v %v:%v:%v", year, month, day, hour, minute, second), nil
            }
            return fmt.Sprintf("%v:%v:%v", hour, minute, second), nil
         case avro.UnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObjectTypeEnumTimestampWithTimeZone:
            var year, month, day, hour, minute, second int32
            if in.TimestampWithTimeZone.Value.Year != nil {
               year = in.TimestampWithTimeZone.Value.Year.Int
            }
            if in.TimestampWithTimeZone.Value.Month != nil {
               month = in.TimestampWithTimeZone.Value.Month.Int
            }
            if in.TimestampWithTimeZone.Value.Day != nil {
               day = in.TimestampWithTimeZone.Value.Day.Int
            }
            if in.TimestampWithTimeZone.Value.Hour != nil {
               hour = in.TimestampWithTimeZone.Value.Hour.Int
            }
            if in.TimestampWithTimeZone.Value.Minute != nil {
               minute = in.TimestampWithTimeZone.Value.Minute.Int
            }
            if in.TimestampWithTimeZone.Value.Second != nil {
               second = in.TimestampWithTimeZone.Value.Second.Int
            }
            if year != 0 {
               return fmt.Sprintf("%v-%v-%v %v:%v:%v+%v", year, month, day, hour, minute, second, in.TimestampWithTimeZone.Timezone), nil
            }
            return fmt.Sprintf("%v:%v:%v", hour, minute, second), nil
      
         case avro.UnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObjectTypeEnumBinaryGeometry:
            return getReadableGeometry(in.BinaryGeometry.Value)
            //return fmt.Sprintf("%s", string(in.BinaryGeometry.Value)), nil
         case avro.UnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObjectTypeEnumTextGeometry:
            return getReadableGeometry([]byte(in.TextGeometry.Value))
            //return fmt.Sprintf("%s", in.TextGeometry.Value), nil
         case avro.UnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObjectTypeEnumBinaryObject:
            return fmt.Sprintf("%s", string(in.BinaryObject.Value)), nil
         case avro.UnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObjectTypeEnumTextObject:
            return fmt.Sprintf("%s", in.TextObject.Value), nil
         case avro.UnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObjectTypeEnumEmptyObject:
            return fmt.Sprintf("%s", in.EmptyObject.String()), nil
         }
         return "", fmt.Errorf("invalid value for *UnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObject")
      }
      
      func getReadableGeometry(raw []byte) (string, error) {
         if len(raw) < 4 {
            return "", fmt.Errorf("not a valid geometry %v", raw)
         }
         geomObj, err := wkb.Unmarshal(raw[4:])
         if err != nil {
            return "", err
         }
      
         wktObj, err := wkt.Marshal(geomObj)
         if err != nil {
            return "", err
         }
         return wktObj, nil
      }
      
  4. Go 语言生成 Schema 结构代码。

    使用 Go 语言消费 Avro 格式的数据时,需运行以下命令下载代码生成工具,生成 Schema 结构代码。

    1. 下载代码生成工具 go install github.com/actgardner/gogen-avro/v10/cmd/...@latest
    2. 运行 mkdir avro && gogen-avro ./avro record.avsc生成代码。

    说明

    • 查看 Go 语言消息格式,请参见 Go 语言协议格式
    • 使用 Java 和 Python 语言消费 Avro 格式的数据时,无需生成 Schema 结构代码。

  1. 修改 Demo 文件参数。

    参数说明示例值
    GROUP消费组名称。285fef6b91754d0bbaab32e4976c****:test_dtssdk
    USERKafka 用户名。test_user
    PASSWORDKafka 用户密码。Test@***
    TOPIC目标 DTS 数据订阅通道的 Topic。d73e98e7fa9340faa3a0d4ccfa10****
    BROKERS目标 DTS 数据订阅通道的私网地址。kafka-cndvhw9ves******.``kafka.ivolces.com:9092

运行测试

说明

  • 本文以数据库 test,表格 demo 为例。

  • 根据目标语言选择合适的 JSON 数据。

  1. 在源数据库中,执行以下命令创建一张名为 demo 的表。

    CREATE TABLE demo (id_t INT);
    

    Go、Java 和 Python 语言预期输出如下:

    Operation[DDL] ObjectName[test] SourceTimestamp[1710848558] Id[1710782319849000034]
    DDL[CREATE TABLE demo (id_t INT)]
    
  2. 执行以下命令,在 demo 表中插入一条数据:

    INSERT INTO demo (id_t) VALUES (1);
    

    Go、Java 和 Python 语言预期输出如下:

    Operation[INSERT] ObjectName[test.demo] SourceTimestamp[1710848859] Id[1710782319849000035]
    Field[id_t] After[1]