You need to enable JavaScript to run this app.
优惠活动
大模型
产品
解决方案
定价
更多
文档控制台
免费开始使用

Camel 4+Java 21多线程执行split/aggregate问题及单线程修复咨询

问题描述

我的Apache Camel路由包含以下代码:

.split().method(SomeDataCacheSplitter.class, "split").streaming()
    .marshal(gsonDataFormat)
    .aggregate(constant(true), new GroupedBodyAggregationStrategy())
        .executorService(new SynchronousExecutorService())
        .completionSize(1000)
        .completionPredicate(someDataCache.getEndOfFilePredicate())
        .to("elasticsearch://elasticsearch?operation=Bulk&hostAddresses=#elasticsearchHostAddressess")
    .end()
.end()

在Java 11+Camel 3.17.0环境执行时,日志显示所有操作均在单线程完成:

2025-01-13 13:23:16.788 DEBUG 31550 --- [/O dispatcher 1] org.apache.http.wire ... 
2025-01-13 13:23:16.788 DEBUG 31550 --- [/O dispatcher 1] org.apache.http.wire ...
2025-01-13 13:23:16.788 DEBUG 31550 --- [/O dispatcher 1] org.apache.http.wire ...

但在Java 21+Camel 4.8.2环境执行时,出现10个线程并行执行的情况:

2025-01-13T13:46:38.956+01:00 DEBUG 46176 --- [ient-0-thread-9] org.apache.http.wire ...
2025-01-13T13:46:38.956+01:00 DEBUG 46176 --- [ient-0-thread-7] org.apache.http.wire ...  
2025-01-13T13:46:38.956+01:00 DEBUG 46176 --- [ient-0-thread-6] org.apache.http.wire ...

路由逻辑是批量上传数据至Elasticsearch并创建索引,后续有.process步骤将新索引关联至别名。若不采用同步执行,会出现index_not_found_exception(提示“no such index”),极端情况下批量上传失败会导致别名指向损坏索引。

原因分析
  1. Elasticsearch组件客户端行为变更:Camel 4.x版本的Elasticsearch组件默认切换为异步HTTP客户端,该客户端自带默认大小为10的线程池,负责处理批量请求的IO操作。即使聚合器配置了SynchronousExecutorService,也仅控制聚合阶段的线程,下游Elasticsearch请求仍会由客户端线程池异步执行,这是导致多线程日志的核心原因。而Camel 3.x版本的组件默认使用同步客户端,所有操作都在Camel路由线程中完成。
  2. 聚合器执行器作用范围调整:Camel 4.x对聚合器的executorService配置逻辑做了微调,仅管控聚合任务的调度,不再强制下游端点的执行线程必须与聚合线程一致。
解决方案

要确保Camel 4+Java 21环境下全程单线程执行,可通过以下两种方式修改:

方式一:强制Elasticsearch组件使用同步客户端

修改Elasticsearch端点的URL,添加client=synchronous参数,强制组件使用同步HTTP客户端,让所有请求在路由线程中执行:

.to("elasticsearch://elasticsearch?operation=Bulk&hostAddresses=#elasticsearchHostAddressess&client=synchronous")

方式二:替换为Camel内置同步执行器并管控下游线程

  1. 使用Camel内置的同步执行器(推荐,避免手动实例化可能的兼容问题):
.executorService("synchronous")
  1. 在Elasticsearch端点前添加.threads(1),强制下游操作在单线程执行:
.aggregate(constant(true), new GroupedBodyAggregationStrategy())
    .executorService("synchronous")
    .completionSize(1000)
    .completionPredicate(someDataCache.getEndOfFilePredicate())
    .threads(1)
    .to("elasticsearch://elasticsearch?operation=Bulk&hostAddresses=#elasticsearchHostAddressess")
.end()

额外验证

修改后可通过日志线程名称确认是否统一为路由线程(如/O dispatcher 1),同时测试后续关联别名的.process步骤是否再出现index_not_found_exception


内容的提问来源于stack exchange,提问作者Splioo

火山引擎 最新活动