Sse推送实践
场景
之前提到了,在多数据源环境下,我们需要对数据源的连接池做一层缓存,不能让每次连接都通过 TCP三次握手 -> 校验用户 -> 校验权限这几个步骤,很耗时,耗费性能。
为此我们需要将已经做过这几个步骤的连接进行一层缓存,感兴趣的同学可以看看这篇文章。juejin.cn/post/727086…
为了能在页面动态的看到,每个连接池的使用情况,使用频率,创建时间,我们有必要对这层缓存的变动做有效的订阅,同时发布给前端。
问题思考
如何当内存变动的时候监听到这个事件?其实很简单,在每次内存变动的时候,发布一个异步的事件,然后订阅这个事件即可。
如何实时地往前端做推送?
这里我做了一些调研:目前可行的 协议有两种:WebSocket和Sse.
Sse优点:
WebSocket优点:
其实现在已经很明白了,使用Sse无疑是更好,更轻量的。
代码实战
/**
* sse连接缓存,防止重复创建连接
*/
@Component
public class SseEmitterCache {
private static Logger logger = LoggerFactory.getLogger(SseEmitterCache.class);
private static SseEmitterCache sseEmitterCache;
/**
* clientId为key
*/
protected ConcurrentHashMap connects;//连接
private int maxSize = 200;
//私有化构造器
protected SseEmitterCache(){
}
//构造单例
public static SseEmitterCache getCacheInstance(){
if(null == sseEmitterCache){
sseEmitterCache = new SseEmitterCache();
sseEmitterCache.connects = new ConcurrentHashMap();
}
return sseEmitterCache;
}
public SseEmitter getCache(String key){
//删除某个连接如果已达到最大设置连接数,先删除未在使用,并且使用评率最低的连接
if(connects.size() > maxSize){
return null;
}
if(!connects.containsKey(key)){
connects.put(key,new SseEmitter());
}
return connects.get(key);
}
public boolean hasCache(String key){
return connects.containsKey(key);
}
public boolean remove(String key){
connects.remove(key);
return true;
}
public List getClientIds(){
ConcurrentHashMap.KeySetView strings = connects.keySet();
Iterator iterator = strings.stream().iterator();
List clientIds = new ArrayList();
while (iterator.hasNext()){
clientIds.add(iterator.next());
}
return clientIds;
}
}
/**
* 异步发布
*/
private void sendCache(){
MyThreadPool.executor().execute(()->applicationContext.publishEvent(new MysqlConPoolCacheChangeEvent(OperationType.CHANGE)));
}
/**
* @author HT
* 监听MysqlConPoolCacheChangeEvent事件
*/
@Component
public class MysqlConPoolCacheChangeEventListener {
private static Logger logger = LoggerFactory.getLogger(DataSourceServiceImpl.class);
@Resource
private DataSourceService dataSourceService;
private SseEmitterCache emitterCache = SseEmitterCache.getCacheInstance();
@EventListener
public void onApplicationEvent(MysqlConPoolCacheChangeEvent event) {
List clientIds = emitterCache.getClientIds();
DatasourceCacheVo cacheVo = dataSourceService.showConnPool();
//如果缓存发生了变动,就给所有的客户端都推送消息。
clientIds.forEach(clientId ->{
dataSourceService.sendMsg(clientId, JSONUtil.toJsonStr(cacheVo));
logger.info("sendMsg:{}",JSONUtil.toJsonStr(cacheVo));
});
}
}
@ApiOperation(value = "创建SSE连接")
@GetMapping(path = "/createSse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter createSse(String id) {
return dataSourceService.createSseEmitter(id);
}
@Override
public SseEmitter createSseEmitter(String clientId) {
if(StringUtils.isEmpty(clientId)){
clientId = UUID.randomUUID().toString();
}
SseEmitter emitter = sseEmitterCache.getCache(clientId);
logger.info("获取到SSE,id:{}",clientId);
String finalClientId = clientId;
sendMsg(clientId,JSONUtil.toJsonStr(this.showConnPool()));
emitter.onCompletion(()->{
logger.info("SSE关闭,id:{}",finalClientId);
sseEmitterCache.remove(finalClientId);
});
emitter.onTimeout(()->{
logger.info("SSE关闭,id:{}",finalClientId);
sseEmitterCache.remove(finalClientId);
});
emitter.onError((e)->{
logger.info("SSE异常,id:{}",finalClientId);
sseEmitterCache.remove(finalClientId);
});
return emitter;
}