RabbitMQ基础入门(上)

2023年 7月 17日 56.2k 0

说实话,看视频来来回回学习了RabbitMQ也有两遍了,也有跟着敲代码,不过每次都是过了不久就忘记了如何使用,今天又来复习RabbitMQ的使用了,这次我跟着官方文档一个一个模型来,我不信我还能忘记T.T

AMQP协议

Advance Message Queue Protocol 高级消息队列协议。RabbitMQ就是根据这个协议开发的。
AMQP协议主要由下面几个部分组成,十分清晰
图片.png

左边Publisher,消息的生产者,负责发送信息。最右边Consumer,消息的消费者,负责接收消息。而在正方形框框中则是RabbitMQ承载的功能。

  • 生产者:发送消息到某个交换机。
  • 消费者:从某个队列中取消息。
  • 交换机(Exchange):类似家里的路由器,负责将消息发送到对应的队列中
  • 队列(Queue):存放消息的地方。
  • 路由(Routes):交换机根据路由将消息发送到队列的路由规则

在Java中引入RabbitMQ客户端

  
    com.rabbitmq  
    amqp-client  
    5.8.0  

RabbitMqUtils

我使用的是自己云服务器的RabbitMq,下面是一些配置项,这里按照自己的设置。这里反复使用创建channel我就做成了工具类。

这里的channel可以理解为一个一个连接。保留这些连接当然是为了复用。

public class RabbitMqUtils {  
    public static Channel getChannel() throws IOException, TimeoutException {  
        //创建一个连接工厂  
        ConnectionFactory factory = new ConnectionFactory();  
        //连接rabbitmq队列 
        //ip地址
        factory.setHost("xxxx");  
        //用户名  
        factory.setUsername("xxx");  
        //密码  
        factory.setPassword("xxx");  
        //设置虚拟主机  (这里默认是 / )自己可以创建一个
        factory.setVirtualHost("xxx");  
        //从连接工厂里获取一个连接  
        Connection connection = factory.newConnection();  
        //在连接中创建一个信道  
        Channel channel = connection.createChannel();  
        return channel;  
    }  
}

HelloWorld

图片.png

使用默认的交换机,只有一个生产者和一个消费者,外加一个队列

生产者代码

public class Send {  
private final static String QUEUE_NAME = "hello";  
    public static void main(String[] argv) throws Exception {  
        Channel channel = RabbitMqUtils.getChannel(); 
        /声明队列,不存在则创建
        //参数:队列名称,是否持久化,非排他,非自动删除,其他参数
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);  
        String message = "Hello World!";  
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());  
        System.out.println(" [x] Sent '" + message + "'");  
    }  
}

消费者代码

public class Recv {  
//声明队列  
private final static String QUEUE_NAME = "hello";  
    public static void main(String[] argv) throws Exception {  
        Channel channel = RabbitMqUtils.getChannel();
        /声明队列,不存在则创建
        //参数:队列名称,是否持久化,非排他,非自动删除,其他参数
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);  
        
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");  
        //消息接收到时调用的方法
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {  
            String message = new String(delivery.getBody(), "UTF-8");  
            System.out.println(" [x] Received '" + message + "'");  
        };  
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });  
    }  
}

先启动生产者,再启动消费者

结果

图片.png
图片.png

这些方法参数有时候只是简单的变量提示,因为这些源码是经过压缩的,这个时候我们就可以点进方法内部,点击下载源码。这样提示就完整了。

Work Queues

图片.png

工作队列模式
一个生产者,多个消费者。
生产者

public class NewTask {  
    private static final String TASK_QUEUE_NAME = "task_queue";  

    public static void main(String[] argv) throws Exception {  
        Channel channel = RabbitMqUtils.getChannel();  
        /声明队列,不存在则创建
        //参数:队列名称,是否持久化,非排他,非自动删除,其他参数
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);  

        String message = String.join(" ", argv);  

        channel.basicPublish("", TASK_QUEUE_NAME,  
        MessageProperties.PERSISTENT_TEXT_PLAIN,  
        message.getBytes("UTF-8"));  
        System.out.println(" [x] Sent '" + message + "'");  
    }  
}

//durable 设置为 true 可以将队列持久化,重启之后依旧存在该队列

消费者

public class Worker {  
  
    private static final String TASK_QUEUE_NAME = "task_queue";  

    public static void main(String[] argv) throws Exception {  

        Channel channel = RabbitMqUtils.getChannel();  
        //声明队列,不存在则创建
        //参数:队列名称,是否持久化,非排他,非自动删除,其他参数
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);  
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");  
        //设置预取数为1,这样RabbitMQ就会在给消费者新消息之前先等待消息被确认
        //channel.basicQos(1);  

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {  
            String message = new String(delivery.getBody(), "UTF-8");  

            System.out.println(" [x] Received '" + message + "'");  
            try {  
                doWork(message);  
            } finally {  
                System.out.println(" [x] Done");  
                //消息id,是否批量应答消息
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);  
            }  
        };  
        channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });  
    }  

    private static void doWork(String task) {  
        for (char ch : task.toCharArray()) {  
            if (ch == '.') {  
                try {  
                    Thread.sleep(1000);  
                } catch (InterruptedException _ignored) {  
                    Thread.currentThread().interrupt();  
                }  
            }  
        }  
    }  
}

结果

图片.png
图片.png

Publish/Subscribe

发布订阅模式

图片.png

可以看到该图多了个X,这个就是交换机。那之前两种为什么没有交换机呢?那其实他们两个用的是默认交换机。
这种模式适用于什么场景?可以用于订阅日志。我们来看看官方的代码
生产者

public class EmitLog {  
  
    private static final String EXCHANGE_NAME = "logs";  

    public static void main(String[] argv) throws Exception {  
        Channel channel = RabbitMqUtils.getChannel();  
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");  
        String message = argv.length < 1 ? "info: Hello World!" :  
        String.join(" ", argv);  
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));  
        System.out.println(" [x] Sent '" + message + "'");  
    }  
}

消费者

public class ReceiveLogs {  
    private static final String EXCHANGE_NAME = "logs";  

    public static void main(String[] argv) throws Exception {  

        Channel channel = RabbitMqUtils.getChannel();  

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");  
        String queueName = channel.queueDeclare().getQueue();  
        channel.queueBind(queueName, EXCHANGE_NAME, "");  

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");  

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {  
            String message = new String(delivery.getBody(), "UTF-8");  
            System.out.println(" [x] Received '" + message + "'");  
        };  
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });  
    }  
}

这里利用了一个叫做fanout(扇出、广播)的交换机。只要队列绑定了这个交换机,那么fanout会将消息发送给每个队列一份。

注意:这里要先开启消费者,再开启生产者。

结果

图片.png
图片.png

相关文章

JavaScript2024新功能:Object.groupBy、正则表达式v标志
PHP trim 函数对多字节字符的使用和限制
新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
为React 19做准备:WordPress 6.6用户指南
如何删除WordPress中的所有评论

发布评论