You need to enable JavaScript to run this app.
导航

RabbitMQ 队列高可用

最近更新时间2024.02.01 17:22:37

首次发布时间2023.12.14 17:14:26

如果对数据可用性和可靠性要求较高,您可以选择使用一致性 hash 插件或 Quorum 队列来保障单节点故障场景的服务高可用。本文档介绍各种高可用方案的配置方式。

背景信息

Classic 队列,即经典队列,是 RabbitMQ 中最常见的基本队列类型,Classic 队列提供了基本的消息传递和处理功能,简单而高效。Classic 队列使用先进先出(FIFO)的消息传递方式,确保消息按照发送的顺序进行处理。
但 Classic 队列在 RabbitMQ 中基于单节点的存储结构,不具备数据冗余的特性。如果节点发生故障,队列中的消息可能会丢失。
如果仍然需要使用 Classic 队列,您也可以通过rabbitmq_consistent_hash_exchange插件实现 Classic 队列高可用。rabbitmq_consistent_hash_exchange是 RabbitMQ 提供的一致性 hash 交换器,用于通过一致性 hash 算法将消息分发到已绑定 Exchange 的队列上。该插件可以将原有的一个队列拆分为多个队列,并将拆分出的队列分别绑定到不同的节点上,以应对单节点故障的情况下队列不可用的场景。
Quorum 队列提供了数据冗余和高可用性的特性、对消息的可靠性保障更高。由于 Quorum 队列是 RabbitMQ 新引入特性,可能存在稳定性和一致性问题,请谨慎使用 Quorum 队列

通过插件实现 Classic 队列高可用

您可以在控制台启用rabbitmq_consistent_hash_exchange插件,然后便会通过一致性 hash 算法将消息分发到已绑定 Exchange 的队列上。x-consistent-hash类型的 Exchange 会根据消息的指定方式计算哈希进行路由。Exchange上绑定有不同权重的队列,并根据 Queue 的权重将消息分发到不同的队列中。

配置步骤

  1. 在火山引擎 RabbitMQ 控制台中启用rabbitmq_consistent_hash_exchange插件。
    操作步骤请参考开启插件
    图片
  2. 在 RabbitMQ WebUI 中创建一致性哈希 Exchange。
    登录 RabbitMQ WebUI 的方式请参考连接 RabbitMQ 管理地址
    1. 在顶部菜单栏选择 Exchanges。,然后 Add a new exchange
      图片
    2. 填写 Exchange 相关的配置,并单击 Add Exchange
      • Type:exchange 类型,此处选择 x-consistent-hash
      • Name:exchange 名称。此处设置 Exchange 的名称。
      • Durability:是否持久化。
        图片
  3. 在 RabbitMQ WebUI 中创建 Classic 类型的 Queue。
    1. 在顶部菜单栏选择 Queues,单击 Add a new queue
      图片
    2. 填写 Classic 队列相关的配置,并单击 Add queue
      • Type:队列类型,此处选择 Classic
      • Name:队列名称。此处设置 Classic 队列的名称。
      • Node:队列绑定的节点。将队列分别绑定到不同节点上。
        图片
  4. 在 RabbitMQ WebUI 中绑定 Exchange 和 Queues。
    1. Exchanges 页签中,找到并单击步骤 2 中创建好的 Exchange。
      图片
    2. 填写 Classic 队列与 Exchange 相关的绑定配置。
      • To queue:目的队列,此处选择步骤 2 中创建的 Classic 队列。
      • Routing key:路由键。在一致性 hash Exchange 中,routing key 表示消息路由到该队列的权重,根据该权重比,消息会按比例路由到绑定的队列。
        图片
    3. 依次为 Exchange 绑定所有 Classic 队列。
      图片

生产代码示例

消息生产示例代码如下。

public class classicHashDemoProducer {
    public static void main(String[] args) throws IOException, TimeoutException, NoSuchAlgorithmException, KeyManagementException, InterruptedException {
        // 下列变量按照实际情况进行配置。
        final String host = "xxxxxxx.rabbitmq.volces.com";
        final int port = 5671;
        final String username = "xxxx";
        final String password = "xxxxxx";

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
        factory.setPort(port);
        factory.setUsername(username);
        factory.setPassword(password);

        // 配置连接相关的参数,请根据业务特点与网络状况配置
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        factory.setConnectionTimeout(30 * 1000);
        factory.setHandshakeTimeout(30 * 1000);
        factory.setShutdownTimeout(0);
        factory.useSslProtocol();

        // 下列是消息发布的AMQP-0-9-1协议对象配置。
        // 设置Vhost名称和Exchange名称,请确保已经在RabbitMQ集群Web控制台创建。
        final String vhost = "/";
        final String EXCHANGE_NAME = "HashExchange";
        factory.setVirtualHost(vhost);
        // 创建Connection和Channel
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        for (int i = 0; i < 10000; i++) {
            String message = "Hello, RabbitMQ!";
            String messageId = UUID.randomUUID().toString();
            // 为每条消息生成一个随机的RoutingKey,用于随机散列
            String RoutingKey = UUID.randomUUID().toString();
            channel.basicPublish(EXCHANGE_NAME, RoutingKey,
                    MessageProperties.PERSISTENT_TEXT_PLAIN.builder()
                            .messageId(messageId)
                            .build(), message.getBytes());
            System.out.println(" Sent message: '" + message + "' with id: " + messageId);
        }
        channel.close();
        connection.close();
    }
}

高可用场景验证

通过一致性 hash 插件实现 Classic 队列高可用之后,您可以参考以下示例,验证单节点故障场景下的 Classic 队列可用性。

节点状态正常

在 RabbitMQ 实例各节点状态正常的场景下,您可以登录 RabbitMQ WebUI,在顶部菜单栏 Queues 下观察各队列生产消费情况。例如以下示例表示消息散列到不同节点的不同队列上,减小了单节点压力。

单节点故障

在 RabbitMQ 实例存在单节点故障时,您可以在 RabbitMQ WebUI 中查看节点状态及生产消费情况,验证消息生产消费是否正常。

  • 查看故障节点。
    在顶部菜单栏 Overview 页签中查看节点状态。例如下图表示存在单节点故障。

  • 查看队列生产消费情况。
    在顶部菜单栏 Queues 页签中,观察各队列生产消费情况。其中,故障节点队列不可用,正常节点可以继续生产消费,证实了 Claasic 队列的可用性。

通过 Quorum 队列实现高可用

RabbitMQ Quorum 队列是在 RabbitMQ 3.8 版本中引入的一种队列类型,它具有高可用性和数据冗余的特性。相比于传统的 Classic 队列,Quorum 队列提供了更好的可用性和数据保护。

注意

由于 Quorum 队列是 RabbitMQ 新引入特性,可能存在稳定性和一致性问题,请谨慎使用 Quorum 队列

Quorum 队列的特性如下:

  • 复制和冗余:Quorum 队列使用 Raft 一致性协议来在多个节点之间复制和同步队列中的消息。每个消息都会被复制到多个节点上,以确保数据的冗余存储和可用性。
  • 自动故障转移:当节点发生故障时,Quorum 队列会自动进行故障转移,将主节点的角色切换到备份节点上。这样可以确保队列的持续可用性,即使部分节点发生故障。
  • 数据一致性:Quorum 队列使用 Raft 协议来保证数据的一致性。Raft 协议确保了消息的原子性、一致性和持久性,即使在节点故障和网络分区的情况下也能够保持数据的一致性。
  • 高可用性:由于 Quorum 队列可以在多个节点上进行复制,并具有自动故障转移的能力,因此它提供了更高的可用性和容错能力。即使部分节点发生故障,队列仍然可以继续工作而不会丢失数据。

创建 Quorum 队列的操作步骤如下:

  1. 登录消息队列 RabbitMQ版实例的 Web UI。
    操作步骤,请参见连接 RabbitMQ 管理地址
  2. 在顶部菜单栏,单击 Queues,然后单击 Add a new queue
    图片
  3. 填写仲裁队列相关的配置,然后单击 Add queue
    • Type:队列类型,此处选择 Quorum
    • Name:队列名称,此处自定义设置仲裁队列的名称。
      图片