数据库传输服务 DTS 的数据订阅服务支持使用 Kafka 客户端消费火山引擎 Proto 格式的订阅数据。本文以订阅云数据库 MySQL 版实例为例,介绍如何使用 Go、Java 和 Python 语言消费 Canal 格式的数据。
用于订阅消费数据的客户端需要指定服务端 Kafka 版本号,版本号需为 2.2.x(例如 2.2.2)。您可以在示例代码中指定 Kafka 版本号,具体参数如下表所示。
运行语言 | 说明 |
---|---|
Go | 通过代码示例中参数 config.Version 指定服务端 Kafka 版本号。 |
Python | 通过示例代码中参数 api_version 指定服务端 Kafka 版本号。 |
Java | 通过 maven pom.xml 文件中参数 version 指定服务端 Kafka 版本号。 |
按需安装运行语言环境。
运行语言 | 说明 |
---|---|
Go | 安装 Go,需使用 Go 1.13 或以上版本。您可以执行 go version 查看 Go 的版本。 |
Python |
|
Java |
|
本文以 macOS 操作系统为例,介绍如何关联 Kafka 和订阅任务。
在目标数据订阅通道中新增消费组。详细信息,请参见新建消费组。
按需选择消费示例。
说明
Python 语言和 Java 语言消费示例的目录中包含 Demo 文件。
Python 语言和 Java 语言消费示例的目录如下所示:
. ├── dts_kafka_consumer_demo.py # 消费Kafka消息 ├── deserializer.py # 反序列化Avro格式数据 ├── record_printer.py # 打印反序列化后的Avro数据 └── record.avsc # schema 文件
. ├── DTSKafkaConsumerDemo.java # 消费Kafka消息 ├── AvroDeserializer.java # 反序列化Avro格式数据 ├── AvroRecordPrinter.java # 打印反序列化后的Avro数据 ├── record.avsc # schema 文件 └── avro # schema 生成的文件
Go 语言消费示例如下。
package main import ( "bytes" "context" "fmt" "strings" "sync" "github.com/twpayne/go-geom/encoding/wkb" "github.com/twpayne/go-geom/encoding/wkt" "github.com/Shopify/sarama" "your path of code generated by avro compilation/avro" ) func main() { config := sarama.NewConfig() brokers := "your brokers addrress" topic := "your topic" group := "your group" username := "your username" password := "your password" config.Consumer.Offsets.Initial = sarama.OffsetNewest config.Net.SASL.Enable = true config.Net.SASL.User = username config.Net.SASL.Password = password cons, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), group, config) if err != nil { panic(err) } defer cons.Close() handler := &Handler{ topic: topic, partitionCount: make(map[int32]int), } for { err = cons.Consume(context.Background(), []string{handler.topic}, handler) if err != nil { fmt.Printf("consume failde, err=%v", err) } } } type Handler struct { format string topic string partitionCount map[int32]int totalCount int mu sync.Mutex } func (h *Handler) Setup(session sarama.ConsumerGroupSession) error { fmt.Println("setup") return nil } func (h *Handler) Cleanup(sarama.ConsumerGroupSession) error { fmt.Println("clean up") return nil } func (h *Handler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { for msg := range claim.Messages() { record, err := avro.DeserializeRecord(bytes.NewReader(msg.Value)) if err != nil { panic(err) } printRecord(&record) session.MarkMessage(msg, "") session.Commit() } return nil } func printRecord(record *avro.Record) { fmt.Printf("Operation[%s] ObjectName[%s] SourceTimestamp[%v] Id[%v]\n", record.Operation.String(), record.ObjectName.String, record.SourceTimestamp, record.Id) switch record.Operation { case avro.OperationDDL: fmt.Printf("DDL[%s] \n", record.AfterImages.String) case avro.OperationINSERT: for i := 0; i < len(record.AfterImages.ArrayUnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObject); i++ { after, err := getValue(record.AfterImages.ArrayUnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObject[i]) if err != nil { panic(err) } fmt.Printf("Field[%s] After[%s]\n", record.Fields.ArrayField[i].Name, after) } case avro.OperationUPDATE: for i := 0; i < len(record.AfterImages.ArrayUnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObject); i++ { before, err := getValue(record.BeforeImages.ArrayUnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObject[i]) if err != nil { panic(err) } after, err := getValue(record.AfterImages.ArrayUnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObject[i]) if err != nil { panic(err) } fmt.Printf("Field[%s] Before[%s] After[%s]\n", record.Fields.ArrayField[i].Name, before, after) } case avro.OperationDELETE: for i := 0; i < len(record.BeforeImages.ArrayUnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObject); i++ { before, err := getValue(record.BeforeImages.ArrayUnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObject[i]) if err != nil { panic(err) } fmt.Printf("Field[%s] Before[%s]\n", record.Fields.ArrayField[i].Name, before) } } } func getValue(in *avro.UnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObject) (string, error) { if in == nil { return "null", nil } switch in.UnionType { case avro.UnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObjectTypeEnumInteger: return in.Integer.Value, nil case avro.UnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObjectTypeEnumCharacter: return string(in.Character.Value), nil case avro.UnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObjectTypeEnumDecimal: return fmt.Sprintf("%s", in.Decimal.Value), nil case avro.UnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObjectTypeEnumFloat: return fmt.Sprintf("%v", in.Float.Value), nil case avro.UnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObjectTypeEnumTimestamp: return fmt.Sprintf("%v.%v", in.Timestamp.Timestamp, in.Timestamp.Millis), nil case avro.UnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObjectTypeEnumDateTime: var year, month, day, hour, minute, second int32 if in.DateTime.Year != nil { year = in.DateTime.Year.Int } if in.DateTime.Month != nil { month = in.DateTime.Month.Int } if in.DateTime.Day != nil { day = in.DateTime.Day.Int } if in.DateTime.Hour != nil { hour = in.DateTime.Hour.Int } if in.DateTime.Minute != nil { minute = in.DateTime.Minute.Int } if in.DateTime.Second != nil { second = in.DateTime.Second.Int } if year != 0 { return fmt.Sprintf("%v-%v-%v %v:%v:%v", year, month, day, hour, minute, second), nil } return fmt.Sprintf("%v:%v:%v", hour, minute, second), nil case avro.UnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObjectTypeEnumTimestampWithTimeZone: var year, month, day, hour, minute, second int32 if in.TimestampWithTimeZone.Value.Year != nil { year = in.TimestampWithTimeZone.Value.Year.Int } if in.TimestampWithTimeZone.Value.Month != nil { month = in.TimestampWithTimeZone.Value.Month.Int } if in.TimestampWithTimeZone.Value.Day != nil { day = in.TimestampWithTimeZone.Value.Day.Int } if in.TimestampWithTimeZone.Value.Hour != nil { hour = in.TimestampWithTimeZone.Value.Hour.Int } if in.TimestampWithTimeZone.Value.Minute != nil { minute = in.TimestampWithTimeZone.Value.Minute.Int } if in.TimestampWithTimeZone.Value.Second != nil { second = in.TimestampWithTimeZone.Value.Second.Int } if year != 0 { return fmt.Sprintf("%v-%v-%v %v:%v:%v+%v", year, month, day, hour, minute, second, in.TimestampWithTimeZone.Timezone), nil } return fmt.Sprintf("%v:%v:%v", hour, minute, second), nil case avro.UnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObjectTypeEnumBinaryGeometry: return getReadableGeometry(in.BinaryGeometry.Value) //return fmt.Sprintf("%s", string(in.BinaryGeometry.Value)), nil case avro.UnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObjectTypeEnumTextGeometry: return getReadableGeometry([]byte(in.TextGeometry.Value)) //return fmt.Sprintf("%s", in.TextGeometry.Value), nil case avro.UnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObjectTypeEnumBinaryObject: return fmt.Sprintf("%s", string(in.BinaryObject.Value)), nil case avro.UnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObjectTypeEnumTextObject: return fmt.Sprintf("%s", in.TextObject.Value), nil case avro.UnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObjectTypeEnumEmptyObject: return fmt.Sprintf("%s", in.EmptyObject.String()), nil } return "", fmt.Errorf("invalid value for *UnionNullIntegerCharacterDecimalFloatTimestampDateTimeTimestampWithTimeZoneBinaryGeometryTextGeometryBinaryObjectTextObjectEmptyObject") } func getReadableGeometry(raw []byte) (string, error) { if len(raw) < 4 { return "", fmt.Errorf("not a valid geometry %v", raw) } geomObj, err := wkb.Unmarshal(raw[4:]) if err != nil { return "", err } wktObj, err := wkt.Marshal(geomObj) if err != nil { return "", err } return wktObj, nil }
Go 语言生成 Schema 结构代码。
使用 Go 语言消费 Avro 格式的数据时,需运行以下命令下载代码生成工具,生成 Schema 结构代码。
go install github.com/actgardner/gogen-avro/v10/cmd/...@latest
。mkdir avro && gogen-avro ./avro record.avsc
生成代码。说明
修改 Demo 文件参数。
参数 | 说明 | 示例值 |
---|---|---|
GROUP | 消费组名称。 | 285fef6b91754d0bbaab32e4976c****:test_dtssdk |
USER | Kafka 用户名。 | test_user |
PASSWORD | Kafka 用户密码。 | Test@*** |
TOPIC | 目标 DTS 数据订阅通道的 Topic。 | d73e98e7fa9340faa3a0d4ccfa10**** |
BROKERS | 目标 DTS 数据订阅通道的私网地址。 | kafka-cndvhw9ves******.``kafka.ivolces.com:9092 |
说明
本文以数据库 test
,表格 demo
为例。
根据目标语言选择合适的 JSON 数据。
在源数据库中,执行以下命令创建一张名为 demo
的表。
CREATE TABLE demo (id_t INT);
Go、Java 和 Python 语言预期输出如下:
Operation[DDL] ObjectName[test] SourceTimestamp[1710848558] Id[1710782319849000034] DDL[CREATE TABLE demo (id_t INT)]
执行以下命令,在 demo
表中插入一条数据:
INSERT INTO demo (id_t) VALUES (1);
Go、Java 和 Python 语言预期输出如下:
Operation[INSERT] ObjectName[test.demo] SourceTimestamp[1710848859] Id[1710782319849000035] Field[id_t] After[1]