You need to enable JavaScript to run this app.
导航

生产并消费消息(非 SSL 方式)

最近更新时间2024.01.26 16:09:35

首次发布时间2021.08.30 16:33:47

火山引擎消息队列 RabbitMQ版兼容开源 RabbitMQ 协议,创建 RabbitMQ 实例后,您可以连接实例生产并消费消息。
关于 RabbitMQ 的使用教程,请参考 RabbitMQ 官网提供的不同语言的连接和使用向导。详细信息请查看RabbitMQ官网

说明

如果 RabbitMQ 实例开启了 SSL 认证,请参考生产并消费消息(SSL方式)连接实例并生产消费消息。

前提条件

  • 已完成开发环境准备,操作步骤请参考准备环境
  • 已经购买 RabbitMQ 实例,且实例状态为运行中。操作步骤请参考创建实例
  • 已获取 RabbitMQ 服务访问的用户名和密码,默认为创建实例时设置的管理员用户名及密码。
  • 已获取 RabbitMQ 实例详情中的AMQP接入点。操作步骤请参考查看实例信息
  • 已经为进行生产和消费的用户绑定了目标 Vhost。操作步骤请参见绑定用户和 Vhost

操作步骤

  1. 安装 Java 依赖库。

    1. 如果项目使用 Maven 构建,请在 pom.xml 文件中增加如下依赖。
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.12.0</version>
    </dependency>
    
    1. 如果项目使用 Gradle 构建,请添加如下依赖:
    compile 'com.rabbitmq:amqp-client:5.12.0'
    
  2. 连接实例并生产消息。

    说明

    为了提高稳定性和可用性,建议参考示例代码,开启客户端自动重连Publish Confirm等机制。

    package org.example.amqp.producer;
    
    import com.rabbitmq.client.*;
    import java.nio.charset.StandardCharsets;
    import java.util.HashMap;
    import java.util.UUID;
    
    public class SimpleProducer {
        private static final String host  = "your-end-point"; // AMQP协议私网接入地址。
        private static final int port  = 5672;
        private static final String userName  = "your-user-name";
        private static final String password  = "your-password";
        private static final String vhost  = "/";
        private static final String exchangeName  = "your-exchange";
        private static final String queueName  = "your-queue";
        private static final String bindingKey  = "your-key";
        private static final int deliveryMode  = 2;
        private static final int batchSize  = 50;
        private static final int publishConfirmTimeout  = 10000; // publish confirm超时时间10秒
    
        public static void main(String[] args) throws Exception{
            ConnectionFactory factory = new ConnectionFactory();
            // 设置接入点,在RabbitMQ版控制台实例详情页面查看
            factory.setHost(host);
            // 设置端口,AMQP协议SSL加密端口5671
            factory.setPort(port);
            // 用户名,在WebUI控制台配置并管理
            factory.setUsername(userName);
            // 密码,在WebUI控制台配置并管理
            factory.setPassword(password);
            // 设置Vhost名称,请确保已在RabbitMQ版控制台上创建完成。
            factory.setVirtualHost(vhost);
    
            // 开启自动重连,默认值为true
            factory.setAutomaticRecoveryEnabled(true);
            // 设置自动重连间隔
            factory.setNetworkRecoveryInterval(5000);
            // 设置连接建立的协商超时时间
            factory.setConnectionTimeout(20000);
    
            // 建立Connection
            Connection connection = factory.newConnection();
    
            // 创建Channel
            Channel channel = connection.createChannel();
    
            // 开启publish confirm(可选,推荐开启)
            channel.confirmSelect();
    
            // 创建Exchange(可选)
            channel.exchangeDeclare(exchangeName, "direct", true, false, false, null);
            // 创建Classic类型队列(可选)
            channel.queueDeclare(queueName, true, false, false, new HashMap<String, Object>());
            // 绑定Exchange和队列(可选)
            channel.queueBind(queueName, exchangeName, bindingKey);
    
            // 注册publish confirm的回调
            channel.addConfirmListener(new ConfirmListener() {
                public void handleNack(long deliveryTag, boolean multiple) {
                    // 处理nack回调
                    System.out.println("nack received: " + deliveryTag);
                }
    
                public void handleAck(long deliveryTag, boolean multiple) {
                    // 处理ack回调
                    System.out.println("ack received: " + deliveryTag);
                }
            });
    
            int msgsToSend = 10000;
    
            // 持续发送消息
            while (msgsToSend > 0) {
                // 设置消息属性
                AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).deliveryMode(deliveryMode).build();
                String msgBody = "hello rabbitmq";
    
                try {
                    channel.basicPublish(exchangeName, bindingKey, true, props, msgBody.getBytes(StandardCharsets.UTF_8));
    
                    msgsToSend--;
    
                    // 等待上批次发送消息的confirm
                    if (msgsToSend%batchSize  == 0) {
                        channel.waitForConfirms(publishConfirmTimeout);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
    
            // 关闭Channel和Connection
            channel.close();
            connection.close();
        }
    }
    
  3. 连接实例并消费消息。

    package org.example.amqp.consumer;
    
    import com.rabbitmq.client.*;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.security.KeyManagementException;
    import java.security.NoSuchAlgorithmException;
    import java.util.HashMap;
    import java.util.concurrent.TimeoutException;
    
    public class SimpleConsumer {
        private static final String host  = "your-end-point"; // AMQP协议私网接入地址。
        private static final int port  = 5672;
        private static final String userName  = "your-user-name";
        private static final String password  = "your-password";
        private static final String vhost  = "/";
        private static final String queueName  = "your-queue";
    
        public static void main(String[] args) throws IOException, TimeoutException, KeyManagementException, NoSuchAlgorithmException {
            ConnectionFactory factory = new ConnectionFactory();
            // 设置接入点,在RabbitMQ版控制台实例详情页面查看
            factory.setHost(host);
            // 设置端口,AMQP协议默认端口5672
            factory.setPort(port);
            // 用户名,在WebUI控制台配置并管理
            factory.setUsername(userName);
            // 密码,在WebUI控制台配置并管理
            factory.setPassword(password);
            // 设置Vhost名称,请确保已在RabbitMQ版控制台上创建完成。
            factory.setVirtualHost(vhost);
    
            // 建立Connection
            Connection connection = factory.newConnection();
            // 创建Channel
            Channel channel = connection.createChannel();
    
            // 创建Classic类型队列(非必须,如果已在WebUI控制台创建可跳过)
            channel.queueDeclare(queueName, true, false, false, new HashMap<String, Object>());
    
            // 启动消费
            consume(connection);
    
            // 关闭Channel和Connection
            channel.close();
            connection.close();
        }
    
        private static void consume(Connection connection) throws IOException {
            final Channel channel = connection.createChannel();
    
            channel.basicConsume(queueName, true, "test_consumer", new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    // 处理收到的消息
                    System.out.printf("Received Message: %s, consumerTag: %s, deliveryTag: %s, messageId: %s\n", new String(body, StandardCharsets.UTF_8),
                            consumerTag, envelope.getDeliveryTag(), properties.getMessageId());
                }
            });
        }
    }