You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

如何在Java中用org.apache.commons.pool2实现多服务器自定义验证连接池

用org.apache.commons.pool2实现按服务器区分的自定义验证连接池

刚好有过类似的实践经验,用BaseKeyedPooledObjectFactory来实现你要的多服务器连接池映射完全适配你的场景。我给你一步步拆解实现过程,结合你提到的3台服务器、每台对应3个连接的需求来写示例代码:

1. 先定义自定义连接对象

首先你需要封装对应服务器的归档数据查询连接,这里模拟一个ArchiveConnection类,包含连接服务器、关闭、验证有效性的核心方法(你可以根据实际场景替换成真实的连接实现,比如JDBC连接、Socket连接等):

public class ArchiveConnection {
    private final String serverId;
    // 替换为真实的连接实例,比如JDBC Connection、Socket等
    private Object rawConnection;

    public ArchiveConnection(String serverId) {
        this.serverId = serverId;
        // 模拟初始化连接到目标服务器的逻辑
        this.rawConnection = establishConnectionToServer(serverId);
    }

    // 模拟连接服务器的实际逻辑
    private Object establishConnectionToServer(String serverId) {
        System.out.println("建立新连接到服务器: " + serverId);
        return new Object(); // 替换为真实连接创建逻辑
    }

    // 验证连接是否有效(连接池核心校验方法)
    public boolean isValid() {
        // 替换为实际验证逻辑:比如ping服务器、执行测试查询
        return rawConnection != null;
    }

    // 关闭连接
    public void close() {
        System.out.println("关闭服务器 " + serverId + " 的连接");
        // 添加真实的连接销毁逻辑
        rawConnection = null;
    }

    // 模拟查询归档数据的业务方法
    public String queryArchiveData(String query) {
        return "服务器 " + serverId + " 返回数据: " + query;
    }

    public String getServerId() {
        return serverId;
    }
}

2. 实现Keyed连接池工厂

继承BaseKeyedPooledObjectFactory,泛型指定key为服务器标识(String类型),value为ArchiveConnection。需要实现几个核心方法来处理连接的创建、包装、校验和销毁:

import org.apache.commons.pool2.BaseKeyedPooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;

public class ArchiveConnectionFactory extends BaseKeyedPooledObjectFactory<String, ArchiveConnection> {

    // 根据服务器ID(key)创建对应连接
    @Override
    public ArchiveConnection create(String serverId) throws Exception {
        return new ArchiveConnection(serverId);
    }

    // 将连接包装为连接池需要的PooledObject实例
    @Override
    public PooledObject<ArchiveConnection> wrap(ArchiveConnection connection) {
        return new DefaultPooledObject<>(connection);
    }

    // 验证连接是否有效(连接池会在借出、归还时调用)
    @Override
    public boolean validateObject(String key, PooledObject<ArchiveConnection> pooledObject) {
        ArchiveConnection connection = pooledObject.getObject();
        return connection.isValid();
    }

    // 销毁无效或过期的连接
    @Override
    public void destroyObject(String key, PooledObject<ArchiveConnection> pooledObject) throws Exception {
        ArchiveConnection connection = pooledObject.getObject();
        connection.close();
    }
}

3. 配置并初始化多服务器连接池

使用GenericKeyedObjectPool来管理按key区分的连接池,配置每个服务器对应的最大连接数为3,其他参数可根据业务需求调整:

import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;

public class ArchiveConnectionPoolManager {
    private final GenericKeyedObjectPool<String, ArchiveConnection> connectionPool;

    public ArchiveConnectionPoolManager() {
        // 创建连接池配置
        GenericKeyedObjectPoolConfig<ArchiveConnection> poolConfig = new GenericKeyedObjectPoolConfig<>();
        // 每个服务器(key)对应的最大连接数,设置为3符合你的需求
        poolConfig.setMaxTotalPerKey(3);
        // 每个服务器对应的最小空闲连接数,可根据并发量调整
        poolConfig.setMinIdlePerKey(1);
        // 借出连接时自动验证有效性
        poolConfig.setTestOnBorrow(true);
        // 归还连接时自动验证有效性
        poolConfig.setTestOnReturn(true);
        // 空闲连接定期验证有效性
        poolConfig.setTestWhileIdle(true);

        // 初始化连接池,传入自定义工厂和配置
        ArchiveConnectionFactory factory = new ArchiveConnectionFactory();
        this.connectionPool = new GenericKeyedObjectPool<>(factory, poolConfig);
    }

    // 获取指定服务器的连接
    public ArchiveConnection getConnection(String serverId) throws Exception {
        return connectionPool.borrowObject(serverId);
    }

    // 归还连接到对应服务器的连接池
    public void returnConnection(String serverId, ArchiveConnection connection) {
        if (connection != null) {
            connectionPool.returnObject(serverId, connection);
        }
    }

    // 关闭连接池,释放所有服务器的连接资源
    public void closePool() {
        connectionPool.close();
    }
}

4. 实际业务使用示例

模拟业务代码中获取不同服务器的连接,查询归档数据并归还连接:

public class ArchiveQueryDemo {
    public static void main(String[] args) {
        ArchiveConnectionPoolManager poolManager = new ArchiveConnectionPoolManager();

        try {
            // 获取server1的连接并查询数据
            ArchiveConnection conn1 = poolManager.getConnection("server1");
            System.out.println(conn1.queryArchiveData("SELECT * FROM archive_2024_06"));
            poolManager.returnConnection("server1", conn1);

            // 获取server2的连接并查询数据
            ArchiveConnection conn2 = poolManager.getConnection("server2");
            System.out.println(conn2.queryArchiveData("SELECT * FROM archive_2024_05"));
            poolManager.returnConnection("server2", conn2);

            // 测试同一服务器的连接复用(会拿到空闲连接,不会重复创建)
            ArchiveConnection conn1Reuse = poolManager.getConnection("server1");
            System.out.println(conn1Reuse.queryArchiveData("SELECT COUNT(*) FROM archive_2024_06"));
            poolManager.returnConnection("server1", conn1Reuse);

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 应用关闭时务必关闭连接池,释放所有资源
            poolManager.closePool();
        }
    }
}

关键注意事项

  • 有效性验证逻辑validateObject方法一定要结合实际业务场景实现,比如JDBC连接可以执行SELECT 1,Socket连接可以发送心跳包,确保连接确实可用。
  • 资源泄漏防范:必须在finally块中归还连接,避免连接池耗尽;应用 shutdown时必须调用closePool()释放所有连接资源。
  • 参数调优:可以根据实际并发量、服务器性能调整maxTotalPerKeyminIdlePerKey等参数,避免连接过多导致服务器压力过大,或者连接不足导致请求等待超时。

内容的提问来源于stack exchange,提问作者Stefano

火山引擎 最新活动