.NET RabbitMQ

上一篇

地址: .Net RabbitMQ - 掘金 (juejin.cn)

由于上一篇所用的包需要商业许可证,本文改用官方的 RabbitMQ.Client 包来实现

环境搭建

MQ 安装

使用的是Podman, 环境搭建参考: Podman 基本使用 - 掘金 (juejin.cn)

image.png

依次执行:

  • podman pull rabbitmq
  • podman run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5673:5672 rabbitmq
  • podman ps -a
  • podman exec -it rabbit /bin/bash (也可以使用 podman ps -a 输出的容器 ID, 例如 8200f0b845fd)
  • rabbitmq-plugins enable rabbitmq_management
  • 点击 RabbitMQ Management,进入RabbitMQ, 账号密码都是guest

    image.png

    .NET 依赖下载

    RabbitMQ.Client

    image.png

    核心代码

    项目目录

    image.png

    发送者

    using RabbitMQ.Client;
    using System.Text;
    // Publisher: connects to the broker on localhost:5673 and publishes the current
    // time to the direct exchange "exchange1" with routing key "key1" once per second.
    ConnectionFactory factory = new ConnectionFactory
    {
        HostName = "localhost",
        Port = 5673,
    };
    using IConnection connection = factory.CreateConnection();
    string exchangeName = "exchange1";
    string directName = "direct";
    string routingKey = "key1";

    // Create the channel and declare the exchange once, instead of on every loop
    // iteration; both are reused for all publishes.
    using IModel channel = connection.CreateModel();
    channel.ExchangeDeclare(exchangeName, directName);

    // Messages are published with mandatory: true, so the broker hands back any
    // message it cannot route to a queue. Report those instead of losing them silently.
    channel.BasicReturn += (_, returned) =>
        Console.WriteLine("returned: " + returned.ReplyText);

    // Delivery mode 2 marks messages as persistent. The properties only take effect
    // when they are passed to BasicPublish below.
    IBasicProperties prop = channel.CreateBasicProperties();
    prop.DeliveryMode = 2;

    while (true)
    {
        byte[] bytes = Encoding.UTF8.GetBytes(DateTime.Now.ToString());
        channel.BasicPublish(
            exchangeName,
            routingKey: routingKey,
            mandatory: true,
            basicProperties: prop,
            body: bytes
        );
        Console.WriteLine("ok" + DateTime.Now.ToString());
        Thread.Sleep(1000);
    }

    接收者

    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    using System.Text;
    // Consumer: binds the durable queue "queue1" to the direct exchange "exchange1"
    // with routing key "key1" and prints every message it receives until Enter is pressed.
    ConnectionFactory factory = new ConnectionFactory();
    factory.HostName = "localhost";
    factory.Port = 5673;
    // Enables Task-returning consumer callbacks; used with AsyncEventingBasicConsumer below.
    factory.DispatchConsumersAsync = true;
    using IConnection connection = factory.CreateConnection();
    string exchangeName = "exchange1";
    string queueName = "queue1";
    string routerKey = "key1";
    using IModel channel = connection.CreateModel();
    channel.ExchangeDeclare(exchangeName, "direct");
    channel.QueueDeclare(
        queueName,
        durable: true,
        exclusive: false,
        autoDelete: false,
        arguments: null
    );
    channel.QueueBind(queueName, exchangeName, routerKey);
    AsyncEventingBasicConsumer consumer = new AsyncEventingBasicConsumer(channel);
    consumer.Received += Consumer_Received;
    // autoAck: false — every delivery is acknowledged (or rejected) explicitly in the handler.
    channel.BasicConsume(queueName, autoAck: false, consumer: consumer);
    Console.ReadLine();

    // Handles one delivery: decodes the body as UTF-8, prints it, then acknowledges it.
    // If processing throws, the message is negatively acknowledged and requeued so the
    // broker delivers it again.
    async Task Consumer_Received(object sender, BasicDeliverEventArgs _event)
    {
        try
        {
            byte[] bytes = _event.Body.ToArray();
            string text = Encoding.UTF8.GetString(bytes);
            Console.WriteLine(DateTime.Now + ", 收到消息:" + text);
        }
        catch (Exception ex)
        {
            Console.Error.WriteLine(ex);
            // Redeliver on failure: reject only this message and put it back on the queue.
            // (An ack here would discard the failed message instead of retrying it.)
            channel.BasicNack(_event.DeliveryTag, multiple: false, requeue: true);
            return;
        }

        // Ack outside the try block so an already-acknowledged delivery can never be
        // nacked afterwards.
        channel.BasicAck(_event.DeliveryTag, multiple: false);
        await Task.Delay(1000);
    }

    测试

    20231001_113137.gif