You need to enable JavaScript to run this app.
导航

默认接入点收发消息

最近更新时间2023.07.20 21:24:36

首次发布时间2022.01.28 15:16:00

本文以 Go 客户端为例,介绍如何在 VPC 环境下通过默认接入点(PLAINTEXT)接入消息队列 Kafka版,并收发消息。

前提条件

已完成准备工作。详细说明请参考准备工作

1 添加配置文件

创建消息队列 Kafka版配置文件 config.json。配置文件字段的详细说明,请参考配置文件
使用默认接入点时,配置文件示例如下。

说明

请根据注释提示填写相关参数,并删除注释。

{
  "bootstrap.servers": "xxxxx", // 修改配置为实例的默认接入点
  "topic": "xxxx", // 修改配置为待发送的 topic 名称
  "consumer": {
    "group.id": "xxxx" // 修改为指定消费组的名称
  }
}

2 发送消息

实现方法

  1. 创建消息发送程序 producer.go
  2. 编译并运行 producer.go 发送消息。
  3. 查看运行结果。
    运行结果示例如下。
    图片

说明

消息队列 Kafka版提供示例项目供您快速接入,下载并解压缩 Demo 后,可以直接执行以下命令发送并消费消息。

go run -mod=vendor {DemoPath}/kafka.go
```
`

示例代码

通过默认接入点生产消息的示例代码如下,您也可以参考 Demo 中的示例文件 {DemoPath}/client/producer.go,实现相关业务逻辑。

package client

import (
   "fmt"
   "github.com/confluentinc/confluent-kafka-go/kafka"
)

func RunProduce(config *KafkaConf) error {
   // 构造生产配置
   configMap := &kafka.ConfigMap{
      "bootstrap.servers": config.BootstrapServers,
      "security.protocol": config.Protocol,
      "acks":              config.Producer.Acks,
      "batch.size":        config.Producer.BatchSize,
   }

   if config.Debug {
      // 开启Debug能力
      configMap.SetKey("debug", "ALL")
   }

   if config.Sasl.Enabled {
      // 配置SASL认证
      configMap.SetKey("sasl.mechanism", config.Sasl.Mechanism)
      configMap.SetKey("sasl.username", config.Sasl.UserName)
      configMap.SetKey("sasl.password", config.Sasl.Password)
   }


   // 创建一个Kafka生产者对象
   producer, err := kafka.NewProducer(configMap)
   if err != nil {
      return err
   }

   // 处理消息发送的结果
   go callBack(producer)()

   // 获取发送channel
   sendChannel := producer.ProduceChannel()

   // 循环发送10条消息
   for count := 0; count < 10; count++ {
      // 构造消息对象
      msg := &kafka.Message{
         // 消息写入位置
         TopicPartition: kafka.TopicPartition{
            // 消息需要写入的Topic名称
            Topic: &config.Topic,
            // 消息写入的分区编号,可以指定Topic特定的某一分区写入,或者设置为kafka.PartitionAny由系统自行选择
            Partition: kafka.PartitionAny,
         },
         // 消息内容,可以为nil
         Value: []byte(fmt.Sprintf("Bytedance test msg %d", count)),
         // 消息Key值,可以为nil。若消息key不为空且为指定分区进行写入时,相同key的消息会落在同一分区内
         Key: []byte(fmt.Sprintf("Bytedance test key %d", count)),
         // 消息的属性值,作为额外的扩展属性,可以为nil
         Headers: []kafka.Header{
            {Key: "service", Value: []byte("kafka")},
            {Key: "version", Value: []byte("2.2.0")},
         },
      }

      // 发送消息
      sendChannel <- msg
   }

   // 因为发送是异步动作,关闭生产者之前需要将消息都推送到服务端
   producer.Flush(10000)
   // 关闭发送者
   producer.Close()
   return nil
}

// 消息发送结果的一部方法回调,注意,若配置ACKS为0时,不会有消息回调产生
func callBack(p *kafka.Producer) func() {
   return func() {
      for event := range p.Events() {
         switch eType := event.(type) {
         case *kafka.Message:
            // 写入结果处理,可修改此处结果实现
            if eType.TopicPartition.Error != nil {
               // 写入报错,异常处理
               fmt.Printf("Delivery failed: %v\n", eType.TopicPartition.Error)
            } else {
               // 写入成功,正常处理
               fmt.Printf("Delivered message to topic %s [%d] at offset %v\n",
                  *eType.TopicPartition.Topic, eType.TopicPartition.Partition, eType.TopicPartition.Offset)
            }
         }
      }
   }
}

3 消费消息

实现方法

  1. 创建 Consumer 订阅消息程序 consumer.go
  2. 编译并运行 consumer.go 消费消息。
  3. 查看运行结果。

说明

消息队列 Kafka版提供示例Demo供您快速接入,下载并解压缩 Demo 后,可以直接执行以下命令发送并消费消息。

go run -mod=vendor {DemoPath}/kafka.go
```
`

消息消费示例代码

通过默认接入点消费消息的示例代码如下,您也可以参考 Demo 中的示例文件 {DemoPath}/client/consumer.go,实现相关业务逻辑。

package client

import (
   "fmt"
   "github.com/confluentinc/confluent-kafka-go/kafka"
   "strconv"
)

func RunConsumer(config *KafkaConf) error {
   // 构造消费配置
   configMap := &kafka.ConfigMap{
      "bootstrap.servers":  config.BootstrapServers,
      "security.protocol":  config.Protocol,
      "group.id":           config.Consumer.GroupId,
      "auto.offset.reset":  config.Consumer.AutoOffsetRest,
      "enable.auto.commit": strconv.FormatBool(config.Consumer.AutoCommit),
   }

   if config.Debug {
      // 开启Debug能力
      if err := configMap.SetKey("debug", "ALL"); err != nil {
         return err
      }
   }

   if config.Sasl.Enabled {
      // 配置SASL认证
      configMap.SetKey("sasl.mechanism", config.Sasl.Mechanism)
      configMap.SetKey("sasl.username", config.Sasl.UserName)
      configMap.SetKey("sasl.password", config.Sasl.Password)
   }

   // 创建消费者对象
   consumer, err := kafka.NewConsumer(configMap)
   if err != nil {
      return err
   }

   // 订阅指定的Topic列表
   err = consumer.SubscribeTopics([]string{config.Topic}, nil)
   if err != nil {
      return err
   }

   // 循环读取10条消息
   for count := 0; count < 10; {
      // 从服务端拉取消息,Poll方法需要进行周期性调用
      event := consumer.Poll(1000)
      switch msg := event.(type) {
      case *kafka.Message:
         // 消费到数据后,对数据执行处理。此处处理时间不宜过长,过长会导致链接断开
         handleMessage(msg)
         count++
      }
   }

   if !config.Consumer.AutoCommit {
      // 未开启自动提交的场景下,手动提交消费进度,用户可根据自身需求在合适的实际执行消费进度提交
      tps, err := consumer.Commit()
      if err != nil {
         fmt.Printf("Commit offsets failed. %s\n", err.Error())
      } else {
         fmt.Printf("Offset commited: \n")
         for _, tp := range tps {
            fmt.Printf("\t%s-%d: %d", *tp.Topic, tp.Partition, tp.Offset)
         }
      }
   }

   // 使用完毕后关闭消费者
   return consumer.Close()
}

// 对获取到的消息进行处理
func handleMessage(msg *kafka.Message) {
   fmt.Printf("Consumed a message from topic %s [%d] at offset %d. key: (%s), value: (%s), Headers: ",
      *msg.TopicPartition.Topic, msg.TopicPartition.Partition, msg.TopicPartition.Offset, msg.Key, msg.Value)
   if msg.Headers != nil {
      fmt.Printf("(")
      for _, header := range msg.Headers {
         fmt.Printf("%s->%s,", header.Key, header.Value)
      }
      fmt.Printf(")")
   }
   fmt.Printf("\n")
}