You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

如何通过Kafka REST Proxy向Kafka主题发布Avro消息?

如何通过Kafka REST Proxy向Kafka主题发布Avro消息?

我来给你一步步拆解怎么用Kafka REST Proxy发Avro格式的消息到Kafka主题,其实核心逻辑和实操都挺清晰的,先理清楚关键细节,再上具体的curl例子~

首先得明确:Kafka REST Proxy会自动和Schema Registry交互,不用你手动去Registry那边做额外繁琐操作。你有两种发送Avro消息的方式可选:

  • 第一种:直接在请求里带上完整的Avro Schema,Proxy会自动帮你注册新Schema(如果是第一次用),或者复用已有的兼容Schema
  • 第二种:如果你的Schema已经在Registry里注册过了,直接用它的value_schema_id(或者key_schema_id,如果key也是Avro格式)就行,效率更高

另外要注意消息格式:虽然最终是Avro数据,但你发送请求时得用JSON格式,并且要把实际的Avro数据嵌套在records数组里,每个record可以只带value,也可以同时带key和value。


实操例子(用curl工具)

假设你的Kafka REST Proxy地址是http://localhost:8082,要发消息的主题叫my_avro_topic,咱们分两种场景来看:

场景1:直接提供Avro Schema(适合新Schema或让Proxy自动处理注册)

如果是第一次用这个Schema发消息,或者不想手动去Registry注册,就用这种方式。比如我们要发包含用户姓名和年龄的Avro消息:

curl -X POST \
  http://localhost:8082/topics/my_avro_topic \
  -H 'Content-Type: application/vnd.kafka.avro.v2+json' \
  -d '{
    "value_schema": "{\"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"name\", \"type\": \"string\"}, {\"name\": \"age\", \"type\": \"int\"}]}",
    "records": [
      {"value": {"name": "Alice", "age": 30}},
      {"value": {"name": "Bob", "age": 25}}
    ]
  }'

这里要注意几个点:

  • 请求头的Content-Type必须指定为application/vnd.kafka.avro.v2+json,Proxy才会识别这是Avro格式的请求
  • value_schema里要放你的Avro Schema的JSON字符串(注意转义双引号)
  • records数组里的每个元素对应一条消息,value的结构要和你定义的Schema完全匹配

场景2:使用已注册的Schema ID(适合Schema已存在的情况)

如果这个Schema之前已经在Registry里注册过了,你可以直接用它的ID来发消息,省去Schema校验和注册步骤。比如假设这个User Schema的ID是123,命令如下:

curl -X POST \
  http://localhost:8082/topics/my_avro_topic \
  -H 'Content-Type: application/vnd.kafka.avro.v2+json' \
  -d '{
    "value_schema_id": 123,
    "records": [
      {"value": {"name": "Charlie", "age": 35}},
      {"value": {"name": "Diana", "age": 28}}
    ]
  }'

这里的value_schema_id就是你在Schema Registry里查到的对应ID,Proxy会直接用这个ID对应的Schema来序列化消息。


额外小提示

  • 如果你的消息key也是Avro格式,只需要把上面的value_schema换成key_schemavalue_schema_id换成key_schema_id,同时在records里加上key字段就行
  • 确保你的Schema Registry正常运行,并且Kafka REST Proxy的配置文件里已经正确配置了Registry的地址

内容来源于stack exchange

火山引擎 最新活动