数据库传输服务 DTS 的数据订阅服务支持使用 Kafka 客户端消费火山引擎 Proto 格式的订阅数据。本文以订阅云数据库 MySQL 版实例为例,介绍如何使用 Go、Java 和 Python 语言消费 Canal 格式的数据。
已安装 protoc,建议使用 protoc 3.18 或以上版本。
说明
您可以执行 protoc -version
查看 protoc 版本。
用于订阅消费数据的客户端需要指定服务端 Kafka 版本号,版本号需为 2.2.x(例如 2.2.2)。您可以在示例代码中指定 Kafka 版本号,具体参数如下表所示。
运行语言 | 说明 |
---|---|
Go | 通过代码示例中参数 config.Version 指定服务端 Kafka 版本号。 |
Python | 通过示例代码中参数 api_version 指定服务端 Kafka 版本号。 |
Java | 通过 maven pom.xml 文件中参数 version 指定服务端 Kafka 版本号。 |
按需安装运行语言环境。
运行语言 | 说明 |
---|---|
Go | 安装 Go,需使用 Go 1.13 或以上版本。您可以执行 go version 查看 Go 的版本。 |
Python |
|
Java |
|
本文以 macOS 操作系统为例,介绍如何关联 Kafka 和订阅任务。
在目标数据订阅通道中新增消费组。详细信息,请参见新建消费组。
按需选择 Java 消费示例或 Python 消费示例,Python 语言和 Java 语言各消费示例的目录如下所示:
.
├── dts_kafka_consumer_demo.py # 消费 Demo 文件
├── volc.proto # 火山引擎格式文件
└── volc_pb2.py # 编译 Volc.proto 后的生成的 Python 文件
说明
Go 语言中仅包含一个 Demo 文件。
按需修改 Demo 文件,具体代码如下所示。
参数 | 说明 | 示例值 |
---|---|---|
GROUP | 消费组名称。 | 285fef6b91754d0bbaab32e4976c****:test_dtssdk |
USER | Kafka 用户名。 | test_user |
PASSWORD | Kafka 用户密码。 | Test@Pwd |
TOPIC | 目标 DTS 数据订阅通道的 Topic。 | d73e98e7fa9340faa3a0d4ccfa10**** |
BROKERS | 目标 DTS 数据订阅通道的私网地址。 | kafka-cndvhw9ves******.kafka.ivolces.com:9092 |
package main
import (
"context"
"fmt"
"log"
"strings"
"sync"
"github.com/Shopify/sarama"
proto "github.com/volcengine/volc-sdk-golang/example/dts/data-subscription-demo/proto"
protobuf "google.golang.org/protobuf/proto"
)
type Handler struct {
topic string
partitionCount map[int32]int
totalCount int
mu sync.Mutex
}
type Config struct {
username string
password string
topic string
group string
brokers string
}
var (
c Config
)
func init() {
c.brokers = "your brokers addrress"
c.topic = "your topic"
c.group = "your group"
c.username = "your username"
c.password = "your password"
}
func (h *Handler) Setup(session sarama.ConsumerGroupSession) error {
fmt.Println("setup")
return nil
}
func (h *Handler) Cleanup(sarama.ConsumerGroupSession) error {
fmt.Println("clean up")
return nil
}
func (h *Handler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
fmt.Println("ConsumeClaim")
for m := range claim.Messages() {
h.handleMsg(m)
session.MarkMessage(m, "")
session.Commit()
}
return nil
}
func (h *Handler) handleMsg(msg *sarama.ConsumerMessage) {
h.mu.Lock()
defer h.mu.Unlock()
h.totalCount++
h.partitionCount[msg.Partition]++
entry := &proto.Entry{}
if err := protobuf.Unmarshal(msg.Value, entry); err != nil {
panic(err)
}
fmt.Println("-------------- handle message --------------")
fmt.Printf("get message EventType:%v
", entry.EntryType.String())
switch entry.GetEntryType() {
case proto.EntryType_DDL:
event := entry.GetDdlEvent()
fmt.Printf("ddl %v
", event.Sql)
case proto.EntryType_DML:
event := entry.GetDmlEvent()
cols := event.ColumnDefs
for _, row := range event.Rows {
var before, after []string
for i, col := range row.BeforeCols {
before = append(before, fmt.Sprintf("%+v[%+v]", cols[i].GetName(), col.GetValue()))
}
for i, col := range row.AfterCols {
after = append(after, fmt.Sprintf("%+v[%+v]", cols[i].GetName(), col.GetValue()))
}
fmt.Printf("get row before=%v after=%v
", before, after)
}
}
fmt.Printf("fetch message partition=%v key=%v
", msg.Partition, string(msg.Key))
fmt.Printf("count partition-count=%v total-count=%v
", h.partitionCount, h.totalCount)
}
func main() {
fmt.Printf("config: %+v", c)
config := sarama.NewConfig()
config.Net.SASL.User = c.username
config.Net.SASL.Password = c.password
config.Net.SASL.Enable = true
config.Consumer.Offsets.Initial = sarama.OffsetNewest
config.Version = sarama.V2_2_2_0
topic := c.topic
group := c.group
addr := strings.Split(c.brokers, ",")
cons, err := sarama.NewConsumerGroup(addr, group, config)
if err != nil {
panic(err)
}
defer cons.Close()
handler := &Handler{
topic: topic,
partitionCount: make(map[int32]int),
}
for {
err = cons.Consume(context.Background(), []string{handler.topic}, handler)
if err != nil {
log.Fatalln(err)
}
}
}
说明
test
,表格 demo
为例。在源数据库中,执行以下命令创建一张名为 demo
的表。
CREATE TABLE demo (id_t INT);
预期输出:
src_type:MySQL entry_type:DDL timestamp:1639057424 server_id:"105198****" database:"test" table:"demo" ddl_event:{sql:"create table demo (id_t int)"}
执行以下命令,在 demo
表中插入一条数据:
INSERT INTO demo (id_t) VALUES (1);
预期输出:
src_type:MySQL
entry_type:DML
timestamp:1639057434
server_id:"105198****"
database:"test"
table:"demo"
dml_event:{
type:INSERT
table_id:"148"
column_defs:{
index:1
type:INTEGER
OriginType:"int"
name:"id_t"
is_nullable:true
is_unsigned:false
}
rows:{
after_cols:{
is_null:false
int64_value:1
}
}
}