1. 前言
大家好,我是路由器没有路。
随着互联网的发展,越来越多的应用程序需要处理大量的消息。
为了提高应用程序的性能和可扩展性,很多应用程序开始采用消息队列(MQ)来处理消息。
MQ 可以将消息异步地发送到目的地,从而实现解耦、异步处理和流量控制等功能。
但是,MQ 也带来了一些问题,如消息重复消费和消息消费的幂等性问题。
本文将介绍 MQ 如何保证消息不被重复消费,并讨论如何保证消息消费的幂等性。
2. 为什么会产生重复消费的问题?
在回答“如何保证消息不被重复消费?”这个问题前,首先需要了解消息队列可能会出现的重复消费问题。
比如 RabbitMQ、RocketMQ、Kafka 等常用的消息队列都可能会出现这个问题,因为这个问题通常是由我们开发人员来保证的,而不是由消息队列本身来保证的。
以 Kafka 为例,它有一个 offset
的概念,每个消息写入时都有一个 offset
,代表消息的序号。
当消费者消费数据时,每隔一段时间会将自己消费过的消息的 offset
提交一下,表示“我已经消费过了,下次我要是重启啥的,你就让我继续从上次消费到的 offset
继续消费吧”。
但是,有时候可能会出现一些意外情况,比如重启系统时直接 kill 进程,导致消费者有些消息处理了但是没有来得及提交 offset
,重启之后就会出现少数消息被重复消费的情况。
总的来说,消息队列(MQ)中产生重复消费的问题,主要是由于以下原因:
-
消费者异常关闭:当消费者异常关闭时,可能会导致已经消费过的消息没有被确认,从而出现重复消费的问题。
-
网络故障:当网络出现故障时,可能会导致消息没有被正确地发送到消费者端,从而出现重复消费的问题。
-
消费者处理消息失败:当消费者处理消息失败时,可能会导致消息没有被确认,从而出现重复消费的问题。
为了避免这些问题,我们需要采取一些措施来保证消息的可靠性,例如手动确认消息、消费者自身保证幂等性等。
3. 如何保证消息不被重复消费?
在 MQ 中,消息的消费是异步的,消费者需要从队列中获取消息并进行处理。
但是,在一些情况下,消息可能会被重复消费,例如网络故障、消费者异常关闭等情况。为了避免消息被重复消费,MQ 通常采用以下两种方式来保证消息不被重复消费:
3.1 消费者手动确认
在 MQ 中,消费者可以通过手动确认的方式来保证消息不被重复消费。
当消费者从队列中获取到消息后,需要手动确认消息已经被处理。
如果消费者没有确认消息,MQ 会将消息重新发送给其他消费者进行处理。
通过这种方式,可以保证消息不会被重复消费。
3.2 消息去重
在 MQ 中,还可以采用消息去重的方式来保证消息不被重复消费。
具体而言,可以在消费者端对消息进行去重,如果发现消息已经被处理过,就不再进行处理。
为了实现消息去重,可以使用一些技术,如布隆过滤器、数据库等。通过这种方式,可以保证消息不会被重复消费。
举个例子:
假设你有一个系统,需要将消息插入数据库中。如果同一条消息被重复消费了两次,就会在数据库中插入两条相同的数据,导致数据不准确。
但是,如果在消费消息时,先判断该消息是否已经被消费过,如果已经被消费过,就直接丢弃该消息,这样就可以保留一条正确的数据,从而保证数据的正确性。
4. 如何保证消息消费的幂等性?
一条数据重复出现两次,数据库里就只有一条数据,这就保证了系统的幂等性。
幂等性,通俗点说,就一个数据,或者一个请求,给你重复来多次,你得确保对应的数据是不会改变的,不能出错。
其实重复消费不可怕,可怕的是你没考虑到重复消费之后,怎么保证幂等性。
针对幂等性这个问题,我前面写过一篇文章《在工作中,接口幂等该如何做?》,可以回顾下。
我们也需要结合业务需求来思考解决方案。以下是几个可能的思路:
-
如果需要将数据写入数据库,可以先根据主键查询一下,如果该数据已经存在,就不需要再插入一条新的数据了,而是直接进行更新操作。
-
如果需要将数据写入 Redis 中,这个问题就比较简单了,因为 Redis 的 set 操作天然具有幂等性,即多次执行同样的 set 操作,只会产生一个相同的结果,不会产生重复数据。
-
如果不是上述两个场景,可能需要进行一些较为复杂的处理。可以在生产者发送每条数据的时候,加上一个全局唯一的 ID,例如订单 ID 等。在消费者消费消息时,可以先根据这个 ID 到 Redis 中查询一下,该消息是否已经被消费过。如果该消息没有被消费过,就进行处理,并将这个 ID 写入 Redis 中,表示该消息已经被消费过了。如果该消息已经被消费过了,就不需要再进行处理了,保证不会重复处理相同的消息。
-
另外一种解决方案是,基于数据库的唯一键来保证重复数据不会被插入多条。由于有唯一键的约束,重复数据插入时只会报错,而不会导致数据库中出现脏数据。这种方法需要在数据库中设置唯一键约束,从而保证数据的准确性。
总结
MQ 是一种常用的消息处理技术,可以帮助应用程序实现解耦、异步处理和流量控制等功能。
但是,MQ 也带来了一些问题,如消息重复消费和消息消费的幂等性问题。
为了解决这些问题,可以采用消费者手动确认、消息去重、消费者自身保证幂等性和 MQ 提供幂等性保证等方式。
通过这些方式,可以保证消息的可靠处理,提高系统的稳定性和可用性。