中秋创意投稿利用消息队列实现秒杀月饼的业务

2023年 9月 14日 92.4k 0

业务场景

中秋节即将来临,公司准备了5000份中秋节月饼大礼包,经理说,先到先得,去晚了就没了。根据这个业务场景,我联想到了秒杀系统,是否可以用消息中间件实现。

消息中间件 rocketmq

RocketMQ 是阿里巴巴开发的一个分布式消息中间件,与其他消息中间件基本类似。总的来说就是,降低数据库压力,将短时间内的高请求量转换为长时间的低请求量,实现削峰。

先介绍一下其中的基本组件:

ConnectionFactory(连接管理器):应用程序与RabbitMQ之间建立连接的管理器

Channel(信道):消息推送使用的通道

Exchange(交换器):用于接受、分配消息

Queue(队列):用于存储生产者的消息

RoutingKey(路由键):生产者将消息发送给交换器的时候,会指定一个RoutingKey,用来指定这个消息的路由规则,这个RoutingKey需要与交换器类型和绑定键(BindingKey)联合使用才能最终生效。

BindKey(绑定键):用于把交换器的消息绑定到队列上

生产者producer

在消息发送时,需要创建生产者producer,由producer发送消息给消费者。
代码如下:

@Configuration
public class RocketMqConfig {

    @Value("${rocketmq.accessKeyId}")
    private String accessKey;
    @Value("${rocketmq.accessKeySecret}")
    private String secretKey;
    @Value("${rocketmq.namesrvaddr}")
    private String namesrvAddr;

    @Bean("producer")
    public Producer producer() throws Exception {
        Properties properties = new Properties();
        // 鉴权用 AccessKey
        properties.put(PropertyKeyConst.AccessKey, accessKey);
        // 鉴权用 SecretKey
        properties.put(PropertyKeyConst.SecretKey, secretKey);
        // 设置 TCP 接入域名
        properties.put(PropertyKeyConst.NAMESRV_ADDR, namesrvAddr);
        Producer producer = ONSFactory.createProducer(properties);
        producer.start();
        return producer;
    }
}

accessKey,accessKeySecret在开通服务后可以看到

消费者 Consumer

由于可以有多个消费者,所以创建一个消费者池

@Bean("consumerPool")
public ConsumerPool consumerPool() {
    ConsumerPool consumerPool = new ConsumerPool(accessKey, secretKey, namesrvAddr);
    return consumerPool;
}

在消费者消费消息时,根据消息的topic和tag来消费消息,这样就需要生产者在发送消息是指定一个topic和tag。

消息 Message

生产者发送的消息需要创建一个指定的topic和tag,方便消费者消费。
创建message时,把需要的信息放入message,本示例中将用户id传入消息中,

JSONObject zongziMessage = new JSONObject();
zongziMessage.put("userId", userId);
Message rocketMessage = new Message(simpleTopic, zongziTag, MessageUtils.messageByteArray(zongziMessage);
SendResult send = producer.send(rocketMessage);

这样,消息就被生产者发送出去了,订阅了该topic的消费者就能消费到这条消息。

消费

消费端消费时,需要从消费者池中拿出一个消费者进行实例化。

/**
 * 接收zongziMessage服务传递过来的数据 消费者
 * @throws IOException
 */
@PostConstruct
public void qiangZongziNotice() {
    Consumer consumer = this.consumerPool.getConsumer(gidOfficeResultGroup);
    consumer.subscribe(simpleTopic,zongziTag, (message, context) -> {
            JSONObject jsonObject = MessageUtils.messageMap(message);
            /**
             * 取到消息中的userId
             */
            String userId = jsonObject.getString("userId");
            
            try {
                int stockNum = getStockNum(userId);
                if(stockNum > 0){
                    //剩余库存不为0,库存减一,并将用户中奖信息存入数据库
                    decrByStock(userId);
                    saveUserZongzi(userId);
                }
                return Action.CommitMessage;
            } catch (Exception e) {
                // 发生异常时重新消费
                return Action.ReconsumeLater;
            }
            
            return Action.CommitMessage;
        });
        consumer.start();

}
/** 
    * 调用数据库service给数据库对应商品库存减一 ,并返回
    */ 
public void decrByStock(String userId) { 
    log.info("库存消息队列收到的userId是:{}", userId); 
    
    stockService.decrByStock(userId); 
}
/** 
    * 剩余库存
    */ 
public int getStockNum(String userId) { 
    log.info("库存消息队列收到的userId是:{}", userId); 
 
    return stockService.getStockNum(userId); 
}

这里只是简单介绍了流程中重要的方法,简单的实现中秋节抢月饼的秒杀活动。

总结

rocketmq封装的很精致,实现抢月饼等等秒杀活动很容易上手。但其中还有很多细节需要深究,在以后的学习中,会继续深究代码底层逻辑,精确研究消息中间件的发送方式。

相关文章

JavaScript2024新功能:Object.groupBy、正则表达式v标志
PHP trim 函数对多字节字符的使用和限制
新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
为React 19做准备:WordPress 6.6用户指南
如何删除WordPress中的所有评论

发布评论