业务场景
中秋节即将来临,公司准备了5000份中秋节月饼大礼包,经理说,先到先得,去晚了就没了。根据这个业务场景,我联想到了秒杀系统,是否可以用消息中间件实现。
消息中间件 rocketmq
RocketMQ 是阿里巴巴开发的一个分布式消息中间件,与其他消息中间件基本类似。总的来说就是,降低数据库压力,将短时间内的高请求量转换为长时间的低请求量,实现削峰。
先介绍一下其中的基本组件:
ConnectionFactory(连接管理器):应用程序与RabbitMQ之间建立连接的管理器
Channel(信道):消息推送使用的通道
Exchange(交换器):用于接受、分配消息
Queue(队列):用于存储生产者的消息
RoutingKey(路由键):生产者将消息发送给交换器的时候,会指定一个RoutingKey,用来指定这个消息的路由规则,这个RoutingKey需要与交换器类型和绑定键(BindingKey)联合使用才能最终生效。
BindKey(绑定键):用于把交换器的消息绑定到队列上
生产者producer
在消息发送时,需要创建生产者producer,由producer发送消息给消费者。
代码如下:
@Configuration
public class RocketMqConfig {
@Value("${rocketmq.accessKeyId}")
private String accessKey;
@Value("${rocketmq.accessKeySecret}")
private String secretKey;
@Value("${rocketmq.namesrvaddr}")
private String namesrvAddr;
@Bean("producer")
public Producer producer() throws Exception {
Properties properties = new Properties();
// 鉴权用 AccessKey
properties.put(PropertyKeyConst.AccessKey, accessKey);
// 鉴权用 SecretKey
properties.put(PropertyKeyConst.SecretKey, secretKey);
// 设置 TCP 接入域名
properties.put(PropertyKeyConst.NAMESRV_ADDR, namesrvAddr);
Producer producer = ONSFactory.createProducer(properties);
producer.start();
return producer;
}
}
accessKey,accessKeySecret在开通服务后可以看到
消费者 Consumer
由于可以有多个消费者,所以创建一个消费者池
@Bean("consumerPool")
public ConsumerPool consumerPool() {
ConsumerPool consumerPool = new ConsumerPool(accessKey, secretKey, namesrvAddr);
return consumerPool;
}
在消费者消费消息时,根据消息的topic和tag来消费消息,这样就需要生产者在发送消息是指定一个topic和tag。
消息 Message
生产者发送的消息需要创建一个指定的topic和tag,方便消费者消费。
创建message时,把需要的信息放入message,本示例中将用户id传入消息中,
JSONObject zongziMessage = new JSONObject();
zongziMessage.put("userId", userId);
Message rocketMessage = new Message(simpleTopic, zongziTag, MessageUtils.messageByteArray(zongziMessage);
SendResult send = producer.send(rocketMessage);
这样,消息就被生产者发送出去了,订阅了该topic的消费者就能消费到这条消息。
消费
消费端消费时,需要从消费者池中拿出一个消费者进行实例化。
/**
* 接收zongziMessage服务传递过来的数据 消费者
* @throws IOException
*/
@PostConstruct
public void qiangZongziNotice() {
Consumer consumer = this.consumerPool.getConsumer(gidOfficeResultGroup);
consumer.subscribe(simpleTopic,zongziTag, (message, context) -> {
JSONObject jsonObject = MessageUtils.messageMap(message);
/**
* 取到消息中的userId
*/
String userId = jsonObject.getString("userId");
try {
int stockNum = getStockNum(userId);
if(stockNum > 0){
//剩余库存不为0,库存减一,并将用户中奖信息存入数据库
decrByStock(userId);
saveUserZongzi(userId);
}
return Action.CommitMessage;
} catch (Exception e) {
// 发生异常时重新消费
return Action.ReconsumeLater;
}
return Action.CommitMessage;
});
consumer.start();
}
/**
* 调用数据库service给数据库对应商品库存减一 ,并返回
*/
public void decrByStock(String userId) {
log.info("库存消息队列收到的userId是:{}", userId);
stockService.decrByStock(userId);
}
/**
* 剩余库存
*/
public int getStockNum(String userId) {
log.info("库存消息队列收到的userId是:{}", userId);
return stockService.getStockNum(userId);
}
这里只是简单介绍了流程中重要的方法,简单的实现中秋节抢月饼的秒杀活动。
总结
rocketmq封装的很精致,实现抢月饼等等秒杀活动很容易上手。但其中还有很多细节需要深究,在以后的学习中,会继续深究代码底层逻辑,精确研究消息中间件的发送方式。