Camel 4+Java 21多线程执行split/aggregate问题及单线程修复咨询
问题描述
我的Apache Camel路由包含以下代码:
.split().method(SomeDataCacheSplitter.class, "split").streaming() .marshal(gsonDataFormat) .aggregate(constant(true), new GroupedBodyAggregationStrategy()) .executorService(new SynchronousExecutorService()) .completionSize(1000) .completionPredicate(someDataCache.getEndOfFilePredicate()) .to("elasticsearch://elasticsearch?operation=Bulk&hostAddresses=#elasticsearchHostAddressess") .end() .end()
在Java 11+Camel 3.17.0环境执行时,日志显示所有操作均在单线程完成:
2025-01-13 13:23:16.788 DEBUG 31550 --- [/O dispatcher 1] org.apache.http.wire ... 2025-01-13 13:23:16.788 DEBUG 31550 --- [/O dispatcher 1] org.apache.http.wire ... 2025-01-13 13:23:16.788 DEBUG 31550 --- [/O dispatcher 1] org.apache.http.wire ...
但在Java 21+Camel 4.8.2环境执行时,出现10个线程并行执行的情况:
2025-01-13T13:46:38.956+01:00 DEBUG 46176 --- [ient-0-thread-9] org.apache.http.wire ... 2025-01-13T13:46:38.956+01:00 DEBUG 46176 --- [ient-0-thread-7] org.apache.http.wire ... 2025-01-13T13:46:38.956+01:00 DEBUG 46176 --- [ient-0-thread-6] org.apache.http.wire ...
路由逻辑是批量上传数据至Elasticsearch并创建索引,后续有.process步骤将新索引关联至别名。若不采用同步执行,会出现index_not_found_exception(提示“no such index”),极端情况下批量上传失败会导致别名指向损坏索引。
原因分析
- Elasticsearch组件客户端行为变更:Camel 4.x版本的Elasticsearch组件默认切换为异步HTTP客户端,该客户端自带默认大小为10的线程池,负责处理批量请求的IO操作。即使聚合器配置了
SynchronousExecutorService,也仅控制聚合阶段的线程,下游Elasticsearch请求仍会由客户端线程池异步执行,这是导致多线程日志的核心原因。而Camel 3.x版本的组件默认使用同步客户端,所有操作都在Camel路由线程中完成。 - 聚合器执行器作用范围调整:Camel 4.x对聚合器的
executorService配置逻辑做了微调,仅管控聚合任务的调度,不再强制下游端点的执行线程必须与聚合线程一致。
解决方案
要确保Camel 4+Java 21环境下全程单线程执行,可通过以下两种方式修改:
方式一:强制Elasticsearch组件使用同步客户端
修改Elasticsearch端点的URL,添加client=synchronous参数,强制组件使用同步HTTP客户端,让所有请求在路由线程中执行:
.to("elasticsearch://elasticsearch?operation=Bulk&hostAddresses=#elasticsearchHostAddressess&client=synchronous")
方式二:替换为Camel内置同步执行器并管控下游线程
- 使用Camel内置的同步执行器(推荐,避免手动实例化可能的兼容问题):
.executorService("synchronous")
- 在Elasticsearch端点前添加
.threads(1),强制下游操作在单线程执行:
.aggregate(constant(true), new GroupedBodyAggregationStrategy()) .executorService("synchronous") .completionSize(1000) .completionPredicate(someDataCache.getEndOfFilePredicate()) .threads(1) .to("elasticsearch://elasticsearch?operation=Bulk&hostAddresses=#elasticsearchHostAddressess") .end()
额外验证
修改后可通过日志线程名称确认是否统一为路由线程(如/O dispatcher 1),同时测试后续关联别名的.process步骤是否再出现index_not_found_exception。
内容的提问来源于stack exchange,提问作者Splioo




