Vert.x中跨Verticle通过EventBus写入Socket时处理writeQueueFull的实现方案
多Verticle通过EventBus向Socket写数据的流量控制实现
这个场景确实容易踩坑——当多个Verticle通过EventBus给同一个Socket发数据时,直接照搬单来源的写满处理逻辑,很容易出现重复注册回调、线程安全问题,甚至数据丢失。下面我给你拆解核心思路和完整可运行的示例:
核心设计要点
- 线程安全优先:Socket的IO操作不是线程安全的,必须在它绑定的EventLoop线程里执行,所以要把写操作通过
runOnContext切换到对应线程。 - 避免重复注册drainHandler:如果每次队列满都注册一次回调,会导致多个回调被触发,必须用一个标志位来控制注册状态。
- 连接清理:Socket断开时要及时取消对应的EventBus消费者,防止内存泄漏。
完整示例代码
1. Socket服务端Verticle(处理连接和EventBus消息)
这个Verticle负责启动NetServer,处理Socket连接,并作为EventBus的消费者接收其他Verticle发来的数据:
import io.vertx.core.AbstractVerticle; import io.vertx.core.buffer.Buffer; import io.vertx.core.net.NetServer; import io.vertx.core.net.NetServerOptions; import io.vertx.core.net.NetSocket; public class SocketServerVerticle extends AbstractVerticle { // 全局EventBus地址,所有发送端都通过这个地址发数据(单Socket场景) private static final String SOCKET_WRITE_TOPIC = "socket.data.stream"; @Override public void start() { NetServer server = vertx.createNetServer( new NetServerOptions().setPort(1234).setHost("localhost") ); server.connectHandler(socket -> { // 用数组存标志位(Java lambda无法修改基本类型变量),标记是否已注册drainHandler boolean[] isDrainHandlerRegistered = {false}; // 注册EventBus消费者,接收要写入Socket的数据 var consumer = vertx.eventBus().consumer(SOCKET_WRITE_TOPIC, message -> { // 把写操作切换到Socket所在的EventLoop线程,保证线程安全 socket.runOnContext(v -> { Buffer data = Buffer.buffer((String) message.body()); socket.write(data); // 处理队列满的情况 if (socket.writeQueueFull()) { if (!isDrainHandlerRegistered[0]) { isDrainHandlerRegistered[0] = true; // 队列有空位时恢复读取并重置标志位 socket.drainHandler(done -> { isDrainHandlerRegistered[0] = false; socket.resume(); }); } socket.pause(); } }); }); // Socket断开时,取消EventBus消费者,清理资源 socket.closeHandler(v -> consumer.unregister()); }).listen(); } }
2. 数据发送Verticle示例(多个独立Verticle)
下面是两个不同的Verticle,分别通过EventBus向Socket发送数据:
第一个发送Verticle
import io.vertx.core.AbstractVerticle; public class DataSenderVerticleA extends AbstractVerticle { private static final String SOCKET_WRITE_TOPIC = "socket.data.stream"; @Override public void start() { // 每隔1秒发送一次数据 vertx.setPeriodic(1000, timerId -> { String data = "[Sender A] " + System.currentTimeMillis() + "\n"; vertx.eventBus().send(SOCKET_WRITE_TOPIC, data); }); } }
第二个发送Verticle
import io.vertx.core.AbstractVerticle; public class DataSenderVerticleB extends AbstractVerticle { private static final String SOCKET_WRITE_TOPIC = "socket.data.stream"; @Override public void start() { // 每隔1.5秒发送一次数据 vertx.setPeriodic(1500, timerId -> { String data = "[Sender B] " + System.currentTimeMillis() + "\n"; vertx.eventBus().send(SOCKET_WRITE_TOPIC, data); }); } }
3. 启动类(部署所有Verticle)
import io.vertx.core.Vertx; public class MainApp { public static void main(String[] args) { Vertx vertx = Vertx.vertx(); vertx.deployVerticle(new SocketServerVerticle()); vertx.deployVerticle(new DataSenderVerticleA()); vertx.deployVerticle(new DataSenderVerticleB()); } }
多Socket连接场景的扩展说明
如果你的服务需要处理多个Socket连接,上面的单主题方式就不适用了,你可以:
- 为每个Socket连接生成唯一的EventBus主题(比如用
socket.write.${socket.writeHandlerID()}),然后把这个主题告知发送端。 - 或者在EventBus消息中携带目标Socket的标识(比如连接ID),在Socket服务端Verticle中维护一个Socket映射表,根据标识找到对应的Socket进行写入。
内容的提问来源于stack exchange,提问作者angrycrab




