说实话,看视频来来回回学习了RabbitMQ也有两遍了,也有跟着敲代码,不过每次都是过了不久就忘记了如何使用,今天又来复习RabbitMQ的使用了,这次我跟着官方文档一个一个模型来,我不信我还能忘记T.T
AMQP协议
Advance Message Queue Protocol 高级消息队列协议。RabbitMQ就是根据这个协议开发的。
AMQP协议主要由下面几个部分组成,十分清晰
左边Publisher,消息的生产者,负责发送信息。最右边Consumer,消息的消费者,负责接收消息。而在正方形框框中则是RabbitMQ承载的功能。
- 生产者:发送消息到某个交换机。
- 消费者:从某个队列中取消息。
- 交换机(Exchange):类似家里的路由器,负责将消息发送到对应的队列中
- 队列(Queue):存放消息的地方。
- 路由(Routes):交换机根据路由将消息发送到队列的路由规则
在Java中引入RabbitMQ客户端
com.rabbitmq
amqp-client
5.8.0
RabbitMqUtils
我使用的是自己云服务器的RabbitMq,下面是一些配置项,这里按照自己的设置。这里反复使用创建channel我就做成了工具类。
这里的channel可以理解为一个一个连接。保留这些连接当然是为了复用。
public class RabbitMqUtils {
public static Channel getChannel() throws IOException, TimeoutException {
//创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
//连接rabbitmq队列
//ip地址
factory.setHost("xxxx");
//用户名
factory.setUsername("xxx");
//密码
factory.setPassword("xxx");
//设置虚拟主机 (这里默认是 / )自己可以创建一个
factory.setVirtualHost("xxx");
//从连接工厂里获取一个连接
Connection connection = factory.newConnection();
//在连接中创建一个信道
Channel channel = connection.createChannel();
return channel;
}
}
HelloWorld
使用默认的交换机,只有一个生产者和一个消费者,外加一个队列
生产者代码
public class Send {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
/声明队列,不存在则创建
//参数:队列名称,是否持久化,非排他,非自动删除,其他参数
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
}
消费者代码
public class Recv {
//声明队列
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
/声明队列,不存在则创建
//参数:队列名称,是否持久化,非排他,非自动删除,其他参数
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//消息接收到时调用的方法
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}
先启动生产者,再启动消费者
结果
这些方法参数有时候只是简单的变量提示,因为这些源码是经过压缩的,这个时候我们就可以点进方法内部,点击下载源码。这样提示就完整了。
Work Queues
工作队列模式
一个生产者,多个消费者。
生产者
public class NewTask {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
/声明队列,不存在则创建
//参数:队列名称,是否持久化,非排他,非自动删除,其他参数
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
String message = String.join(" ", argv);
channel.basicPublish("", TASK_QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
}
}
//durable 设置为 true 可以将队列持久化,重启之后依旧存在该队列
消费者
public class Worker {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//声明队列,不存在则创建
//参数:队列名称,是否持久化,非排他,非自动删除,其他参数
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//设置预取数为1,这样RabbitMQ就会在给消费者新消息之前先等待消息被确认
//channel.basicQos(1);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
//消息id,是否批量应答消息
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
}
private static void doWork(String task) {
for (char ch : task.toCharArray()) {
if (ch == '.') {
try {
Thread.sleep(1000);
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
}
}
结果
Publish/Subscribe
发布订阅模式
可以看到该图多了个X,这个就是交换机。那之前两种为什么没有交换机呢?那其实他们两个用的是默认交换机。
这种模式适用于什么场景?可以用于订阅日志。我们来看看官方的代码
生产者
public class EmitLog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String message = argv.length < 1 ? "info: Hello World!" :
String.join(" ", argv);
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
}
}
消费者
public class ReceiveLogs {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
这里利用了一个叫做fanout(扇出、广播)的交换机。只要队列绑定了这个交换机,那么fanout会将消息发送给每个队列一份。
注意:这里要先开启消费者,再开启生产者。
结果