You need to enable JavaScript to run this app.
最新活动
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

KafkacommitstrategiesinMassTransit

在 MassTransit,Kafka 协议是通过一种称为消费者(Consumer)的进程进行管理的。Kafka 在消费消息的过程中需要确认它们是否已经被成功处理,在 MassTransit 中有三种不同的提交策略来实现这个目标,分别是“AckOnError”、“AckOnSuccess”和“None”。下面是对每种策略的详细解释:

  1. AckOnError:当 Consumer 处理某条消息时发生异常时,就会立即向 Kafka 发送一条错误确认消息(error acknowledgement message),告知已经失败,此时 Consumer 将停止处理剩余的消息并关闭。这种提交策略是默认的选项。
var busControl = Bus.Factory.CreateUsingKafka(cfg =>
{
    cfg.Host("localhost");

    cfg.TopicEndpoint<Order>("order", "default", e =>
    {
        e.AutoOffsetReset = AutoOffsetReset.Latest;
        e.Consumer(() => new OrderConsumer());

        // enables the ACK strategy when an error occurs
        e.ConfigureConsumer<OrderConsumer>(cc =>
            cc.Options<BatchOptions>(b =>
            {
                b.MessageLimit = 10;
                b.TimeLimit = TimeSpan.FromSeconds(3);
                b.AcknowledgmentInterval = TimeSpan.FromMilliseconds(500);
            }).UseErrorQueue()
            .UseMessageRetry(r => r.Immediate(3)));
    });
});
  1. AckOnSuccess:当 Consumer 成功处理某条消息时,就会向 Kafka 发送一条确认消息(acknowledgement message)。否则什么都不会发生,Consumer 将保持等待状态。
var busControl = Bus.Factory.CreateUsingKafka(cfg =>
{
    cfg.Host("localhost");

    cfg.TopicEndpoint<Order>("order", "default", e =>
    {
        e.AutoOffsetReset = AutoOffsetReset.Latest;
        e.Consumer(() => new OrderConsumer());

        // enables the ACK strategy when a message is successfully processed
        e.ConfigureConsumer<OrderConsumer>(cc =>
            cc.Options<BatchOptions>(b =>
            {
                b.MessageLimit = 10;
本文内容通过AI工具匹配关键字智能整合而成,仅供参考,火山引擎不对内容的真实、准确或完整作任何形式的承诺。如有任何问题或意见,您可以通过联系service@volcengine.com进行反馈,火山引擎收到您的反馈后将及时答复和处理。
展开更多
面向开发者的云福利中心,ECS 60元/年,域名1元起,助力开发者快速在云上构建可靠应用

社区干货

火山引擎IaaS产品月刊-2023年6月年中合辑

ENI Trunking......更多云产品动态,尽在IaaS产品月刊。*“邀测”产品暂未对全部用户开放,如需使用,请[提交工单](https://console.volcengine.com/workorder/create/)或联系客户经理申请,申请成功后方可使用对应产... Flink、Hadoop)、搜索和日志数据处理场景(如 ElasticSearch、Kafka)、大规模并行处理及数据仓库(如 Redshift)。[了解详情>>](https://www.volcengine.com/docs/6396/68531) 3. **【ECS实例规格族发布】ECS共享...

特惠活动

热门爆款云服务器

100%性能独享,更高内存性能更佳,学习测试、web前端、企业应用首选,每日花费低至0.55元
60.00/1212.00/年
立即购买

域名注册服务

cn/top/com等热门域名,首年低至1元,邮箱建站必选
1.00/首年起32.00/首年起
立即购买

DCDN国内流量包100G

同时抵扣CDN与DCDN两种流量消耗,加速分发更实惠
2.00/20.00/年
立即购买

KafkacommitstrategiesinMassTransit -优选内容

通过 Spark Streaming 消费日志
日志服务提供 Kafka 协议消费功能,您可以使用 Spark Streaming 的 spark-streaming-kafka 组件对接日志服务,通过 Spark Streaming 将日志服务中采集的日志数据消费到下游的大数据组件或者数据仓库。 场景概述Spark... > stream = KafkaUtils.createDirectStream(jssc, LocationStrategies.PreferConsistent(), ConsumerStrategies.Subscribe(topics, kafkaParams)); // todo 消费到下游大数据组件 st...
支持OpenAPI退订的商品
TransitRouter_CrossBorderBandwidth 中转路由器跨域带宽 TransitRouter_InterRegionBandwidth 对象存储 TOS 消息队列 RabbitMQ版 Message_Queue_for_RabbitMQ 消息队列 Kafka版 Message_Queue_for_Kafka 消息队列 RocketMQ版 Message_Queue_for_RocketMQ 云搜索服务 ESCloud 边缘计算节点 veEN 直播SDK LiveSDK 视频点播 vod 全站加速 dcdn 云游戏 veGame 云手机 android_cloud 实时音视频 veRTC 内容分发网络 CDN veImageX ima...
支持的云服务
volcengine_iam_access_key 访问秘钥volcengine_iam_login_profile 登录配置volcengine_iam_policy 访问权限volcengine_iam_role 访问角色volcengine_iam_role_policy_attachment 角色权限绑定volcengine_iam_us... volcengine_customer_gateway 自定义网关volcengine_vpn_connection 连接volcengine_vpn_gateway 网关volcengine_vpn_gateway_route 路由 中转路由器 中转路由器(Transit Router,TR)可以连接云上私有网络、VPN...
火山引擎IaaS产品月刊-2023年6月年中合辑
提交工单或联系客户经理申请,申请成功后方可使用对应产品并查看对应使用文档。 新品发布 【ECS实例规格族发布】ECS第三代Intel实例g3i正式商用ECS正式发布新一代面向通用场景的Intel实例产品,基于火山全新自研D... Flink、Hadoop)、搜索和日志数据处理场景(如 ElasticSearch、Kafka)、大规模并行处理及数据仓库(如 Redshift)。了解详情>> 【ECS实例规格族发布】ECS共享型实例s2正式商用共享型实例采用非绑定CPU调度模式,每个vC...

KafkacommitstrategiesinMassTransit -相关内容

云产品监控指标

ing版 VCM_MySQL_Sharding 15 云数据库MySQL版 VCM_RDS_MySQL 15 云数据库PostgreSQL版 VCM_RDS_PostgreSQL 15 云数据库RDS SQL Server版 VCM_RDS_SQLServer 15 缓存数据库Redis版-社区版 VCM_Redis ... Kafka版 VCM_Kafka 15 消息队列RabbitMQ版 VCM_RabbitMQ 15 消息队列RocketMQ版 VCM_RocketMQ 15 日志服务 VCM_TLS 15 网络 应用型负载均衡 VCM_ALB 15 Anycast公网IP VCM_AnycastEIP 15 共享带...

使用Grafana模板变量

实例类型 支持单选和多选 JSON { // type固定为intance "type": "instance", // 必填参数 namespace为各个云产品实际的namespace变量,具体见附录表格,Region 为地域参数,可以为特定的地域值,如 cn-be... Interface Name(实例名称),Id(实例ID),Status(状态) IPv6公网带宽 VCM_Ipv6AddressBandwidth Name(IPv6地址),Id(实例ID) IPv6网关 VCM_Ipv6Gateway Name(实例),Id(实例ID),Status(状态) 消息队列Kafka版 ...

特惠活动

热门爆款云服务器

100%性能独享,更高内存性能更佳,学习测试、web前端、企业应用首选,每日花费低至0.55元
60.00/1212.00/年
立即购买

域名注册服务

cn/top/com等热门域名,首年低至1元,邮箱建站必选
1.00/首年起32.00/首年起
立即购买

DCDN国内流量包100G

同时抵扣CDN与DCDN两种流量消耗,加速分发更实惠
2.00/20.00/年
立即购买

产品体验

体验中心

云服务器特惠

云服务器
云服务器ECS新人特惠
立即抢购

白皮书

一图详解大模型
浓缩大模型架构,厘清生产和应用链路关系
立即获取

最新活动

爆款1核2G共享型服务器

首年60元,每月仅需5元,限量秒杀
立即抢购

火山引擎增长体验专区

丰富能力激励企业快速增长
查看详情

数据智能VeDI

易用的高性能大数据产品家族
了解详情

一键开启云上增长新空间

立即咨询