You need to enable JavaScript to run this app.
导航
通过 RocketMQ 消费 Avro 格式的订阅数据
最近更新时间:2025.06.30 17:13:55首次发布时间:2025.06.30 15:42:26
我的收藏
有用
有用
无用
无用

数据库传输服务 DTS 的数据订阅服务支持使用 RocketMQ 客户端消费Avro 格式的订阅数据。本文以订阅云数据库 MySQL 版实例为例,介绍如何使用 Go 和 Java 语言消费 Avro 格式的数据。

前提条件

  • 已注册火山引擎账号并完成实名认证。账号的创建方法和实名认证,请参见如何进行账号注册实名认证

  • 按需安装运行语言环境。

    运行环境说明

    Go 语言

    安装 Go,需使用 Go 1.13 或以上版本。
    您可以执行 go version 查看 Go 的版本。

    Java 语言

    1. 安装 Java,需使用 Java 1.8 或以上版本。您可以执行 java -version 查看 Java 版本。

    2. 安装 maven,需使用 Maven 3.8 或以上版本。 您可以执行 mvn -version 查看 Maven 版本。

    3. 在 IDEA 软件,单击 Create New Project 创建一个 Project。

    4. 在新建的 Project 中的项目对象模型文件 pom.xml 中添加以下依赖。

      <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>1.8.2</version>
      </dependency>
      <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.9.4</version>
      </dependency>
      <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-acl</artifactId>
        <version>4.9.4</version>
      </dependency>
      

操作步骤

关联 RocketMQ 和订阅任务

本文以 macOS 操作系统为例,介绍如何关联 RocketMQ 和订阅任务。

  1. 登录 DTS 控制台,创建并配置数据订阅通道。详细信息,请参见订阅方案概览

  2. 在目标数据订阅通道中新增消费组。详细信息,请参见新建消费组

  3. 按需选择消费示例。

    说明

    Java 语言消费示例的目录中包含 Demo 文件。

    Java 语言消费示例的目录如下所示:

    • Java 语言消费示例目录

      .
              ├── DTSRocketMQConsumerDemo.java  # 消费RocketMQ消息
              ├── AvroDeserializer.java         # 反序列化Avro格式数据
              ├── AvroRecordPrinter.java        # 打印反序列化后的Avro数据
              ├── record.avsc                   # schema 文件
              └── avro                          # schema 生成的文件
      
    • Go 语言消费示例如下。

      package main
      
      import (
         "bytes"
         "context"
         "fmt"
         "strings"
         "time"
      
         "github.com/apache/rocketmq-client-go/v2"
         "github.com/apache/rocketmq-client-go/v2/consumer"
         "github.com/apache/rocketmq-client-go/v2/primitive"
         "github.com/twpayne/go-geom/encoding/wkb"
         "github.com/twpayne/go-geom/encoding/wkt"
         "github.com/volcengine/volc-sdk-golang/example/dts/data-subscription-demo/avro/avro"
         "your path of code generated by avro compilation/avro"
      )
      
      func main() {
         nameserver := "your nameserver address"
         topic := "your topic"
         group := "your group"
         accessKey := "your access key"
         secretKey := "your secret key"
      
         cli, _ := rocketmq.NewPushConsumer(
            consumer.WithGroupName(group),
            consumer.WithNsResolver(primitive.NewPassthroughResolver(strings.Split(nameserver, ","))),
            consumer.WithConsumerModel(consumer.Clustering),
            consumer.WithConsumerOrder(true),
            consumer.WithCredentials(primitive.Credentials{
               AccessKey: accessKey,
               SecretKey: secretKey,
            }),
         )
      
         err := cli.Subscribe(topic, consumer.MessageSelector{}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
            for _, msg := range msgs {
               record, err := avro.DeserializeRecord(bytes.NewReader(msg.Body))
               if err != nil {
                  panic(err)
               }
               printRecord(&record)
            }
            return consumer.ConsumeSuccess, nil
      
         })
         if err != nil {
            panic(err.Error())
         }
      
         err = cli.Start()
         if err != nil {
            panic(err.Error())
         }
      
         time.Sleep(time.Hour)
         cli.Shutdown()
      }
      
      func printRecord(record *avro.Record) {
         fmt.Printf("Operation[%s] ObjectName[%s] SourceTimestamp[%v] Id[%v]\n",
            record.Operation.String(), record.ObjectName.String, record.SourceTimestamp, record.Id)
         switch record.Operation {
         case avro.OperationDDL:
            fmt.Printf("DDL[%s] \n", record.AfterImages.String)
         case avro.OperationINSERT:
            for i := 0; i < len(record.AfterImages.ArrayUnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObject); i++ {
               after, err := getValue(record.AfterImages.ArrayUnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObject[i])
               if err != nil {
                  panic(err)
               }
               fmt.Printf("Field[%s] After[%s]\n", record.Fields.ArrayField[i].Name, after)
            }
         case avro.OperationUPDATE:
            for i := 0; i < len(record.AfterImages.ArrayUnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObject); i++ {
               before, err := getValue(record.BeforeImages.ArrayUnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObject[i])
               if err != nil {
                  panic(err)
               }
               after, err := getValue(record.AfterImages.ArrayUnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObject[i])
               if err != nil {
                  panic(err)
               }
               fmt.Printf("Field[%s] Before[%s] After[%s]\n", record.Fields.ArrayField[i].Name, before, after)
            }
         case avro.OperationDELETE:
            for i := 0; i < len(record.BeforeImages.ArrayUnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObject); i++ {
               before, err := getValue(record.BeforeImages.ArrayUnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObject[i])
               if err != nil {
                  panic(err)
               }
               fmt.Printf("Field[%s] Before[%s]\n", record.Fields.ArrayField[i].Name, before)
            }
         }
      }
      
      func getValue(in *avro.UnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObject) (string, error) {
         if in == nil {
            return "null", nil
         }
      
         switch in.UnionType {
         case avro.UnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObjectTypeEnumInteger:
            return in.Integer.Value, nil
         case avro.UnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObjectTypeEnumCharacter:
            return string(in.Character.Value), nil
         case avro.UnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObjectTypeEnumDecimal:
            return fmt.Sprintf("%s", in.Decimal.Value), nil
         case avro.UnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObjectTypeEnumFloat:
            return fmt.Sprintf("%v", in.Float.Value), nil
         case avro.UnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObjectTypeEnumTimestamp:
            return fmt.Sprintf("%v.%v", in.Timestamp.Timestamp, in.Timestamp.Millis), nil
         case avro.UnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObjectTypeEnumDateTime:
            var year, month, day, hour, minute, second int32
            if in.DateTime.Year != nil {
               year = in.DateTime.Year.Int
            }
            if in.DateTime.Month != nil {
               month = in.DateTime.Month.Int
            }
            if in.DateTime.Day != nil {
               day = in.DateTime.Day.Int
            }
            if in.DateTime.Hour != nil {
               hour = in.DateTime.Hour.Int
            }
            if in.DateTime.Minute != nil {
               minute = in.DateTime.Minute.Int
            }
            if in.DateTime.Second != nil {
               second = in.DateTime.Second.Int
            }
            if year != 0 {
               return fmt.Sprintf("%v-%v-%v %v:%v:%v", year, month, day, hour, minute, second), nil
            }
            return fmt.Sprintf("%v:%v:%v", hour, minute, second), nil
         case avro.UnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObjectTypeEnumTimestampWithTimeZone:
            var year, month, day, hour, minute, second int32
            if in.TimestampWithTimeZone.Value.Year != nil {
               year = in.TimestampWithTimeZone.Value.Year.Int
            }
            if in.TimestampWithTimeZone.Value.Month != nil {
               month = in.TimestampWithTimeZone.Value.Month.Int
            }
            if in.TimestampWithTimeZone.Value.Day != nil {
               day = in.TimestampWithTimeZone.Value.Day.Int
            }
            if in.TimestampWithTimeZone.Value.Hour != nil {
               hour = in.TimestampWithTimeZone.Value.Hour.Int
            }
            if in.TimestampWithTimeZone.Value.Minute != nil {
               minute = in.TimestampWithTimeZone.Value.Minute.Int
            }
            if in.TimestampWithTimeZone.Value.Second != nil {
               second = in.TimestampWithTimeZone.Value.Second.Int
            }
            if year != 0 {
               return fmt.Sprintf("%v-%v-%v %v:%v:%v+%v", year, month, day, hour, minute, second, in.TimestampWithTimeZone.Timezone), nil
            }
            return fmt.Sprintf("%v:%v:%v", hour, minute, second), nil
      
         case avro.UnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObjectTypeEnumBinaryGeometry:
            return getReadableGeometry(in.BinaryGeometry.Value)
            //return fmt.Sprintf("%s", string(in.BinaryGeometry.Value)), nil
         case avro.UnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObjectTypeEnumTextGeometry:
            return getReadableGeometry([]byte(in.TextGeometry.Value))
            //return fmt.Sprintf("%s", in.TextGeometry.Value), nil
         case avro.UnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObjectTypeEnumBinaryObject:
            return fmt.Sprintf("%s", string(in.BinaryObject.Value)), nil
         case avro.UnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObjectTypeEnumTextObject:
            return fmt.Sprintf("%s", in.TextObject.Value), nil
         case avro.UnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObjectTypeEnumEmptyObject:
            return fmt.Sprintf("%s", in.EmptyObject.String()), nil
         }
         return "", fmt.Errorf("invalid value for *UnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObject")
      }
      
      func getReadableGeometry(raw []byte) (string, error) {
         if len(raw) < 4 {
            return "", fmt.Errorf("not a valid geometry %v", raw)
         }
         geomObj, err := wkb.Unmarshal(raw[4:])
         if err != nil {
            return "", err
         }
      
         wktObj, err := wkt.Marshal(geomObj)
         if err != nil {
            return "", err
         }
         return wktObj, nil
      }
      
  4. Go 语言生成 Schema 结构代码。。
    使用 Go 语言消费 Avro 格式的数据时,需运行以下命令下载代码生成工具,生成 Schema 结构代码。

    1. 下载代码生成工具 go install github.com/actgardner/gogen-avro/v10/cmd/...@latest

    2. 运行 mkdir avro && gogen-avro ./avro record.avsc生成代码。

    说明

    • 查看 Go 语言消息格式,请参见Go 语言协议格式
    • 使用 Java 语言消费 Avro 格式的数据时,无需生成 Schema 结构代码。
  5. 修改 Demo 文件参数。

    参数说明示例值
    group消费组名称。285fef6b91754d0bbaab32e4976c****:test_dtssdk
    accesssKeyRocketMQ 的用户名。7ZrfJmOpVks5wvurbUgr****
    secretKeyRocketMQ 的用户密码。q70s5SrCgsePZPh3fb0d****
    topic目标 DTS 数据订阅通道的 Topic。d73e98e7fa9340faa3a0d4ccfa10****
    nameserver目标 DTS 数据订阅通道的私网地址。http://rocketmq-cndv83ef3******.rocketmq.ivolces.com:9876

关于以上表格的详细信息,请参见查看 RocketMQ 实例详情

运行测试

说明

  • 本文以数据库 test,表格 demo 为例。

  • 根据目标语言选择合适的 JSON 数据。

  1. 在源数据库中,执行以下命令创建一张名为 demo 的表。

    CREATE TABLE demo (id_t INT);
    

    Go 和 Java 语言预期输出如下:

    Operation[DDL] ObjectName[test] SourceTimestamp[1710848558] Id[1710782319849000034]
    DDL[CREATE TABLE demo (id_t INT)]
    
  2. 执行以下命令,在 demo 表中插入一条数据:

    INSERT INTO demo (id_t) VALUES (1);
    

    Go 和 Java 语言预期输出如下:

    Operation[INSERT] ObjectName[test.demo] SourceTimestamp[1710848859] Id[1710782319849000035]
    Field[id_t] After[1]