上一篇
地址: .Net RabbitMQ - 掘金 (juejin.cn)
由于上一篇所用的包需要商业许可证,本文换一种方法实现。
环境搭建
MQ 安装
使用的是Podman, 环境搭建参考: Podman 基本使用 - 掘金 (juejin.cn)
依次执行:
点击 RabbitMQ Management,进入 RabbitMQ 管理界面,账号和密码都是 guest。
.NET 依赖下载
RabbitMQ.Client
核心代码
项目目录
发送者
using RabbitMQ.Client;
using System.Text;
// Publisher: once per second, sends the current timestamp to the direct
// exchange "exchange1" using routing key "key1".
ConnectionFactory factory = new ConnectionFactory
{
    HostName = "localhost",
    // NOTE(review): presumably the host port mapped to the broker container's
    // AMQP port - confirm against the container setup.
    Port = 5673,
};

// One connection and one channel are reused for every publish. Opening a new
// channel (and re-declaring the exchange) for each message is wasted work.
using IConnection connection = factory.CreateConnection();
using IModel channel = connection.CreateModel();

string exchangeName = "exchange1";
string exchangeType = "direct";
string routingKey = "key1";

// Declare the exchange once; the arguments must match the consumer's declaration.
channel.ExchangeDeclare(exchangeName, exchangeType);

// DeliveryMode 2 marks a message as persistent. These properties must be
// passed to BasicPublish, otherwise the setting has no effect.
IBasicProperties prop = channel.CreateBasicProperties();
prop.DeliveryMode = 2;

while (true)
{
    byte[] bytes = Encoding.UTF8.GetBytes(DateTime.Now.ToString());

    channel.BasicPublish(
        exchange: exchangeName,
        routingKey: routingKey,
        mandatory: true,
        basicProperties: prop,
        body: bytes);

    Console.WriteLine("ok" + DateTime.Now.ToString());
    Thread.Sleep(1000);
}
接收者
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
// Consumer setup: connect to the broker, declare the exchange and the queue,
// bind them together, and start consuming with manual acknowledgements.
var factory = new ConnectionFactory
{
    HostName = "localhost",
    Port = 5673,
    // Needed so that AsyncEventingBasicConsumer handlers are dispatched.
    DispatchConsumersAsync = true,
};
IConnection connection = factory.CreateConnection();

var exchange = "exchange1";
var queue = "queue1";
var bindingKey = "key1";

using IModel channel = connection.CreateModel();

// Declare the topology on the consumer side as well.
channel.ExchangeDeclare(exchange, "direct");
channel.QueueDeclare(queue, durable: true, exclusive: false, autoDelete: false, arguments: null);
channel.QueueBind(queue, exchange, bindingKey);

// Register the async handler, then start consuming; autoAck is off, so the
// handler is responsible for confirming each delivery.
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.Received += Consumer_Received;
channel.BasicConsume(queue, autoAck: false, consumer: consumer);

// Keep the process alive until the user presses Enter.
Console.ReadLine();
// Handles one delivery: decodes the body as UTF-8, prints it, and confirms
// the message. If processing throws, the message is negatively acknowledged
// with requeue so the broker delivers it again.
//   sender: the consumer that raised the event (unused).
//   _event: delivery data; Body holds the payload and DeliveryTag identifies
//           the message on this channel.
async Task Consumer_Received(object sender, BasicDeliverEventArgs _event)
{
    try
    {
        byte[] bytes = _event.Body.ToArray();
        string text = Encoding.UTF8.GetString(bytes);
        Console.WriteLine(DateTime.Now + ", 收到消息:" + text);
        await Task.Delay(1000);
    }
    catch (Exception ex)
    {
        Console.WriteLine(DateTime.Now + ", message handling failed: " + ex.Message);
        // Resend on failure: reject only this delivery and ask the broker to
        // requeue it. Calling BasicAck here (as before) would confirm the
        // failed message, and with multiple: true every earlier unacked one.
        channel.BasicNack(_event.DeliveryTag, multiple: false, requeue: true);
        return;
    }

    // Ack outside the try block so a delivery tag is never acked and then
    // nacked, which the broker treats as a channel-level error.
    channel.BasicAck(_event.DeliveryTag, multiple: false);
}
测试