You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

Vert.x中跨Verticle通过EventBus写入Socket时处理writeQueueFull的实现方案

多Verticle通过EventBus向Socket写数据的流量控制实现

这个场景确实容易踩坑——当多个Verticle通过EventBus给同一个Socket发数据时,直接照搬单来源的写满处理逻辑,很容易出现重复注册回调、线程安全问题,甚至数据丢失。下面我给你拆解核心思路和完整可运行的示例:

核心设计要点

  • 线程安全优先:Socket的IO操作不是线程安全的,必须在它绑定的EventLoop线程里执行,所以要把写操作通过runOnContext切换到对应线程。
  • 避免重复注册drainHandler:如果每次队列满都注册一次回调,会导致多个回调被触发,必须用一个标志位来控制注册状态。
  • 连接清理:Socket断开时要及时取消对应的EventBus消费者,防止内存泄漏。

完整示例代码

1. Socket服务端Verticle(处理连接和EventBus消息)

这个Verticle负责启动NetServer,处理Socket连接,并作为EventBus的消费者接收其他Verticle发来的数据:

import io.vertx.core.AbstractVerticle;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetServer;
import io.vertx.core.net.NetServerOptions;
import io.vertx.core.net.NetSocket;

public class SocketServerVerticle extends AbstractVerticle {

    // 全局EventBus地址,所有发送端都通过这个地址发数据(单Socket场景)
    private static final String SOCKET_WRITE_TOPIC = "socket.data.stream";

    @Override
    public void start() {
        NetServer server = vertx.createNetServer(
            new NetServerOptions().setPort(1234).setHost("localhost")
        );

        server.connectHandler(socket -> {
            // 用数组存标志位(Java lambda无法修改基本类型变量),标记是否已注册drainHandler
            boolean[] isDrainHandlerRegistered = {false};

            // 注册EventBus消费者,接收要写入Socket的数据
            var consumer = vertx.eventBus().consumer(SOCKET_WRITE_TOPIC, message -> {
                // 把写操作切换到Socket所在的EventLoop线程,保证线程安全
                socket.runOnContext(v -> {
                    Buffer data = Buffer.buffer((String) message.body());
                    socket.write(data);

                    // 处理队列满的情况
                    if (socket.writeQueueFull()) {
                        if (!isDrainHandlerRegistered[0]) {
                            isDrainHandlerRegistered[0] = true;
                            // 队列有空位时恢复读取并重置标志位
                            socket.drainHandler(done -> {
                                isDrainHandlerRegistered[0] = false;
                                socket.resume();
                            });
                        }
                        socket.pause();
                    }
                });
            });

            // Socket断开时,取消EventBus消费者,清理资源
            socket.closeHandler(v -> consumer.unregister());
        }).listen();
    }
}

2. 数据发送Verticle示例(多个独立Verticle)

下面是两个不同的Verticle,分别通过EventBus向Socket发送数据:

第一个发送Verticle

import io.vertx.core.AbstractVerticle;

public class DataSenderVerticleA extends AbstractVerticle {

    private static final String SOCKET_WRITE_TOPIC = "socket.data.stream";

    @Override
    public void start() {
        // 每隔1秒发送一次数据
        vertx.setPeriodic(1000, timerId -> {
            String data = "[Sender A] " + System.currentTimeMillis() + "\n";
            vertx.eventBus().send(SOCKET_WRITE_TOPIC, data);
        });
    }
}

第二个发送Verticle

import io.vertx.core.AbstractVerticle;

public class DataSenderVerticleB extends AbstractVerticle {

    private static final String SOCKET_WRITE_TOPIC = "socket.data.stream";

    @Override
    public void start() {
        // 每隔1.5秒发送一次数据
        vertx.setPeriodic(1500, timerId -> {
            String data = "[Sender B] " + System.currentTimeMillis() + "\n";
            vertx.eventBus().send(SOCKET_WRITE_TOPIC, data);
        });
    }
}

3. 启动类(部署所有Verticle)

import io.vertx.core.Vertx;

public class MainApp {
    public static void main(String[] args) {
        Vertx vertx = Vertx.vertx();
        vertx.deployVerticle(new SocketServerVerticle());
        vertx.deployVerticle(new DataSenderVerticleA());
        vertx.deployVerticle(new DataSenderVerticleB());
    }
}

多Socket连接场景的扩展说明

如果你的服务需要处理多个Socket连接,上面的单主题方式就不适用了,你可以:

  • 为每个Socket连接生成唯一的EventBus主题(比如用socket.write.${socket.writeHandlerID()}),然后把这个主题告知发送端。
  • 或者在EventBus消息中携带目标Socket的标识(比如连接ID),在Socket服务端Verticle中维护一个Socket映射表,根据标识找到对应的Socket进行写入。

内容的提问来源于stack exchange,提问作者angrycrab

火山引擎 最新活动