You need to enable JavaScript to run this app.
最新活动
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

Storm拓扑在OptionalDataException引起的StormServerHandler Netty错误后停止发送。

在Storm拓扑中,如果出现OptionalDataException引起的StormServerHandler Netty错误,可以通过以下方法停止发送:

  1. 在Storm拓扑的代码中捕获OptionalDataException异常,并在捕获到异常后停止发送数据。
public class MyBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        try {
            // 处理数据
            // ...
            
            // 发送数据
            collector.emit(input, new Values(data));
            collector.ack(input);
        } catch (OptionalDataException e) {
            // 异常处理逻辑
            collector.fail(input);
            // 停止发送数据
            collector.emit("control_stream", new Values("stop"));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream("control_stream", new Fields("command"));
        declarer.declare(new Fields("data"));
    }
}

在上面的代码中,通过捕获OptionalDataException异常,我们停止了发送数据,并通过另外一个stream("control_stream")发送了一个控制命令("stop")。

  1. 在拓扑的其它组件中接收这个控制命令,并在接收到停止命令后停止发送数据。
public class MyBolt2 extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        if (input.getSourceStreamId().equals("control_stream")) {
            String command = input.getStringByField("command");
            if (command.equals("stop")) {
                // 停止发送数据
                collector.emit("control_stream", new Values("stop"));
            }
        } else {
            // 处理数据
            // ...
            
            // 发送数据
            collector.emit(input, new Values(data));
            collector.ack(input);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream("control_stream", new Fields("command"));
        declarer.declare(new Fields("data"));
    }
}

在上面的代码中,我们在另外一个bolt中接收到了控制命令("stop"),然后停止了发送数据。

这样,当OptionalDataException发生时,拓扑中的组件将停止发送数据,并知会其它组件停止发送。

本文内容通过AI工具匹配关键字智能整合而成,仅供参考,火山引擎不对内容的真实、准确或完整作任何形式的承诺。如有任何问题或意见,您可以通过联系service@volcengine.com进行反馈,火山引擎收到您的反馈后将及时答复和处理。
展开更多
面向开发者的云福利中心,ECS 60元/年,域名1元起,助力开发者快速在云上构建可靠应用

社区干货

MAD,现代安卓开发技术:Android 领域开发方式的重大变革|社区征文

这意味着会得到 Google 巨佬在 Android 端的鼎力支持以实现超越 Java 的优秀编程体验* 通过 `KMM`(Kotlin Multiplatform Mobile)实现跨移动端的支持* `Server-side`,天然支持后端开发* 通过 `Kotlin/JS` ... catch (IOException e) { result = new Result(e); } Result finalResult = result; new Handler(Looper.getMainLooper()).post(() -> updateUI(finalResult)); });}...

DevOps基于k8s发布系统CI/CD的实现|社区征文

list = null; try { list = gitLabApi.getRepositoryApi().getBranches(param.getProjectIdOrPath(), param.getBranchName()); } catch (GitLabApiException e) { LogUtils.throwException(logger, e, Messa... .addEventHandler(LogEvent.class, logEvent -> logger.info(logEvent.getMessage())));} catch (Exception e) { logger.error("Failed to build image", e); return false;}```其中,targetFiles是要构建镜...

云原生安全:保护云端应用的新策略与挑战 | 社区征文

throws Exception { http .authorizeRequests() .antMatchers("/api/**").authenticated() .anyRequest().permitAll() .and() .oauth2ResourceServer() .jwt(); }}``` 4. 持续集成/持续交付(CI/CD)安全```groovypipeline { agent any stages { stage('Build') { steps { // 执行代码审查和...

Kitex 支持 Dubbo 协议:助力多语言云原生生态融合

Kitex + codec-dubbo Server 端流程与 Client 端基本类似,具体例子可参考项目主页。#### **类型拓展**Hessian2 schema-free 的特性导致 Dubbo 的实现“过于灵活”,可以使用任意类型。为了适配 Dubbo Hes... codec-dubbo 在 pkg/hessian2/exception 包中提供了 Java 常见的异常,目前支持 java.lang.Exception 常见异常无需 Kitex 命令行工具的支持,直接引用即可,以下是 Client 端提取异常和 Server 端返回异常的...

特惠活动

热门爆款云服务器

100%性能独享,更高内存性能更佳,学习测试、web前端、企业应用首选,每日花费低至0.55元
60.00/1212.00/年
立即购买

域名注册服务

cn/top/com等热门域名,首年低至1元,邮箱建站必选
1.00/首年起32.00/首年起
立即购买

DCDN国内流量包100G

同时抵扣CDN与DCDN两种流量消耗,加速分发更实惠
2.00/20.00/年
立即购买

Storm拓扑在OptionalDataException引起的StormServerHandler Netty错误后停止发送。-优选内容

MAD,现代安卓开发技术:Android 领域开发方式的重大变革|社区征文
这意味着会得到 Google 巨佬在 Android 端的鼎力支持以实现超越 Java 的优秀编程体验* 通过 `KMM`(Kotlin Multiplatform Mobile)实现跨移动端的支持* `Server-side`,天然支持后端开发* 通过 `Kotlin/JS` ... catch (IOException e) { result = new Result(e); } Result finalResult = result; new Handler(Looper.getMainLooper()).post(() -> updateUI(finalResult)); });}...
DevOps基于k8s发布系统CI/CD的实现|社区征文
list = null; try { list = gitLabApi.getRepositoryApi().getBranches(param.getProjectIdOrPath(), param.getBranchName()); } catch (GitLabApiException e) { LogUtils.throwException(logger, e, Messa... .addEventHandler(LogEvent.class, logEvent -> logger.info(logEvent.getMessage())));} catch (Exception e) { logger.error("Failed to build image", e); return false;}```其中,targetFiles是要构建镜...
云原生安全:保护云端应用的新策略与挑战 | 社区征文
throws Exception { http .authorizeRequests() .antMatchers("/api/**").authenticated() .anyRequest().permitAll() .and() .oauth2ResourceServer() .jwt(); }}``` 4. 持续集成/持续交付(CI/CD)安全```groovypipeline { agent any stages { stage('Build') { steps { // 执行代码审查和...
Kitex 支持 Dubbo 协议:助力多语言云原生生态融合
Kitex + codec-dubbo Server 端流程与 Client 端基本类似,具体例子可参考项目主页。#### **类型拓展**Hessian2 schema-free 的特性导致 Dubbo 的实现“过于灵活”,可以使用任意类型。为了适配 Dubbo Hes... codec-dubbo 在 pkg/hessian2/exception 包中提供了 Java 常见的异常,目前支持 java.lang.Exception 常见异常无需 Kitex 命令行工具的支持,直接引用即可,以下是 Client 端提取异常和 Server 端返回异常的...

Storm拓扑在OptionalDataException引起的StormServerHandler Netty错误后停止发送。-相关内容

聊聊 Kafka:Topic 创建流程与源码分析 | 社区征文

Server 端如果 `auto.create.topics.enable` 设置为 true 时,那么当 Producer 向一个不存在的 topic 发送数据时,该 topic 同样会被创建出来,此时,副本数默认是 1。## 三、Topic 的创建流程### 3.1 Topic 创建入... replication-factor 一定要在1和32767之间。 if (topic.replicationFactor.exists(rf => rf > Short.MaxValue || rf < 1)) throw new IllegalArgumentException(s"The replication factor must be...

Enhancer-轻量化的字节码增强组件包|得物技术

T beforeMethod(` `// 接收动态传递过来的参数` `@PluginName String pluginName,` `// optional=true,表示this注解可以接收:构造方法或静态方法(会将this赋值为null),而不报错` `@Advice.This(optional =... throw new IllegalArgumentException("anyClassNameStartWith and anyAnnotationNameOnMethod can't be both empty"); } this.anyClassNameStartWith = anyCla...

Actor模型 - 分布式应用框架Akka

之间通过显式的发送消息来达到交互目的*。Akka是另外一种解决并发问题的思路,通过线程进程之间传递消息,避免对共享资源的竞争,Akka提供了一种称之为Actor的并发模型,粒度比线程还要小(但并不等同于协程),这表明你... ActorSystem创建一个叫ActorRef(ServerActor)的对象。然后将消息发送给ActorRef(WorkerActor)3. ActorRef(ServerActor)将消息发送给Dispatcher4. Dispatcher(ServerActor)将消息投递到目标Actor(WorkerActor)的...

热门爆款云服务器

100%性能独享,更高内存性能更佳,学习测试、web前端、企业应用首选,每日花费低至0.55元
60.00/1212.00/年
立即购买

域名注册服务

cn/top/com等热门域名,首年低至1元,邮箱建站必选
1.00/首年起32.00/首年起
立即购买

DCDN国内流量包100G

同时抵扣CDN与DCDN两种流量消耗,加速分发更实惠
2.00/20.00/年
立即购买

KubeAdmiral支持提供代理 API 供用户访问成员集群资源

这个提议旨在在 KubeAdmiral 中引入代理 API,使用户能够在不登录每个云提供商的网站或切换 kubeconfig 上下文的情况下访问成员集群之间的资源。## 目标1. 开发一个代理api server,实现统一的 API 端点,用于访... // Path is api/v1/nodes // +optional Path string `json:"path,omitempty" protobuf:"bytes,1,opt,name=path"`}```### 统一API 端点使用API服务器聚合(AA)功能,在 KubeAdmiral 中实现 API 端点,代理对成...

适用于线上内存监控框架KOOM源码分析 | 社区征文

getLoopHandler().postDelayed(mLoopRunnable, delayMillis) } mIsLoopStopped = false}```我们可以看到,在父类的startLoop方法中,同样是使用Handler来进行延迟消息的发送,执行的就是这个mLoopRunnabl... "onJvmThreshold Exception " + it.message, true) }}```在KOOM的dumpAndAnalysis方法中,我们看到创建了hprofFile文件,然后接下来一个核心类ForkJvmHeapDumper,这个类主要作用就是dump内存快照。### 2.3.1...

[数据库系统] 业界列式存储浅析

然后合并结果;inserts 只需要发送给WS,deletes必须记录到RS,后续 tuple mover 会做清理;**updates 会被转换为delete + insert**。为了保证高速的搬运tuple,C-Store使用了 LSM-tree 的一个变体;C-Store 支持sna... MapReduce任务完成后,Driver将获取的结果返回给用户。除了将数据直接存储在HDFS之外,还能存储到其他系统上,如HBASE。但是需要提供对应的storage handler。在hive中,存储效率主要决定于SerDes和文件格式。hive本...

通过 Kafka 消费火山引擎 Proto 格式的订阅数据

proto "github.com/volcengine/volc-sdk-golang/example/dts/data-subscription-demo/proto" protobuf "google.golang.org/protobuf/proto" ) type Handler struct { topic string partitio... throws java.lang.Exception { // set your kafka brokers address String brokers = "your brokers address"; // set your kafka username and password String username = "yo...

基于 FFmpeg 实现一个数据流风格的视频处理工具 | 社区征文

即直播结束后,首先切换到 VOD 服务提供的在线播放地址。这里主要使用到了云函数和 CDN 搭配,基本流程是直播结束后云端监测到结束事件,并生成回放文件的 CDN 播放链接,通过云函数,发送通知到本地服务接口,将对应直播场次的会放链接更新为云直播地址,以此来完成直播结束后,近乎无缝的回放切换衔接。由于在我方平台举行的教育类直播时效性比较明显,也就在直播结束后的第 2-3 天,播放量会骤降,带宽的压力也就降低了很多,也是为了节约...

接入流程

long engineHandler = engine.createEngine();engine.setContext(getApplicationContext());参数配置引擎类型// 语音合成引擎engine.setOptionString(engineHandler, SpeechEngineDefines.PARAMS_KEY_ENGINE_NAME_... 合成策略离在线语音合成 SDK,除了可以单独使用的在线合成及离线合成外,还提供了在线合成发生网络错误后自动切换到离线合成的策略,开发者可以通过配置建连超时 PARAMS_KEY_TTS_CONN_TIMEOUT_INT 和接收超时 PARAMS_...

特惠活动

热门爆款云服务器

100%性能独享,更高内存性能更佳,学习测试、web前端、企业应用首选,每日花费低至0.55元
60.00/1212.00/年
立即购买

域名注册服务

cn/top/com等热门域名,首年低至1元,邮箱建站必选
1.00/首年起32.00/首年起
立即购买

DCDN国内流量包100G

同时抵扣CDN与DCDN两种流量消耗,加速分发更实惠
2.00/20.00/年
立即购买

产品体验

体验中心

云服务器特惠

云服务器
云服务器ECS新人特惠
立即抢购

白皮书

一图详解大模型
浓缩大模型架构,厘清生产和应用链路关系
立即获取

最新活动

爆款1核2G共享型服务器

首年60元,每月仅需5元,限量秒杀
立即抢购

火山引擎增长体验专区

丰富能力激励企业快速增长
查看详情

数据智能VeDI

易用的高性能大数据产品家族
了解详情

一键开启云上增长新空间

立即咨询