数据库传输服务 DTS 的数据订阅服务支持使用 RocketMQ 客户端消费Avro 格式的订阅数据。本文以订阅云数据库 MySQL 版实例为例,介绍如何使用 Go 和 Java 语言消费 Avro 格式的数据。
按需安装运行语言环境。
运行环境 | 说明 |
---|---|
Go 语言 | 安装 Go,需使用 Go 1.13 或以上版本。 |
Java 语言 |
|
本文以 macOS 操作系统为例,介绍如何关联 RocketMQ 和订阅任务。
在目标数据订阅通道中新增消费组。详细信息,请参见新建消费组。
按需选择消费示例。
说明
Java 语言消费示例的目录中包含 Demo 文件。
Java 语言消费示例的目录如下所示:
. ├── DTSRocketMQConsumerDemo.java # 消费RocketMQ消息 ├── AvroDeserializer.java # 反序列化Avro格式数据 ├── AvroRecordPrinter.java # 打印反序列化后的Avro数据 ├── record.avsc # schema 文件 └── avro # schema 生成的文件
Go 语言消费示例如下。
package main import ( "bytes" "context" "fmt" "strings" "time" "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/consumer" "github.com/apache/rocketmq-client-go/v2/primitive" "github.com/twpayne/go-geom/encoding/wkb" "github.com/twpayne/go-geom/encoding/wkt" "github.com/volcengine/volc-sdk-golang/example/dts/data-subscription-demo/avro/avro" "your path of code generated by avro compilation/avro" ) func main() { nameserver := "your nameserver address" topic := "your topic" group := "your group" accessKey := "your access key" secretKey := "your secret key" cli, _ := rocketmq.NewPushConsumer( consumer.WithGroupName(group), consumer.WithNsResolver(primitive.NewPassthroughResolver(strings.Split(nameserver, ","))), consumer.WithConsumerModel(consumer.Clustering), consumer.WithConsumerOrder(true), consumer.WithCredentials(primitive.Credentials{ AccessKey: accessKey, SecretKey: secretKey, }), ) err := cli.Subscribe(topic, consumer.MessageSelector{}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) { for _, msg := range msgs { record, err := avro.DeserializeRecord(bytes.NewReader(msg.Body)) if err != nil { panic(err) } printRecord(&record) } return consumer.ConsumeSuccess, nil }) if err != nil { panic(err.Error()) } err = cli.Start() if err != nil { panic(err.Error()) } time.Sleep(time.Hour) cli.Shutdown() } func printRecord(record *avro.Record) { fmt.Printf("Operation[%s] ObjectName[%s] SourceTimestamp[%v] Id[%v]\n", record.Operation.String(), record.ObjectName.String, record.SourceTimestamp, record.Id) switch record.Operation { case avro.OperationDDL: fmt.Printf("DDL[%s] \n", record.AfterImages.String) case avro.OperationINSERT: for i := 0; i < len(record.AfterImages.ArrayUnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObject); i++ { after, err := getValue(record.AfterImages.ArrayUnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObject[i]) if err != nil { panic(err) } fmt.Printf("Field[%s] After[%s]\n", record.Fields.ArrayField[i].Name, after) } case avro.OperationUPDATE: for i := 0; i < len(record.AfterImages.ArrayUnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObject); i++ { before, err := getValue(record.BeforeImages.ArrayUnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObject[i]) if err != nil { panic(err) } after, err := getValue(record.AfterImages.ArrayUnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObject[i]) if err != nil { panic(err) } fmt.Printf("Field[%s] Before[%s] After[%s]\n", record.Fields.ArrayField[i].Name, before, after) } case avro.OperationDELETE: for i := 0; i < len(record.BeforeImages.ArrayUnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObject); i++ { before, err := getValue(record.BeforeImages.ArrayUnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObject[i]) if err != nil { panic(err) } fmt.Printf("Field[%s] Before[%s]\n", record.Fields.ArrayField[i].Name, before) } } } func getValue(in *avro.UnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObject) (string, error) { if in == nil { return "null", nil } switch in.UnionType { case avro.UnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObjectTypeEnumInteger: return in.Integer.Value, nil case avro.UnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObjectTypeEnumCharacter: return string(in.Character.Value), nil case avro.UnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObjectTypeEnumDecimal: return fmt.Sprintf("%s", in.Decimal.Value), nil case avro.UnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObjectTypeEnumFloat: return fmt.Sprintf("%v", in.Float.Value), nil case avro.UnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObjectTypeEnumTimestamp: return fmt.Sprintf("%v.%v", in.Timestamp.Timestamp, in.Timestamp.Millis), nil case avro.UnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObjectTypeEnumDateTime: var year, month, day, hour, minute, second int32 if in.DateTime.Year != nil { year = in.DateTime.Year.Int } if in.DateTime.Month != nil { month = in.DateTime.Month.Int } if in.DateTime.Day != nil { day = in.DateTime.Day.Int } if in.DateTime.Hour != nil { hour = in.DateTime.Hour.Int } if in.DateTime.Minute != nil { minute = in.DateTime.Minute.Int } if in.DateTime.Second != nil { second = in.DateTime.Second.Int } if year != 0 { return fmt.Sprintf("%v-%v-%v %v:%v:%v", year, month, day, hour, minute, second), nil } return fmt.Sprintf("%v:%v:%v", hour, minute, second), nil case avro.UnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObjectTypeEnumTimestampWithTimeZone: var year, month, day, hour, minute, second int32 if in.TimestampWithTimeZone.Value.Year != nil { year = in.TimestampWithTimeZone.Value.Year.Int } if in.TimestampWithTimeZone.Value.Month != nil { month = in.TimestampWithTimeZone.Value.Month.Int } if in.TimestampWithTimeZone.Value.Day != nil { day = in.TimestampWithTimeZone.Value.Day.Int } if in.TimestampWithTimeZone.Value.Hour != nil { hour = in.TimestampWithTimeZone.Value.Hour.Int } if in.TimestampWithTimeZone.Value.Minute != nil { minute = in.TimestampWithTimeZone.Value.Minute.Int } if in.TimestampWithTimeZone.Value.Second != nil { second = in.TimestampWithTimeZone.Value.Second.Int } if year != 0 { return fmt.Sprintf("%v-%v-%v %v:%v:%v+%v", year, month, day, hour, minute, second, in.TimestampWithTimeZone.Timezone), nil } return fmt.Sprintf("%v:%v:%v", hour, minute, second), nil case avro.UnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObjectTypeEnumBinaryGeometry: return getReadableGeometry(in.BinaryGeometry.Value) //return fmt.Sprintf("%s", string(in.BinaryGeometry.Value)), nil case avro.UnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObjectTypeEnumTextGeometry: return getReadableGeometry([]byte(in.TextGeometry.Value)) //return fmt.Sprintf("%s", in.TextGeometry.Value), nil case avro.UnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObjectTypeEnumBinaryObject: return fmt.Sprintf("%s", string(in.BinaryObject.Value)), nil case avro.UnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObjectTypeEnumTextObject: return fmt.Sprintf("%s", in.TextObject.Value), nil case avro.UnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObjectTypeEnumEmptyObject: return fmt.Sprintf("%s", in.EmptyObject.String()), nil } return "", fmt.Errorf("invalid value for *UnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObject") } func getReadableGeometry(raw []byte) (string, error) { if len(raw) < 4 { return "", fmt.Errorf("not a valid geometry %v", raw) } geomObj, err := wkb.Unmarshal(raw[4:]) if err != nil { return "", err } wktObj, err := wkt.Marshal(geomObj) if err != nil { return "", err } return wktObj, nil }
Go 语言生成 Schema 结构代码。。
使用 Go 语言消费 Avro 格式的数据时,需运行以下命令下载代码生成工具,生成 Schema 结构代码。
下载代码生成工具 go install github.com/actgardner/gogen-avro/v10/cmd/...@latest
。
运行 mkdir avro && gogen-avro ./avro record.avsc
生成代码。
说明
修改 Demo 文件参数。
参数 | 说明 | 示例值 |
---|---|---|
group | 消费组名称。 | 285fef6b91754d0bbaab32e4976c****:test_dtssdk |
accesssKey | RocketMQ 的用户名。 | 7ZrfJmOpVks5wvurbUgr**** |
secretKey | RocketMQ 的用户密码。 | q70s5SrCgsePZPh3fb0d**** |
topic | 目标 DTS 数据订阅通道的 Topic。 | d73e98e7fa9340faa3a0d4ccfa10**** |
nameserver | 目标 DTS 数据订阅通道的私网地址。 | http://rocketmq-cndv83ef3******.rocketmq.ivolces.com:9876 |
关于以上表格的详细信息,请参见查看 RocketMQ 实例详情。
说明
本文以数据库 test
,表格 demo
为例。
根据目标语言选择合适的 JSON 数据。
在源数据库中,执行以下命令创建一张名为 demo
的表。
CREATE TABLE demo (id_t INT);
Go 和 Java 语言预期输出如下:
Operation[DDL] ObjectName[test] SourceTimestamp[1710848558] Id[1710782319849000034] DDL[CREATE TABLE demo (id_t INT)]
执行以下命令,在 demo
表中插入一条数据:
INSERT INTO demo (id_t) VALUES (1);
Go 和 Java 语言预期输出如下:
Operation[INSERT] ObjectName[test.demo] SourceTimestamp[1710848859] Id[1710782319849000035] Field[id_t] After[1]