.NET RabbitMQ
上一篇
地址: .Net RabbitMQ - 掘金 (juejin.cn)
由于所用包需要商业许可证
,换一种方法
环境搭建
MQ 安装
使用的是Podman, 环境搭建参考: Podman 基本使用 - 掘金 (juejin.cn)
依次执行:
点击 RabbitMQ Management,进入RabbitMQ, 账号密码都是guest
.NET 依赖下载
RabbitMQ.Client
核心代码
项目目录
发送者
using RabbitMQ.Client; using System.Text; ConnectionFactory factory = new ConnectionFactory(); factory.HostName = "localhost"; factory.Port = 5673; factory.DispatchConsumersAsync = true; IConnection connection = factory.CreateConnection(); string exchangeName = "exchange1"; string directName = "direct"; string routingKey = "key1"; while (true) { using var channel = connection.CreateModel(); // 创建信道 IBasicProperties prop = channel.CreateBasicProperties(); prop.DeliveryMode = 2; channel.ExchangeDeclare(exchangeName, directName); // 声明交换机 byte[] bytes = Encoding.UTF8.GetBytes(DateTime.Now.ToString()); channel.BasicPublish( exchangeName , routingKey: routingKey, mandatory: true, body: bytes ); Console.WriteLine("ok" + DateTime.Now.ToString()); Thread.Sleep(1000); }
接收者
using RabbitMQ.Client; using RabbitMQ.Client.Events; using System.Text; ConnectionFactory factory = new ConnectionFactory(); factory.HostName = "localhost"; factory.Port = 5673; factory.DispatchConsumersAsync = true; IConnection connection = factory.CreateConnection(); string exchangeName = "exchange1"; string queueName = "queue1"; string routerKey = "key1"; using IModel channel = connection.CreateModel(); channel.ExchangeDeclare(exchangeName, "direct"); channel.QueueDeclare( queueName, durable: true, exclusive: false, autoDelete: false, arguments: null ); channel.QueueBind(queueName, exchangeName, routerKey); AsyncEventingBasicConsumer consumer = new AsyncEventingBasicConsumer(channel); consumer.Received += Consumer_Received; channel.BasicConsume(queueName, autoAck: false, consumer: consumer); Console.ReadLine(); async Task Consumer_Received(object sender, BasicDeliverEventArgs _event) { try { byte[] bytes = _event.Body.ToArray(); string text = Encoding.UTF8.GetString(bytes); Console.WriteLine(DateTime.Now + ", 收到消息:" + text); channel.BasicAck(_event.DeliveryTag, multiple: false); await Task.Delay(1000); } catch (Exception ex) { channel.BasicAck(_event.DeliveryTag, multiple: true); // 失败重发 } };
测试