如何保证消息不被重复消费?或者说,如何保证消息消费的幂等性?

2023年 8月 21日 33.4k 0

1. 前言

大家好,我是路由器没有路。

随着互联网的发展,越来越多的应用程序需要处理大量的消息。

为了提高应用程序的性能和可扩展性,很多应用程序开始采用消息队列(MQ)来处理消息。

MQ 可以将消息异步地发送到目的地,从而实现解耦、异步处理和流量控制等功能。

但是,MQ 也带来了一些问题,如消息重复消费和消息消费的幂等性问题。

本文将介绍 MQ 如何保证消息不被重复消费,并讨论如何保证消息消费的幂等性。

2. 为什么会产生重复消费的问题?

在回答“如何保证消息不被重复消费?”这个问题前,首先需要了解消息队列可能会出现的重复消费问题。

比如 RabbitMQ、RocketMQ、Kafka 等常用的消息队列都可能会出现这个问题,因为这个问题通常是由我们开发人员来保证的,而不是由消息队列本身来保证的。

以 Kafka 为例,它有一个 offset 的概念,每个消息写入时都有一个 offset,代表消息的序号。

当消费者消费数据时,每隔一段时间会将自己消费过的消息的 offset 提交一下,表示“我已经消费过了,下次我要是重启啥的,你就让我继续从上次消费到的 offset 继续消费吧”。

但是,有时候可能会出现一些意外情况,比如重启系统时直接 kill 进程,导致消费者有些消息处理了但是没有来得及提交 offset,重启之后就会出现少数消息被重复消费的情况。

总的来说,消息队列(MQ)中产生重复消费的问题,主要是由于以下原因:

  • 消费者异常关闭:当消费者异常关闭时,可能会导致已经消费过的消息没有被确认,从而出现重复消费的问题。

  • 网络故障:当网络出现故障时,可能会导致消息没有被正确地发送到消费者端,从而出现重复消费的问题。

  • 消费者处理消息失败:当消费者处理消息失败时,可能会导致消息没有被确认,从而出现重复消费的问题。

为了避免这些问题,我们需要采取一些措施来保证消息的可靠性,例如手动确认消息、消费者自身保证幂等性等。

3. 如何保证消息不被重复消费?

在 MQ 中,消息的消费是异步的,消费者需要从队列中获取消息并进行处理。

但是,在一些情况下,消息可能会被重复消费,例如网络故障、消费者异常关闭等情况。为了避免消息被重复消费,MQ 通常采用以下两种方式来保证消息不被重复消费:

3.1 消费者手动确认

在 MQ 中,消费者可以通过手动确认的方式来保证消息不被重复消费。

当消费者从队列中获取到消息后,需要手动确认消息已经被处理。

如果消费者没有确认消息,MQ 会将消息重新发送给其他消费者进行处理。

通过这种方式,可以保证消息不会被重复消费。

3.2 消息去重

在 MQ 中,还可以采用消息去重的方式来保证消息不被重复消费。

具体而言,可以在消费者端对消息进行去重,如果发现消息已经被处理过,就不再进行处理。

为了实现消息去重,可以使用一些技术,如布隆过滤器、数据库等。通过这种方式,可以保证消息不会被重复消费。

举个例子:

假设你有一个系统,需要将消息插入数据库中。如果同一条消息被重复消费了两次,就会在数据库中插入两条相同的数据,导致数据不准确。

但是,如果在消费消息时,先判断该消息是否已经被消费过,如果已经被消费过,就直接丢弃该消息,这样就可以保留一条正确的数据,从而保证数据的正确性。

4. 如何保证消息消费的幂等性?

一条数据重复出现两次,数据库里就只有一条数据,这就保证了系统的幂等性。

幂等性,通俗点说,就一个数据,或者一个请求,给你重复来多次,你得确保对应的数据是不会改变的,不能出错。

其实重复消费不可怕,可怕的是你没考虑到重复消费之后,怎么保证幂等性。

针对幂等性这个问题,我前面写过一篇文章《在工作中,接口幂等该如何做?》,可以回顾下。

我们也需要结合业务需求来思考解决方案。以下是几个可能的思路:

  • 如果需要将数据写入数据库,可以先根据主键查询一下,如果该数据已经存在,就不需要再插入一条新的数据了,而是直接进行更新操作。

  • 如果需要将数据写入 Redis 中,这个问题就比较简单了,因为 Redis 的 set 操作天然具有幂等性,即多次执行同样的 set 操作,只会产生一个相同的结果,不会产生重复数据。

  • 如果不是上述两个场景,可能需要进行一些较为复杂的处理。可以在生产者发送每条数据的时候,加上一个全局唯一的 ID,例如订单 ID 等。在消费者消费消息时,可以先根据这个 ID 到 Redis 中查询一下,该消息是否已经被消费过。如果该消息没有被消费过,就进行处理,并将这个 ID 写入 Redis 中,表示该消息已经被消费过了。如果该消息已经被消费过了,就不需要再进行处理了,保证不会重复处理相同的消息。

  • 另外一种解决方案是,基于数据库的唯一键来保证重复数据不会被插入多条。由于有唯一键的约束,重复数据插入时只会报错,而不会导致数据库中出现脏数据。这种方法需要在数据库中设置唯一键约束,从而保证数据的准确性。

总结

MQ 是一种常用的消息处理技术,可以帮助应用程序实现解耦、异步处理和流量控制等功能。

但是,MQ 也带来了一些问题,如消息重复消费和消息消费的幂等性问题。

为了解决这些问题,可以采用消费者手动确认、消息去重、消费者自身保证幂等性和 MQ 提供幂等性保证等方式。

通过这些方式,可以保证消息的可靠处理,提高系统的稳定性和可用性。

相关文章

JavaScript2024新功能:Object.groupBy、正则表达式v标志
PHP trim 函数对多字节字符的使用和限制
新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
为React 19做准备:WordPress 6.6用户指南
如何删除WordPress中的所有评论

发布评论