Sse推送实践

2023年 9月 14日 67.5k 0

Sse推送实践

场景

之前提到了,在多数据源环境下,我们需要对数据源的连接池做一层缓存,不能让每次连接都通过 TCP三次握手 -> 校验用户 -> 校验权限这几个步骤,很耗时,耗费性能。

为此我们需要将已经做过这几个步骤的连接进行一层缓存,感兴趣的同学可以看看这篇文章。juejin.cn/post/727086…

为了能在页面动态的看到,每个连接池的使用情况,使用频率,创建时间,我们有必要对这层缓存的变动做有效的订阅,同时发布给前端。

问题思考

如何当内存变动的时候监听到这个事件?其实很简单,在每次内存变动的时候,发布一个异步的事件,然后订阅这个事件即可。

如何实时地往前端做推送?

这里我做了一些调研:目前可行的 协议有两种:WebSocket和Sse.

Sse优点:

  • 基于HTTP,协议简单
  • 支持内置的重新连接,事件id
  • 企业防火墙数据包检查可以安全通过
  • WebSocket优点:

  • 实时通信。
  • 浏览器原生支持更好
  • 其实现在已经很明白了,使用Sse无疑是更好,更轻量的。

    代码实战

  • 写一套Sse长连接的缓存
  • /**
     * sse连接缓存,防止重复创建连接
     */
    @Component
    public class SseEmitterCache {
    ​
        private static Logger logger = LoggerFactory.getLogger(SseEmitterCache.class);
        private static SseEmitterCache sseEmitterCache;
        
        /**
         * clientId为key
         */
        protected ConcurrentHashMap connects;//连接
        
        private int maxSize = 200;
    ​
        //私有化构造器
        protected SseEmitterCache(){
        }
    ​
        //构造单例
        public static SseEmitterCache getCacheInstance(){
            if(null == sseEmitterCache){
                sseEmitterCache = new SseEmitterCache();
                sseEmitterCache.connects = new ConcurrentHashMap();
            }
            return sseEmitterCache;
        }
        
        
        public SseEmitter getCache(String key){
            //删除某个连接如果已达到最大设置连接数,先删除未在使用,并且使用评率最低的连接
            if(connects.size() > maxSize){
                return null;
            }
            if(!connects.containsKey(key)){
                connects.put(key,new SseEmitter());
            }
            return connects.get(key);
        }
        
        
        public boolean hasCache(String key){
            return connects.containsKey(key);
        }
        
        
        public boolean remove(String key){
            connects.remove(key);
            return true;
        }
        
        public List getClientIds(){
            ConcurrentHashMap.KeySetView strings = connects.keySet();
            Iterator iterator = strings.stream().iterator();
            List clientIds = new ArrayList();
            while (iterator.hasNext()){
                clientIds.add(iterator.next());
            }
            return clientIds;
        }
    ​
        
    }
    ​
    
  • 对于你的连接池缓存,每次变动都发布事件
  • /**
         * 异步发布
         */
        private void sendCache(){
            MyThreadPool.executor().execute(()->applicationContext.publishEvent(new MysqlConPoolCacheChangeEvent(OperationType.CHANGE)));
        }
    
  • 监听,并发送给所有的SseEmitterCache中的客户。
  • /**
     * @author HT
     * 监听MysqlConPoolCacheChangeEvent事件
     */
    @Component
    public class MysqlConPoolCacheChangeEventListener {
        
        private static Logger logger = LoggerFactory.getLogger(DataSourceServiceImpl.class);
        
        
        @Resource
        private DataSourceService dataSourceService;
        
        
        private SseEmitterCache emitterCache = SseEmitterCache.getCacheInstance();
        
        
        @EventListener
        public void onApplicationEvent(MysqlConPoolCacheChangeEvent event) {
            List clientIds = emitterCache.getClientIds();
            DatasourceCacheVo cacheVo = dataSourceService.showConnPool();
            //如果缓存发生了变动,就给所有的客户端都推送消息。
            clientIds.forEach(clientId ->{
                dataSourceService.sendMsg(clientId, JSONUtil.toJsonStr(cacheVo));
                logger.info("sendMsg:{}",JSONUtil.toJsonStr(cacheVo));
            });
        }
    }
    
  • Controller层
  •  @ApiOperation(value = "创建SSE连接")
        @GetMapping(path = "/createSse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
        public SseEmitter createSse(String id) {
            return dataSourceService.createSseEmitter(id);
        }
    
  • Service层
  •  @Override
        public SseEmitter createSseEmitter(String clientId) {
            if(StringUtils.isEmpty(clientId)){
                clientId = UUID.randomUUID().toString();
            }
            SseEmitter emitter = sseEmitterCache.getCache(clientId);
            logger.info("获取到SSE,id:{}",clientId);
            String finalClientId = clientId;
            sendMsg(clientId,JSONUtil.toJsonStr(this.showConnPool()));
            emitter.onCompletion(()->{
                logger.info("SSE关闭,id:{}",finalClientId);
                sseEmitterCache.remove(finalClientId);
            });
            emitter.onTimeout(()->{
                logger.info("SSE关闭,id:{}",finalClientId);
                sseEmitterCache.remove(finalClientId);
            });
            emitter.onError((e)->{
                logger.info("SSE异常,id:{}",finalClientId);
                sseEmitterCache.remove(finalClientId);
            });
            return emitter;
        }
    

    相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论