Kafka的事务机制提供了原子性,一致性和持久性,可以确保消息仅被一次提交,避免了消息的重复发送。然而,这个功能需要使用事务id(transactional.id)和幂等性(idempotence)机制来实现。在sarama中,开启这个功能只需要做几个简单的步骤。
首先需要开启idempotence,这可以确保相同的消息仅被发送一次:
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Retry.Max = 10
config.Producer.Idempotent = true
接下来,需要配置transactional.id,这个id需要全局唯一,并且要在Kafka服务器端提前设置好。此外,需要确保在使用同一个id下开启和关闭事务,不然会抛出异常。
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Retry.Max = 10
config.Producer.Idempotent = true
config.Producer.TransactionTimeout = 5 * time.Minute
config.Producer.TransactionalID = "example-transactional-id"
开启事务:
producer, err := sarama.NewSyncProducer(brokers, config)
if err != nil {
panic(err)
}
defer func() { _ = producer.Close() }()
txnID := "my-sarama-transaction"
err = producer.InitTransactions()
if err != nil {
panic(err)
}
err = producer.BeginTransaction()
if err != nil {
panic(err)
}
msg := &sarama.ProducerMessage{
Topic: "my-topic",
Value: sarama.StringEncoder("hello world"),
}
partition, offset, err := producer.SendMessage(msg)
if err != nil {
panic(err)
}
fmt.Printf("Message sent to partition %d at offset %d\n", partition, offset)
err = producer.CommitTransaction()
if err != nil {
panic(err)