You need to enable JavaScript to run this app.
最新活动
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

Kafka中的“Correlation ID in Header”

Kafka中添加“Correlation ID in Header”可以通过以下步骤实现:

  1. 首先,需要创建一个自定义的ProducerInterceptor和ConsumerInterceptor来处理消息的header。下面是一个示例:
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;

import java.util.Map;

public class CorrelationIdInterceptor<K, V> implements ConsumerInterceptor<K, V> {

    private static final String CORRELATION_ID_HEADER = "correlation-id";

    @Override
    public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
        return records;
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        return records;
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
    }

    @Override
    public void onConsume(ConsumerRecords<K, V> records, ConsumerInterceptor<K, V>.ConsumerRecordsInterceptorCallback interceptCallback) {
        for (ConsumerRecord<K, V> record : records) {
            Headers headers = record.headers();
            Header correlationIdHeader = headers.lastHeader(CORRELATION_ID_HEADER);
            if (correlationIdHeader != null) {
                String correlationId = new String(correlationIdHeader.value());
                // 在这里处理相关的逻辑
            }
        }
        interceptCallback.onConsume(records);
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets, ConsumerInterceptor<K, V>.OffsetCommitCallback commitCallback) {
        commitCallback.onComplete(offsets, null);
    }

}
  1. 在Producer中,可以使用以下代码将correlation id添加到消息的header中:
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;

import java.util.concurrent.Future;

public class CorrelationIdProducer {

    private static final String CORRELATION_ID_HEADER = "correlation-id";

    private KafkaProducer<String, String> producer;

    public CorrelationIdProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        this.producer = new KafkaProducer<>(props);
    }

    public Future<RecordMetadata> send(String topic, String key, String value, String correlationId) {
        Header header = new RecordHeader(CORRELATION_ID_HEADER, correlationId.getBytes());
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, null, key, value, Collections.singletonList(header));
        return producer.send(record);
    }

    public void close() {
        producer.close();
    }

}
  1. 在Consumer中,可以使用ConsumerInterceptor来解析消息的header并获取correlation id。以下是一个示例:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class CorrelationIdConsumer {

    private static final String CORRELATION_ID_HEADER = "correlation-id";

    private KafkaConsumer<String, String> consumer;

    public CorrelationIdConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        this.consumer = new KafkaConsumer<>(props);
    }

    public void consume(String topic) {
        consumer.subscribe(Collections.singletonList
本文内容通过AI工具匹配关键字智能整合而成,仅供参考,火山引擎不对内容的真实、准确或完整作任何形式的承诺。如有任何问题或意见,您可以通过联系service@volcengine.com进行反馈,火山引擎收到您的反馈后将及时答复和处理。
展开更多
面向开发者的云福利中心,ECS 60元/年,域名1元起,助力开发者快速在云上构建可靠应用

社区干货

聊聊 Kafka:Topic 创建流程与源码分析 | 社区征文

## 一、Topic 介绍Topic(主题)类似于文件系统中的文件夹,事件就是该文件夹中的文件。Kafka 中的主题总是多生产者和多订阅者:一个主题可以有零个、一个或多个向其写入事件的生产者,以及零个、一个或多个订阅这些事... 首先我们找到 kafka-topics.sh 这个脚本,看下里面的内容:```exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@"```最终执行的是 kafka.admin.TopicCommand 该类,源码中找到该类,用 IDEA ...

Kafka数据同步

本实验主要聚焦跑通Kafka MirrorMaker (MM1)数据迁移流程。实验中的Source Kafka版本为2.12,基于本地机器搭建。现实生产环境会更加复杂,如果您有迁移类的需求,欢迎咨询[技术支持服务](https://console.volcengine.com/ticket/createTicketV2/)。# 关于实验 [#](https://vsop-online.bytedance.net/doc/manage/detail/6627/detail/?DocumentID=173809#%E5%85%B3%E4%BA%8E%E5%AE%9E%E9%AA%8C)* 预计部署时间:40分钟* 级别:高级...

干货|字节跳动基于Flink SQL的流式数据质量监控

且对于很多流式任务的“间”数据,原本不需要落地,为了监控而落到hive,存在着大量的资源浪费。为更好地满足流式数据用户的数据质量监控需求,同时填补数据质量平台在流式数据源方面的空白,字节跳动数据质量平台团队于2020年下半年,以Kafka数据写入延迟监控为切入点,陆续调研、开发、上线了一系列基于Flink StreamSQL的流式数据质量监控。本文为系列文章的上篇,重点介绍字节跳动数据质量平台技术调研及选型的思考。## 产品调...

火山引擎DataLeap的Data Catalog系统公有云实践

间件领域的标准云服务,和公司内部对应组件也会有若干差异,Data Catalog为此也做了多版本的兼容。Data Catalog在元数据存储上使用到了Hbase/MySQL/ES/Redis,然后在元数据采集和同步场景使用了Kafka,同时用到了日志... 我们没有使用Nginx或Java原生的方案,而是借助于火山引擎内部安全服务中的ZTI团队的envoy组件来实现,同时使用sidecar模式和我们后端服务容器集成部署,既降低了服务端部署改造成本,也解耦了服务端业务逻辑和安全认证...

特惠活动

热门爆款云服务器

100%性能独享,更高内存性能更佳,学习测试、web前端、企业应用首选,每日花费低至0.55元
60.00/1212.00/年
立即购买

域名注册服务

cn/top/com等热门域名,首年低至1元,邮箱建站必选
1.00/首年起32.00/首年起
立即购买

DCDN国内流量包100G

同时抵扣CDN与DCDN两种流量消耗,加速分发更实惠
2.00/20.00/年
立即购买

Kafka中的“Correlation ID in Header”-优选内容

聊聊 Kafka:Topic 创建流程与源码分析 | 社区征文
## 一、Topic 介绍Topic(主题)类似于文件系统中的文件夹,事件就是该文件夹中的文件。Kafka 中的主题总是多生产者和多订阅者:一个主题可以有零个、一个或多个向其写入事件的生产者,以及零个、一个或多个订阅这些事... 首先我们找到 kafka-topics.sh 这个脚本,看下里面的内容:```exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@"```最终执行的是 kafka.admin.TopicCommand 该类,源码中找到该类,用 IDEA ...
Kafka订阅埋点数据(私有化)
本文档介绍了在增长分析(DataFinder)产品私有化部署场景下,开发同学如何访问Kafka Topic中的流数据,以便进一步进行数据分析和应用,比如实时推荐等。 1. 准备工作 kafka消费只支持内网环境消费,在开始之前,需要提前... "ip_addr_id": 0, "ssid": "9b03c884-3e85-47fe-b8cb-5faadd0d2e7f", "cdid": "54822291-8636-471a-b138-7d29ae3fff05" }, "header": { "app_id": 10000023, "app_name": "cuckoo", "app_ver...
Kafka订阅埋点数据(私有化)
本文档介绍了在增长分析(DataFinder)产品私有化部署场景下,开发同学如何访问Kafka Topic中的流数据,以便进一步进行数据分析和应用,比如实时推荐等。 1. 准备工作 kafka消费只支持内网环境消费,在开始之前,需要提前... "ip_addr_id": 0, "ssid": "9b03c884-3e85-47fe-b8cb-5faadd0d2e7f", "cdid": "54822291-8636-471a-b138-7d29ae3fff05" }, "header": { "app_id": 10000023, "app_name": "cuckoo", "app_ver...
通过 Kafka 消费 Canal Proto 格式的订阅数据
python pip install kafka-pythonpython pip install protobufpython pip install python-snappy Java 安装 Java,需使用 Java 1.8 或以上版本。您可以执行 java -version 查看 Java 版本。 安装 maven,需使用 Ma... fmt.Println("-------------- handle message --------------") fmt.Printf("ServerID:%v\n", entry.GetHeader().GetServerId()) fmt.Printf("Timestamp:%v\n", entry.GetHeader().GetExecuteTime()) fmt...

Kafka中的“Correlation ID in Header”-相关内容

默认接入点收发消息

{ "group.id": "xxxx" // 修改为指定消费组的名称 }} 2 发送消息 实现方法创建消息发送程序 producer.go。 编译并运行 producer.go 发送消息。 查看运行结果。运行结果示例如下。 说明 消息队列 Kafka版提供示例项目供您快速接入,下载并解压缩 Demo 后,可以直接执行以下命令发送并消费消息。 undefined go run -mod=vendor {DemoPath}/kafka.go```` 示例代码通过默认接入点生产消息的示例代码如下,您也可以参考 Demo 中的示...

请求结构

请求URL您可以通过发送 HTTPS POST 请求来调用 Kafka API 服务。HTTPS 请求 URL 的格式如下: POST {URI-scheme}://{Endpoint}/?Action={Action}&Version={Version} HTTP/1.1其: URI-scheme:表示用于传输请求... 请求头HTTP 请求的请求头(Header)中需要指定 Content-Type 和请求鉴权信息等参数。Content-type 指定了请求消息体的结构化格式,消息队列 Kafka版的 Content-Type 固定为 application/json。详细的参数列表及说明请...

Kafka数据同步

本实验主要聚焦跑通Kafka MirrorMaker (MM1)数据迁移流程。实验中的Source Kafka版本为2.12,基于本地机器搭建。现实生产环境会更加复杂,如果您有迁移类的需求,欢迎咨询[技术支持服务](https://console.volcengine.com/ticket/createTicketV2/)。# 关于实验 [#](https://vsop-online.bytedance.net/doc/manage/detail/6627/detail/?DocumentID=173809#%E5%85%B3%E4%BA%8E%E5%AE%9E%E9%AA%8C)* 预计部署时间:40分钟* 级别:高级...

热门爆款云服务器

100%性能独享,更高内存性能更佳,学习测试、web前端、企业应用首选,每日花费低至0.55元
60.00/1212.00/年
立即购买

域名注册服务

cn/top/com等热门域名,首年低至1元,邮箱建站必选
1.00/首年起32.00/首年起
立即购买

DCDN国内流量包100G

同时抵扣CDN与DCDN两种流量消耗,加速分发更实惠
2.00/20.00/年
立即购买

请求结构

请求URL您可以通过发送 HTTPS POST 请求来调用 Kafka API 服务。HTTPS 请求 URL 的格式如下: Bash POST {URI-scheme}://{Endpoint}/?Action={Action}&Version={Version} HTTP/1.1其: URI-scheme:表示用于传输... 请求头HTTP 请求的请求头(Header)中需要指定 Content-Type 和请求鉴权信息等参数。Content-type 指定了请求消息体的结构化格式,消息队列 Kafka版的 Content-Type 固定为 application/json。详细的参数列表及说明请...

如何使用Nginx代理访问VPC内的自建Kafka

那么我们就可以通过Nginx代理来做四层代理,转发请求。 关于实验 预计部署时间:30分钟级别:初级相关产品:同VPC内的ECS两台(1台做Nginx代理,1台做Kafka Server)受众: 通用 环境说明 如果还没有火山引擎账号,点击此链接注册账号 如果您还没有VPC,请先点击链接创建VPC 云服务器ECS:Centos 7 本地电脑准备python环境,默认生产和消费消息。 实验步骤 步骤1:部署配置Nginx代理1.下载安装nginx,确保编译过程添加"--with-stream"模块...

SASL_SSL 接入点 PLAIN 机制收发消息

本文以 Go 客户端为例,介绍如何在 VPC 或公网环境下通过 SASL_SSL 接入点 PLAIN 机制接入消息队列 Kafka版,并收发消息。 前提条件已完成准备工作。详细说明请参考准备工作。 1 添加配置文件创建消息队列 Kafka版配... // 修改配置为待发送的 topic 名称 "consumer": { "group.id": "xxxx" // 修改为指定消费组的名称 }, "sasl": { "enabled": true, // 使用 SASL 接入点时,必须设置为 true "mechanism": "PLAIN", // ...

公共参数

本文介绍每个接口都需要使用的请求参数和返回参数。 请求URL参数参数 类型 是否必选 含义 Action String 必选 要执行的操作,如调用 DescribeInstances 查询已创建的消息队列 Kafka版实例。 Version String 必选 要... Authorization String 必选 HTTP 标准身份认证头部字段。格式为 HMAC-SHA256 Credential={AccessKeyId}/{ShortDate}/{Region}/{Service}/request, SignedHeaders={SignedHeaders}, Signature={Signature}。 A...

SASL_SSL 接入点 SCRAM 机制收发消息

// 修改配置为待发送的 topic 名称 "consumer": { "group.id": "xxxx" // 修改为指定消费组的名称 }, "sasl": { "enabled": true, // 使用 SASL 接入点时,必须设置为true "mechanism": "SCRAM-SHA-25... 您也可以参考 Demo 中的 示例文件{DemoPath}/client/producer.go,实现相关业务逻辑。 go package clientimport ( "fmt" "github.com/confluentinc/confluent-kafka-go/kafka")func RunProduce(config *KafkaCo...

SASL_PLAINTEXT 接入点 SCRAM 机制收发消息

本文以 Go 客户端为例,介绍如何在 VPC 或公网环境下通过 SASL_PLAINTEXT 接入点 SCRAM 机制接入消息队列 Kafka版,并收发消息。 前提条件已完成准备工作。详细说明请参考准备工作。 1 添加配置文件创建消息队列 Kaf... // 修改配置为实例的 SASL 接入点 "security.protocol": "SASL_PLAINTEXT", // 固定为 SASL_PLAINTEXT "topic": "xxxx", // 修改配置为待发送的 topic 名称 "consumer": { "group.id": "xxxx" // 修改为指定...

特惠活动

热门爆款云服务器

100%性能独享,更高内存性能更佳,学习测试、web前端、企业应用首选,每日花费低至0.55元
60.00/1212.00/年
立即购买

域名注册服务

cn/top/com等热门域名,首年低至1元,邮箱建站必选
1.00/首年起32.00/首年起
立即购买

DCDN国内流量包100G

同时抵扣CDN与DCDN两种流量消耗,加速分发更实惠
2.00/20.00/年
立即购买

产品体验

体验中心

云服务器特惠

云服务器
云服务器ECS新人特惠
立即抢购

白皮书

一图详解大模型
浓缩大模型架构,厘清生产和应用链路关系
立即获取

最新活动

爆款1核2G共享型服务器

首年60元,每月仅需5元,限量秒杀
立即抢购

火山引擎增长体验专区

丰富能力激励企业快速增长
查看详情

数据智能VeDI

易用的高性能大数据产品家族
了解详情

一键开启云上增长新空间

立即咨询