RocketMQ 五种消息类型实践

2023年 7月 16日 47.7k 0

1.五种消息类型介绍

普通消息:普通消息也称为并发消息,和传统的队列相比,并发消息没有顺序, 但是生产消费都是并行进行的,单机性能可达十万级别的TPS。

分区有序消息:与Kafka中的分区类似,把一个Topic消息分为多个分区“保 存”和消费,在一个分区内的消息就是传统的队列,遵循FIFO(先进先出)原则。

全局有序消息:如果把一个 Topic 的分区数设置为 1,那么该 Topic 中的消息 就是单分区,所有消息都遵循FIFO(先进先出)的原则。

延迟消息:消息发送后,消费者要在一定时间后,或者指定某个时间点才可以消 费。在没有延迟消息时,基本的做法是基于定时计划任务调度,定时发送消息。在 RocketMQ中只需要在发送消息时设置延迟级别即可实现。

事务消息:主要涉及分布式事务,即需要保证在多个操作同时成功或者同时失败 时,消费者才能消费消息。RocketMQ通过发送Half消息、处理本地事务、提交 (Commit)消息或者回滚(Rollback)消息优雅地实现分布式事务。

消息类型 优点 缺点 备注
普通消息(并发消息) 性能最好 消息的生产和消费都是无序 大部分场景适用
分区有序消息 单分区中消息有序,单机发送TPS万级别 单点问题。如果Broker宕机,则会导致发送失败 大部分有序消息场景适用
全局有序消息 类似传统的Queue,全部消息有序,单机发送TPS千级别 单点问题。如果Broker宕机,则会导致发送失败 极少场景使用
延迟消息 RocketMQ自身支持,不需要额外使用组件,支持延迟特性 不能根据任意时间延迟,使用范围受限。Broker随着延迟级别增大支持越多,CPU压力越大延迟时间不准确 非精确、延迟级别不多的场景,非常方便使用
事务消息 Rocket MQ自身支持,不需要额外使用组件支持事务特性 RocketMQ事务是生产者事务,只有生产者参与,如果消费者处理失败则事务失效 简单事务处理可以使用

2.普通消息

//普通消息类型
@Slf4j
public class MessageType1 {
    public static void main(String[] args) {
        //DefaultMQProducer用于发送非事务消息
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        //注册NameServer地址
        producer.setNamesrvAddr("172.16.247.3:9876");
        //异步发送失败后Producer自动重试2次
        producer.setRetryTimesWhenSendAsyncFailed(2);
        try {
            //启动生产者实例
            producer.start();
            //消息数据
            String data = "{"title":"2022年第四季度汇总数据"}";
            //消息主题
            Message message = new Message("tax-data", "2022S4", "3333", data.getBytes());
            //发送结果
            SendResult result = producer.send(message);
            log.info("Broker响应:" + result);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                //关闭连接
                producer.shutdown();
                log.info("连接已关闭");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

代码返回结果:

16:58:40.090 [main] INFO com.fblinux.rocketmq.mtype.MessageType1 - Broker响应:
SendResult [
	sendStatus=SEND_OK, 
	msgId=7F000001AA3018B4AAC231FDDA4C0000, 
	offsetMsgId=AC10F70300002A9F0000000000234061, 
	messageQueue=MessageQueue [topic=tax-data, brokerName=broker-a, queueId=13], 
	queueOffset=625
]

返回结果解析:

sendStatus:发送状态,SEND_OK代表成功

msgId:消息由RocketMQ分配的全局唯一Id,由 producer客户端生成,调用方法MessageClientIDSetter.createUniqID()生成全局唯一的Id

offsetMsgId:Broker 服务端将消息追加到内存后会返回其物理偏移量,即在 commitlog 文件中的偏移量,然后会生成一个Id

messageQueue:消息队列内容

  • topic:主题名称
  • brokerName:broker服务器名字,在RocketMQ xxx.propertites配置文件中brokerName项定义
  • queueId:queueId队列Id,默认会初始化4个(0-3)

queueOffset:queueId对应队列逻辑上的位置(偏移量)

3.有序消息

假设没有有序消息时有什么问题?

如果某一笔业务分为多条普通消息同时发送,消费者无法保证按生产者预期的顺序进行消费, 进而导致代码逻辑错误。

举例:一个电商下单流程,分为创建订单、扣减库存、加积分。要是消费者无序消费就可能是先扣减库存发货,再给用户加积分,最后在创建订单扣款,导致业务逻辑出现问题。

3.1.分区有序消息

分区有序消息:与Kafka中的分区类似,把一个Topic消息分为多个分区“保存”和消费,在一个分区内的消息就是传统的队列,遵循FIFO(先进先出)原则。

全局有序消息:如果把一个 Topic 的分区数设置为 1,那么该 Topic 中的消息就是单分区,所有消息都遵循FIFO(先进先出)的原则。

image-20221011101936624

Consumer消费时通过一个分区只能有一个线程消费的方式来保证消息顺序。

RocketMQ 有序消息需要两点调整:

  • 生产者端要求按id等唯一标识分配消息队列
  • 消费者端采用专用的监听器保证对队列的单线程应用

发送分区顺序消息

@Slf4j
//发送分区顺序消息
public class MessageType2 {
    public static void main(String[] args) {
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.setNamesrvAddr("192.168.31.103:9876");
        producer.setRetryTimesWhenSendAsyncFailed(2);
        try {
            producer.start();
            Integer id = 4465;
            String data = "{"id":" + id + " , + "title":"2022年第四季度汇总数据"}";
            Message message = new Message("tax-data", "2022S4", id.toString(), data.getBytes(RemotingHelper.DEFAULT_CHARSET));
            //分区有序消息最大的区别便是调用send方法时,需要实现MessageQueueSelector接口,确定使用哪个队列投递消息
            SendResult result = producer.send(message, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List mqs, Message msg, Object arg) {
                    log.info("当前队列数量:" + mqs.size() + ",明细:" + mqs.toString());
                    log.info("Message对象:" + msg.toString());
                    int dataId = Integer.parseInt(msg.getKeys());
                    int index = dataId % mqs.size();
                    MessageQueue messageQueue = mqs.get(index);
                    log.info("分区队列:" + messageQueue);
                    return messageQueue;
                }
            }, null);

            log.info("消息已发送:MsgId:" + result.getMsgId() + ",发送状态:" + result.getSendStatus());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                producer.shutdown();
                System.out.println("连接已关闭");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

3.2.全局有序消息

在实现MessageQueueSelector接口时,固定选择某个队列就代表全局有序。注意:这里的全局有序 代表broker中全局有序。如果消息被分发到不同的broker中,不保证有序,当然这种使用方法是错误的。

3.3.消费者端调整

面对有序消息场景,消费者端最大的变化是registerMessageListener监听器要实例化MessageListenerOrderly对象,用于为每一个队列分配唯一的连接(线程)进行消费。

4.延迟消息

使用起来不灵活,内部规定了18个延时队列

延迟消息是指消息发送后,消费者要在一定时间后,或者指定某个时间点才可以消费。 在没有延迟消息时,基本的做法是基于定时计划任务调度,定时发送消息。在 RocketMQ中 只需要在发送消息时设置延迟级别即可实现。

1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 

Broker在启动时,内部会创建一个内部主题:SCHEDULE_TOPIC_XXXX,根据延迟level的个 数,创建对应数量的队列,也就是说18个level对应了18个队列。注意,这并不是说这个内部主题只会有18个队列,因为Broker通常是集群模式部署的,因此每个节点都有18个队列。

4.1.底层原理

1、修改消息Topic名称和队列信息

RocketMQ Broker端在存储生产者写入的消息时,首先都会将其写入到CommitLog中。之 后将Topic的名称修改为SCHEDULE_TOPIC_XXXX,并根据延迟级别确定要投递到哪个队列下。

2、转发消息到延迟主题的CosumeQueue中

CommitLog中的消息转发到CosumeQueue中是异步进行的。在转发过程中,会对延迟消息 进行特殊处理,主要是计算这条延迟消息需要在什么时候进行投递。

3、延迟服务消费SCHEDULE_TOPIC_XXXX消息

Broker内部有一个ScheduleMessageService类,其充当延迟服务,消费SCHEDULE_TOPIC_XXXX中的消息,并投递到目标Topic中。ScheduleMessageService在启动时,其会创建一个定时器Timer,并根据延迟级别的个数,启动对应数量的TimerTask,每个TimerTask负责一个延迟级别的消费与投递。

4、将信息重新存储到CommitLog中

在将消息到期后,需要投递到目标Topic。由于在第一步已经记录了原来的Topic和队列信息,因此这里重新设置,再存储到CommitLog即可。此外,由于之前Message Tag HashCode字段存储的是消息的投递时间,这里需要重新计算tag的哈希值后再存储

5、将消息投递到目标Topic中

这一步与第二步类似,不过由于消息的Topic名称已经改为了目标Topic。因此消息会直接投递到目标Topic的ConsumeQueue中,之后消费者即消费到这条消息。

6、消费者消费目标topic中的数据

4.2.代码示例

//发送延迟消息
public class MessageType3 {
    public static void main(String[] args) {
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.setNamesrvAddr("172.16.247.3:9876");
        producer.setRetryTimesWhenSendAsyncFailed(2);
        try {
            producer.start();
            long id = 4466L;

            String data = "{"id":" + id + " , + "title":"2022年第四季度汇总数据"}";
            Message message = new Message("tax-data", "2022S4", data.getBytes(RemotingHelper.DEFAULT_CHARSET));
            //1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
            message.setDelayTimeLevel(3);
            SendResult result = producer.send(message);
            System.out.println("消息已发送:MsgId:" + result.getMsgId() + ",发送状态:" + result.getSendStatus());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                producer.shutdown();
                System.out.println("连接已关闭");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

5.事务消息

5.1.事物消息解决的问题

先看一个工作场景,

订单ID1030被创建后要保存到数据库,同时该1030订单通过MQ投 递给其他系统进行消费。如果要保证订单数据入库与消息投递状态要保证最终一致,要怎么做?

这里有两种常见做法:

第一种,先写库,再发送数据

//伪代码
//插⼊1030号订单
orderDao.insert(1030,order);
//向1030号订单新增3条订单明细,10081-10083,
orderDetailDao.insert(10081,1030,orderDetail1);
orderDetailDao.insert(10082,1030,orderDetail2);
orderDetailDao.insert(10083,1030,orderDetail3);
//向MQ发送数据,如果数据发送失败
SendResult result = producer.send(orderMessage)
if(result.getState().equals("SEND_OK"))){
		connection.commit();
	}else{
		connection.rollback();
}

如果⽣产者发送消息时,因为⽹络原因导致10秒消息才返回SendResult结果,这就意味这10秒内数据 库事务⽆法提交,⼤量并发下,数据库连接资源会在这10秒内迅速耗尽,后续请求进⼊连接池等待状 态,最终导致系统停⽌响应。

第二种,先发消息,在写库

//伪代码 
//向MQ发送数据,如果数据发送失败 
SendResult result = producer.send(orderMessage) 
if(result.getState().equals("SEND_OK"))){ 
		//插入1030号订单 
		orderDao.insert(1030,order); 
		//向1030号订单新增3条订单明细,10081-10083, 
		orderDetailDao.insert(10081,1030,orderDetail1); 
		orderDetailDao.insert(10082,1030,orderDetail2); 
		orderDetailDao.insert(10083,1030,orderDetail3); 
		connection.commit; 
}

问题更严重,因为消息已经被发送了,消费者可以立即消费,比如下游消费者为1030订单自动设 置了“快递信息”,可是如果后续orderDao向数据库插入数据产生异常导致业务失败。我们还需 要再次发送“取消1030订单”的消息把下游1030订单分配的“快递信息”给撤销,这些都是在业务层面上的额外处理,这无疑提高了对程序员的要求与处理的难度。

那有没有什么方式可以既不阻塞数据库事务,也能保证最终一致性呢?有,RocketMQ提供了事务消息可以保障应用本地事务与MQ最终一致性。

5.2.代码实现

发出事务消息代码:

@Data
@Slf4j
public class MessageType4 {
    public static void main(String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException {

        //事务消息一定要使用TransactionMQProducer事务生产者创建
        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name1");
        //从NameServer获取配置数据
        producer.setNamesrvAddr("172.16.247.3:9876");

        //CachedThreadPool线程池用于回查事务数据
        ExecutorService cachedThreadPool = Executors.newCachedThreadPool(new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("check-transaction-thread");
                return thread;
            }
        });
        //将生产者与线程池绑定
        producer.setExecutorService(cachedThreadPool);
        //绑定事务监听器,用于执行代码
        TransactionListener transactionListener = new OrderTransactionListenerImpl();
        producer.setTransactionListener(transactionListener);
        //启动生产者
        producer.start();

        //创建消息对象
        Message msg =
                new Message("order", "order-1030",
                        "1030", "1030订单与明细的完整JSON数据(略)".getBytes());
        //一定要调用sendMessageInTransaction发送事务消息
        //参数1:消息对象
        //参数2:其他参数,目前用不到
        producer.sendMessageInTransaction(msg, null);
    }
}

处理本地事务业务代码:

@Data
@Slf4j
public class OrderTransactionListenerImpl implements TransactionListener {

    @Override
    //执行本地事务代码
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        log.info("正在执行本地事务,订单编号:" + msg.getKeys());
        /* 伪代码
        try{
            //插入1030号订单
            orderDao.insert(1030,order);
            //向1030号订单新增3条订单明细,10081-10083,
            orderDetailDao.insert(10081,1030,orderDetail1);
            orderDetailDao.insert(10082,1030,orderDetail2);
            orderDetailDao.insert(10083,1030,orderDetail3);
            connection.commit();
            //返回Commit,消费者可以消费1030订单消息
            return LocalTransactionState.COMMIT_MESSAGE;
        }catch(Exception e){
            //返回Rollback,Broker直接将数据删除,消费者不能收到1030订单消息
            connection.rollback();
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        */
        log.info("模拟网络中断,Broker并未收到生产者本地事务状态回执,返回UNKNOW");
        return LocalTransactionState.UNKNOW;
    }

    @Override
    //回查本地事务处理状态
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {

        String keys = msg.getKeys();
        log.info("触发回查,正在检查" + keys + "订单状态");
        /* 伪代码

        Order order = orderDao.selectById(1030);
        if(order != null){
            //查询到记录,代表数据库已处理成功,回查返回Commit,消费者可以消费1030订单消息
            return LocalTransactionState.COMMIT_MESSAGE;
        }else{
            //未查询到记录,代表数据库处理失败,回查返回Rollback,Broker直接将数据删除,消费者不能收到1030订单消息
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        */
        log.info("回查结果," + keys + "订单已入库,发送Commit指令");
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

5.3.实验了解RocketMQ事务执行过程

5.3.1.标准流程:

1、producer.sendMessageInTransaction(msg, null); 执行成功

此时1030订单消息已被发送到MQ服务器(Broker),不过该消息在Broker此时状态为“half-message”,相当于存储在MQ中的“临时消息”,此状态下消息无法被投递给消费者。

image-20221010200010635

2、生产者发送消息成功后自动触发

OrderTransactionListenerImpl.executeLocalTransaction()执行本地事务。

当消息发送成功,紧接着生产者向本地数据库写数据,数据库写入后提交commit,同executeLocalTransaction方法返回COMMIT_MESSAGE,生产者会再次向MQ服务器发送一个commit提交消息,此前在Broker中保存1030订单消息状态就从“half-message”变为”已提交”,broker将消息发给下游的消费者处理。

image-20221010200109793

5.3.2.异常流程1:

producer.sendMessageInTransaction(msg, null); 执行失败,抛出异常

此时没有任何消息被发出,本地事务也不会执行,除了报错外不会产生任何不一致。

image-20221010200246710

5.3.3.异常流程2:

producer.sendMessageInTransaction(msg, null); 执行成功,本地事务执行失败

此时本地事务执行rollback回滚,数据库数据被撤销,同时executeLocalTransaction方法返 回ROLLBACK_MESSAGE代表回滚,生产者会再次向MQ服务器发送一个rollback回滚消息,此前在Broker中保存1030订单消息就会被直接删除,不会发送给消费者,本地事务也可以保证与MQ消息一致。

image-20221010200400217

5.3.4.异常流程3:

producer.sendMessageInTransaction(msg, null); 执行成功,本地事务执行成功,但给Broker返回Commit消息时断网了,导致broker无法收到提交指令。

此时本地数据库订单数据已入库,但MQ因为断网无法收到生产者的发来的“commit”消息,1030订单数据一直处于“half message”的状态,消息无法被投递到消费者,本地事务与MQ消息的一致性被破坏。

image-20221010200506940

RocketMQ为了解决这个问题,设计了回查机制,对于broker中的half message,每过一小段时间 就自动尝试与生产者通信,试图调用通

public LocalTransactionState checkLocalTransaction(MessageExt msg) {

    String keys = msg.getKeys();
    log.info("触发回查,正在检查" + keys + "订单状态");
    /* 伪代码

    Order order = orderDao.selectById(1030);
    if(order != null){
        //查询到记录,代表数据库已处理成功,回查返回Commit,消费者可以消费1030订单消息
        return LocalTransactionState.COMMIT_MESSAGE;
    }else{
        //未查询到记录,代表数据库处理失败,回查返回Rollback,Broker直接将数据删除,消费者不能收到1030订单消息
        return LocalTransactionState.ROLLBACK_MESSAGE;
    }
    */
    log.info("回查结果," + keys + "订单已入库,发送Commit指令");
    return LocalTransactionState.COMMIT_MESSAGE;
}

由MQ服务器主动发起,生产者调用OrderTransactionListenerImpl.checkLocalTransaction()检查之前数据库事务是否完成。

image-20221010200707773

checkLocalTransaction()查询到订单数据,说明之前的数据库事务已经完成,返回 COMMIT_MESSAGE,这样Broker中的1030订单消息就可以被发送给消费者进行处理。

image-20221010200757462

checkLocalTransaction()未查询到订单数据,说明之前的数据库事务没有处理成功,返回ROLLBACK_MESSAGE,这样Broker中的1030订单消息就会被删除。

image-20221010200815722

5.4.RocketMQ 事务消息执行流程

 

image-20221010201158898

 

相关文章

服务器端口转发,带你了解服务器端口转发
服务器开放端口,服务器开放端口的步骤
产品推荐:7月受欢迎AI容器镜像来了,有Qwen系列大模型镜像
如何使用 WinGet 下载 Microsoft Store 应用
百度搜索:蓝易云 – 熟悉ubuntu apt-get命令详解
百度搜索:蓝易云 – 域名解析成功但ping不通解决方案

发布评论