You need to enable JavaScript to run this app.
导航
生产并消费消息(SSL 方式)
最近更新时间:2024.07.09 11:56:49首次发布时间:2022.10.17 19:17:58

火山引擎消息队列 RabbitMQ版兼容开源 RabbitMQ 协议,创建 RabbitMQ 实例后,您可以通过 SSL 认证连接实例并生产、消费消息。
关于 RabbitMQ 的使用教程,请参考 RabbitMQ 官网提供的不同语言的连接和使用向导。详细信息请查看RabbitMQ官网

前提条件

  • 已完成开发环境准备,操作步骤请参考准备环境
  • 已经购买 RabbitMQ 实例,且实例状态为运行中。操作步骤请参考创建实例
  • 已开启了实例的公网访问和 SSL 认证。开启公网访问时,建议绑定的 EIP 带宽上限大于预估的公网业务流量峰值。操作步骤请参考设置公网访问
  • 已获取 RabbitMQ 实例详情中的AMQP接入点。操作步骤请参考查看实例信息
  • 进行生产和消费前,需要先在 RabbitMQ 集群管理 Web UI 上创建一个非 Admin 角色的新用户,并为新用户绑定 Virtual Host。操作步骤,请参见绑定用户和 Vhost

操作步骤

  1. 安装 Java 依赖库。

    1. 如果项目使用 Maven 构建,请在 pom.xml 文件中增加如下依赖。
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.12.0</version>
    </dependency>
    
    1. 如果项目使用Gradle构建,请添加如下依赖:
    compile 'com.rabbitmq:amqp-client:5.12.0'
    
  2. 连接实例并生产消息。
    为了提高稳定性和可用性,建议参考实例代码,开启客户端自动重连Publish Confirm等机制。

    package org.example.amqp.producer;
    
    import com.rabbitmq.client.*;
    import java.nio.charset.StandardCharsets;
    import java.util.HashMap;
    import java.util.UUID;
    
    public class SimpleProducerSSL {
        private static final String host  = "your-end-point";  //AMQP协议公网接入地址。
        private static final int port  = 5671;
        private static final String userName  = "your-user-name";
        private static final String password  = "your-password";
        private static final String vhost  = "/";
        private static final String exchangeName  = "your-exchange";
        private static final String queueName  = "your-queue";
        private static final String bindingKey  = "your-key";
        private static final int deliveryMode  = 2;
        private static final int batchSize  = 50;
        private static final int publishConfirmTimeout  = 10000; // publish confirm超时时间10秒
    
        public static void main(String[] args) throws Exception{
            ConnectionFactory factory = new ConnectionFactory();
            // 设置接入点,在RabbitMQ版控制台实例详情页面查看
            factory.setHost(host);
            // 设置端口,AMQP协议SSL加密端口5671
            factory.setPort(port);
            // 用户名,在WebUI控制台配置并管理
            factory.setUsername(userName);
            // 密码,在WebUI控制台配置并管理
            factory.setPassword(password);
            // 设置Vhost名称,请确保已在RabbitMQ版控制台上创建完成。
            factory.setVirtualHost(vhost);
            // 启用SSL(可选)
            factory.useSslProtocol();
    
            // 开启自动重连,默认值为true
            factory.setAutomaticRecoveryEnabled(true);
            // 设置自动重连间隔
            factory.setNetworkRecoveryInterval(5000);
            // 设置连接建立的协商超时时间
            factory.setConnectionTimeout(20000);
    
            // 建立Connection
            Connection connection = factory.newConnection();
    
            // 创建Channel
            Channel channel = connection.createChannel();
    
            // 开启publish confirm(可选,推荐开启)
            channel.confirmSelect();
    
            // 创建Exchange(可选)
            channel.exchangeDeclare(exchangeName, "direct", true, false, false, null);
            // 创建Classic类型队列(可选)
            channel.queueDeclare(queueName, true, false, false, new HashMap<String, Object>());
            // 绑定Exchange和队列(可选)
            channel.queueBind(queueName, exchangeName, bindingKey);
    
            // 注册publish confirm的回调
            channel.addConfirmListener(new ConfirmListener() {
                public void handleNack(long deliveryTag, boolean multiple) {
                    // 处理nack回调
                    System.out.println("nack received: " + deliveryTag);
                }
    
                public void handleAck(long deliveryTag, boolean multiple) {
                    // 处理ack回调
                    System.out.println("ack received: " + deliveryTag);
                }
            });
    
            int msgsToSend = 10000;
    
            // 持续发送消息
            while (msgsToSend > 0) {
                // 设置消息属性
                AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).deliveryMode(deliveryMode).build();
                String msgBody = "hello rabbitmq";
    
                try {
                    channel.basicPublish(exchangeName, bindingKey, true, props, msgBody.getBytes(StandardCharsets.UTF_8));
    
                    msgsToSend--;
    
                    // 等待上批次发送消息的confirm
                    if (msgsToSend%batchSize  == 0) {
                        channel.waitForConfirms(publishConfirmTimeout);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
    
            // 关闭Channel和Connection
            channel.close();
            connection.close();
        }
    }
    
  3. 连接实例并消费消息。

    package org.example.amqp.consumer;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.security.KeyManagementException;
    import java.security.NoSuchAlgorithmException;
    import java.util.HashMap;
    import java.util.concurrent.TimeoutException;
    
    public class SimpleConsumerSSL {
        private static final String host  = "your-end-point"; // AMQP协议公网接入地址。
        private static final int port  = 5671;
        private static final String userName  = "your-user-name";
        private static final String password  = "your-password";
        private static final String vhost  = "/";
        private static final String queueName  = "your-queue";
    
        public static void main(String[] args) throws IOException, TimeoutException, KeyManagementException, NoSuchAlgorithmException {
            ConnectionFactory factory = new ConnectionFactory();
            // 设置接入点,在RabbitMQ版控制台实例详情页面查看
            factory.setHost(host);
            // 设置端口,AMQP协议默认端口5672
            factory.setPort(port);
            // 用户名,在WebUI控制台配置并管理
            factory.setUsername(userName);
            // 密码,在WebUI控制台配置并管理
            factory.setPassword(password);
            // 设置Vhost名称,请确保已在RabbitMQ版控制台上创建完成。
            factory.setVirtualHost(vhost);
            // 启用SSL
            factory.useSslProtocol();
    
            // 建立Connection
            Connection connection = factory.newConnection();
            // 创建Channel
            Channel channel = connection.createChannel();
    
            // 创建Classic类型队列(非必须,如果已在WebUI控制台创建可跳过)
            channel.queueDeclare(queueName, true, false, false, new HashMap<String, Object>());
    
            // 启动消费
            consume(connection);
    
            // 关闭Channel和Connection
            channel.close();
            connection.close();
        }
    
        private static void consume(Connection connection) throws IOException {
            final Channel channel = connection.createChannel();
    
            channel.basicConsume(queueName, true, "test_consumer", new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    // 处理收到的消息
                    System.out.printf("Received Message: %s, consumerTag: %s, deliveryTag: %s, messageId: %s\n", new String(body, StandardCharsets.UTF_8),
                            consumerTag, envelope.getDeliveryTag(), properties.getMessageId());
                }
            });
        }
    }