想象一下这样一个场景:多名旅行者同时尝试预订热门目的地的最后一个可用房间。如果没有适当的并发控制机制,这种情况很快就会变成竞争状态,导致房间超额预订和客户沮丧。
我们将深入研究用于应对这些挑战的两种关键策略的复杂性:乐观锁定和消息队列。
想象一下您正在使用一个在线酒店预订平台,类似于 Booking.com 或 Expedia 等知名平台。以下是同步和异步流程如何发挥作用:
同步流程:
预订房间(同步):
- 您访问酒店预订网站并选择您的目的地、入住和退房日期以及其他偏好。
- 您点击“立即预订”按钮即可预订房间。
- 该网站使用基于 HTTP 的同步协议(如 REST 或 SOAP)将您的请求发送到酒店的预订系统。
- 酒店的系统会立即同步处理您的请求。它检查房间可用性,为您预订房间,并生成预订号码。
- 预订号码将发送回您的浏览器,并在几秒钟内显示在网站上。
- 您可以立即获得预订号码,然后可以放心地继续您的旅行计划。
创建房间实体
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
@Entity
public class Room {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private String roomType;
private boolean isAvailable;
// getters and setters
}
创建房间存储库
import org.springframework.data.jpa.repository.JpaRepository;
public interface RoomRepository extends JpaRepository {
Room findByRoomType(String roomType);
}
创建客房预订请求 DTO
import java.time.LocalDate;
public class RoomBookingRequest {
private String roomType;
private LocalDate checkInDate;
private LocalDate checkOutDate;
// getters and setters
}
创建客房预订响应 DTO
public class RoomBookingResponse {
private String reservationNumber;
// getters and setters
}
创建客房服务
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.UUID;
@Service
public class RoomService {
@Autowired
private RoomRepository roomRepository;
public RoomBookingResponse bookRoom(RoomBookingRequest bookingRequest) {
String roomType = bookingRequest.getRoomType();
LocalDate checkInDate = bookingRequest.getCheckInDate();
LocalDate checkOutDate = bookingRequest.getCheckOutDate();
Room room = roomRepository.findByRoomType(roomType);
if (room != null && room.isAvailable()) {
// Add validation to check availability based on check-in and check-out dates here.
// For simplicity, we'll assume the room is available.
room.setAvailable(false);
roomRepository.save(room);
// Generate a reservation number (you can implement your logic here).
String reservationNumber = generateReservationNumber();
return new RoomBookingResponse(reservationNumber);
} else {
throw new RoomNotAvailableException();
}
}
private String generateReservationNumber() {
// Generate a unique reservation number (you can implement your logic here).
return UUID.randomUUID().toString();
}
}
创建房间控制器
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("/api/rooms")
public class RoomController {
@Autowired
private RoomService roomService;
// Book a room
@PostMapping("/book")
public RoomBookingResponse bookRoom(@RequestBody RoomBookingRequest bookingRequest) {
return roomService.bookRoom(bookingRequest);
}
}
定义自定义异常
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.ResponseStatus;
@ResponseStatus(HttpStatus.BAD_REQUEST)
public class RoomNotAvailableException extends RuntimeException {
public RoomNotAvailableException() {
super("The requested room is not available.");
}
}
测试API
您可以使用 Postman 或 cURL 等工具来测试您的 API。要预订房间,请http://localhost:8080/api/rooms/book使用包含房间类型、入住日期和退房日期的 JSON 正文发出 POST 请求:
{
"roomType" : "Standard" ,
"checkInDate" : "2023-10-01" ,
"checkOutDate" : "2023-10-05"
}
如果房间可用,API 将返回带有预订编号的 JSON 响应。您可以根据您的课堂需求自定义预订逻辑和预订号码生成RoomService。
异步流程
当多个用户同时调用Booking API时
当多个并发呼叫在系统中搜索同一房间时,可能存在潜在的缺点和挑战:
竞争条件:当多个请求尝试同时预订同一房间时,可能会出现竞争条件。如果处理不当,这可能会导致超额预订,即系统允许的预订数量超过了可用房间的数量。
如何解决并发问题?
乐观锁定是一种数据库级技术,可防止多个用户同时尝试更新同一资源时发生数据冲突。
另一方面,消息队列是异步通信工具,可确保请求的有序、可靠处理,使其成为分布式系统中处理并发请求的理想选择。
方法一:实现消息队列响应并发请求
消息队列确保请求按照接收顺序进行处理,从而防止竞争条件和超量预订。
- 多个客户端向端点发出 POST 请求/api/rooms/book以同时预订酒店房间。
- 处理RoomController传入的预订请求。
- 该roomService.bookRoom方法接收预订请求。
- 它使用该方法将预订请求发送到名为“room-booking”的 RabbitMQ 消息队列rabbitTemplate.convertAndSend。
- 它向客户端返回初步响应,其中包含一条消息,表明预订请求已发送,客户端应等待确认。
- 预订请求被放入“房间预订”队列中。消息队列系统(在本例中为 RabbitMQ)确保每个预订请求都按照收到的顺序进行处理,以防止竞争情况。
- 监听RoomBookingMessageConsumer“房间预订”队列。
- processBookingRequest当预订请求出队时,将调用消费者的方法。在该方法中,您通常会实现以下逻辑:
- 根据请求的房型、入住日期和退房日期检查客房供应情况。
- 如果房间可用,则生成预订号码。
- 更新数据库中的房间可用性,将其标记为不可用,以防止重复预订。
- 通过RabbitMQ向客户端发送包含预约号的响应消息
8. 在 中RoomBookingMessageConsumer,处理预订请求并生成预订号码后,您可以使用传统的 HTTP 客户端(例如RestTemplate、HttpClient)将确认响应直接发送到客户端的回调 URL 端点(该端点在请求中发送)。
执行:
创建客房预订请求和响应 DTO
import java.time.LocalDate;
public class RoomBookingRequest {
private String roomType;
private LocalDate checkInDate;
private LocalDate checkOutDate;
private String clientCallbackUrl; // Added to specify the client's callback URL
// getters and setters
}
public class RoomBookingResponse {
private String reservationNumber;
// getters and setters
}
修改控制器
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("/api/rooms")
public class RoomController {
@Autowired
private RoomService roomService;
@PostMapping("/book")
public RoomBookingResponse bookRoom(@RequestBody RoomBookingRequest bookingRequest) {
return roomService.bookRoom(bookingRequest);
}
}
创建客房预订服务(生产者)
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.http.ResponseEntity;
import org.springframework.web.client.RestTemplate;
@Service
public class RoomService {
@Autowired
private RoomRepository roomRepository;
@Autowired
private RabbitTemplate rabbitTemplate;
private RestTemplate restTemplate = new RestTemplate();
public RoomBookingResponse bookRoom(RoomBookingRequest bookingRequest) {
String roomType = bookingRequest.getRoomType();
// Send the booking request to the message queue
rabbitTemplate.convertAndSend("room-booking-exchange", "room-booking", bookingRequest);
return new RoomBookingResponse("Booking request sent. Please wait for confirmation.");
}
// This method sends the response to the client's callback URL
public void sendResponseToClient(RoomBookingResponse response, String clientCallbackUrl) {
ResponseEntity result = restTemplate.postForEntity(clientCallbackUrl, response, Void.class);
if (result.getStatusCode().is2xxSuccessful()) {
// Handle a successful response sent to the client
} else {
// Handle the case when the response to the client failed
}
}
}
创建消息消费者
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class RoomBookingMessageConsumer {
@Autowired
private RoomService roomService;
@RabbitListener(queues = "room-booking-queue")
public void processBookingRequest(RoomBookingRequest bookingRequest) {
// Process the booking request
RoomBookingResponse response = processBookingLogic(bookingRequest);
// Send the confirmation response to the client's callback URL
roomService.sendResponseToClient(response, bookingRequest.getClientCallbackUrl());
}
private RoomBookingResponse processBookingLogic(RoomBookingRequest bookingRequest) {
// Implement your booking logic here, e.g., checking room availability and generating a reservation number
// Update room availability in the database
// Send a response message to confirm the booking or indicate unavailability
// For simplicity, we'll assume the room is available and generate a reservation number.
String reservationNumber = generateReservationNumber();
return new RoomBookingResponse(reservationNumber);
}
private String generateReservationNumber() {
// Generate a unique reservation number (you can implement your logic here).
return "RES-" + System.currentTimeMillis();
}
}
方法二:实现乐观锁来处理并发请求
您可以修改代码以使用同步方法和 JPA 乐观锁定。
步骤1:修改Room实体:@Version向实体添加一个字段Room以启用乐观锁定:
import javax.persistence.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@Entity
public class Room {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private String roomType;
private boolean isAvailable;
@Version
private Long version;
// getters and setters
}
步骤2:修改客房服务对每个房间使用ReentrantLock来同步访问房间预订操作
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@Service
public class RoomService {
@Autowired
private RoomRepository roomRepository;
private final ConcurrentHashMap roomLocks = new ConcurrentHashMap();
public RoomBookingResponse bookRoom(RoomBookingRequest bookingRequest) {
String roomType = bookingRequest.getRoomType();
LocalDate checkInDate = bookingRequest.getCheckInDate();
LocalDate checkOutDate = bookingRequest.getCheckOutDate();
Room room = roomRepository.findByRoomType(roomType);
if (room != null) {
Lock roomLock = roomLocks.computeIfAbsent(room.getId(), id -> new ReentrantLock());
roomLock.lock();
try {
if (room.isAvailable()) {
// Add validation to check availability based on check-in and check-out dates here.
// For simplicity, we'll assume the room is available.
room.setAvailable(false);
roomRepository.save(room);
// Generate a reservation number (you can implement your logic here).
String reservationNumber = generateReservationNumber();
return new RoomBookingResponse(reservationNumber);
}
} finally {
roomLock.unlock();
}
}
throw new RoomNotAvailableException();
}
private String generateReservationNumber() {
// Generate a unique reservation number (you can implement your logic here).
return UUID.randomUUID().toString();
}
}
详细工作原理:
并发请求&ConcurrentHashMap:当同一房间收到多个并发预订请求时,它们可能同时到达并可能导致竞争条件。的引入ConcurrentHashMap确保每个房间都有自己的锁。这ConcurrentHashMap是一个线程安全的映射,可以由多个线程同时安全地访问。
通过锁定并发更新房间可用性:如果两个线程同时尝试预订同一个房间,则只有其中一个线程会使用 成功获取锁roomLock.lock(),而另一个线程将暂时阻塞,直到第一个线程释放锁。
释放锁以供其他线程更新:一旦线程获取了锁并成功修改了房间的可用性,它就会使用 释放锁roomLock.unlock(),从而允许其他线程继续预订其他房间。
乐观锁防止数据库级别的竞争条件:在代码中,实体中的字段启用数据库级别的乐观锁。更新房间时,JPA 在允许更新之前会根据实体中的版本字段检查数据库中的版本字段。@VersionRoom
- 如果两个事务同时尝试更新同一个房间,根据版本号的比较,只有其中一个会成功,从而防止数据库级别的数据冲突。
- 因此 2 个不同的事务无法同时更新数据库中的一个房间