SpringBoot使用WebSocket实现即时消息

2023年 8月 14日 53.2k 0

环境:SpringBoot2.4.12.

依赖


  org.springframework.boot
  spring-boot-starter-web


  org.springframework.boot
  spring-boot-starter-websocket

定义消息类型

  • 抽象消息对象
public class AbstractMessage {
  /**
   *   消息类型
   */
  protected String type ;
  
  /**
   *   消息内容
   */
  protected String content ;
  /**
   *   消息日期
   */
  protected String date ;
}

消息对象子类

1、Ping检查消息

public class PingMessage extends AbstractMessage {
  public PingMessage() {}
  public PingMessage(String type) {
    this.type = type ;
  }
}

2、系统消息

public class SystemMessage extends AbstractMessage {
  public SystemMessage() {}
  public SystemMessage(String type, String content) {
    this.type = type ;
    this.content = content ;
  }
}

3、点对点消息

public class PersonMessage extends AbstractMessage {
  private String fromName ;
  private String toName ;
}

消息类型定义

public enum MessageType {
  
  /**
   *   系统消息 0000;心跳检查消息 0001;点对点消息2001
   */
  SYSTEM("0000"), PING("0001"), PERSON("2001") ;
  
  private String type ;
  
  private MessageType(String type) {
    this.type = type ;
  }


  public String getType() {
    return type;
  }


  public void setType(String type) {
    this.type = type;
  }
  
}

WebSocket服务端点

该类作用就是定义客户端连接的地址

@ServerEndpoint(value = "/message/{username}", 
  encoders = {WsMessageEncoder.class},
  decoders = {WsMessageDecoder.class},
  subprotocols = {"gmsg"},
  configurator = MessageConfigurator.class)  
@Component  
public class GMessageListener {  
  
    public static ConcurrentMap sessions = new ConcurrentHashMap();
    private static Logger logger = LoggerFactory.getLogger(GMessageListener.class) ;
  
    private String username ;
    
    @OnOpen  
    public void onOpen(Session session, EndpointConfig config, @PathParam("username") String username){
      UserSession userSession = new UserSession(session.getId(), username, session) ;
      this.username = username ;
      sessions.put(username, userSession) ;
      logger.info("【{}】用户进入, 当前连接数:{}", username, sessions.size()) ; 
    }  
  
    @OnClose  
    public void onClose(Session session, CloseReason reason){  
      UserSession userSession = sessions.remove(this.username) ;
      if (userSession != null) {
        logger.info("用户【{}】, 断开连接, 当前连接数:{}", username, sessions.size()) ;
      }
    }
    
    @OnMessage
    public void pongMessage(Session session, PongMessage message) {
      ByteBuffer buffer = message.getApplicationData() ;
      logger.debug("接受到Pong帧【这是由浏览器发送】:" + buffer.toString());
    }
    
    @OnMessage
    public void onMessage(Session session, AbstractMessage message) {
      if (message instanceof PingMessage) {
        logger.debug("这里是ping消息");
        return ;
      }
      if (message instanceof PersonMessage) {
        PersonMessage personMessage = (PersonMessage) message ;
        if (this.username.equals(personMessage.getToName())) {
          logger.info("【{}】收到消息:{}", this.username, personMessage.getContent());
        } else {
          UserSession userSession = sessions.get(personMessage.getToName()) ;
          if (userSession != null) {
            try {
            userSession.getSession().getAsyncRemote().sendText(new ObjectMapper().writeValueAsString(message)) ;
          } catch (JsonProcessingException e) {
            e.printStackTrace();
          }
          }
        }
        return ;
      }
      if (message instanceof SystemMessage) {
        logger.info("接受到消息类型为【系统消息】") ; 
        return ;
      }
    }
    
    @OnError
    public void onError(Session session, Throwable error) {
      logger.error(error.getMessage()) ;
    }
}

WsMessageEncoder.java类该类的主要作用是,当发送的消息是对象时,该如何转换

public class WsMessageEncoder implements Encoder.Text {
  private static Logger logger = LoggerFactory.getLogger(WsMessageDecoder.class) ;
  @Override
  public void init(EndpointConfig endpointConfig) {
  }
  @Override
  public void destroy() {
  }
  @Override
  public String encode(AbstractMessage tm) throws EncodeException {
    String message = null ;
    try {
      message = new ObjectMapper().writeValueAsString(tm);
    } catch (JsonProcessingException e) {
      logger.error("JSON处理错误:{}", e) ;
    }
    return message;
  }
}

WsMessageDecoder.java类该类的作用是,当接收到消息时如何转换成对象。

public class WsMessageDecoder implements  Decoder.Text {


  private static Logger logger = LoggerFactory.getLogger(WsMessageDecoder.class) ;
  private static Set msgTypes = new HashSet() ;
  
  static {
    msgTypes.add(MessageType.PING.getType()) ;
    msgTypes.add(MessageType.SYSTEM.getType()) ;
    msgTypes.add(MessageType.PERSON.getType()) ;
  }
  @Override
  @SuppressWarnings("unchecked")
  public AbstractMessage decode(String s) throws DecodeException {
    AbstractMessage message = null ;
    try {
      ObjectMapper mapper = new ObjectMapper() ;
      Map map = mapper.readValue(s, Map.class) ;
      String type = map.get("type") ;
      switch(type) {
        case "0000":
          message = mapper.readValue(s, SystemMessage.class) ;
          break;
        case "0001":
          message = mapper.readValue(s, PingMessage.class) ;
          break;
        case "2001":
          message = mapper.readValue(s, PersonMessage.class) ;
          break;
      }
    } catch (JsonProcessingException e) {
      logger.error("JSON处理错误:{}", e) ;
    }
    return message ;
  }


  // 该方法判断消息是否可以被解码(转换)
  @Override
  @SuppressWarnings("unchecked")
  public boolean willDecode(String s) {
    Map map = new HashMap() ;
    try {
      map = new ObjectMapper().readValue(s, Map.class);
    } catch (JsonProcessingException e) {
      e.printStackTrace();
    }
    logger.debug("检查消息:【" + s + "】是否可以解码") ;
    String type = map.get("type") ;
    if (StringUtils.isEmpty(type) || !msgTypes.contains(type)) {
      return false ;
    }
    return true ;
  }
  @Override
  public void init(EndpointConfig endpointConfig) {
  }
  @Override
  public void destroy() {
  }
}

MessageConfigurator.java类该类的作用是配置服务端点,比如配置握手信息

public class MessageConfigurator extends ServerEndpointConfig.Configurator {
  private static Logger logger = LoggerFactory.getLogger(MessageConfigurator.class) ;
  @Override
  public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {
    logger.debug("握手请求头信息:" + request.getHeaders());
    logger.debug("握手响应头信息:" + response.getHeaders());
    super.modifyHandshake(sec, request, response);
  }  
}

WebSocke配置类

@Configuration
public class WebSocketConfig {
  
  @Bean
    public ServerEndpointExporter serverEndpointExporter (){  
        return new ServerEndpointExporter();  
    }  
  
}

当以jar包形式运行时需要配置该bean,暴露我们配置的@ServerEndpoint;当我们以war独立tomcat运行时不能配置该bean。

前端页面



 
  
  
  
  
  
  WebSocket
  

  
  let gm = null ;
  let username = null ;
  function ListenerMsg({url, protocols = ['gmsg'], options = {}}) {
    if (!url){ 
      throw new Error("未知服务地址") ;
    }
    gm = new window.__GM({
      url: url,
      protocols: protocols
    }) ;
    gm.open(options) ;
  }
  ListenerMsg.init = (user) => {
    if (!user) {
      alert("未知的当前登录人") ;
      return ;
    }
    let url = `ws://localhost:8080/message/${user}` ;
    let msg = document.querySelector("#msg")
    ListenerMsg({url, options: {
      onmessage (e) {
        let data = JSON.parse(e.data) ;
        let li = document.createElement("li") ;
        li.innerHTML = "【" + data.fromName + "】对你说:" + data.content ;
        msg.appendChild(li) ;
      }
    }}) ;
  }
  function enter() {
    username = document.querySelector("#nick").value ;
    ListenerMsg.init(username) ;
    document.querySelector("#chat").style.display = "block" ;
    document.querySelector("#enter").style.display = "none" ;
    document.querySelector("#cu").innerText = username ;
  }
  function send() {
    let a = document.querySelector("#toname") ;
    let b = document.querySelector("#content") ;
    let toName = a.value ;
    let content = b.value ;
    gm.sendMessage({type: "2001", content, fromName: username, toName}) ;
    a.value = '' ;
    b.value = '' ;
  }

 
 
  
    进入
  
  
  
    当前用户:
    用户:
    内容:
    发送
  
  
    

这里有个g-messages.js文件是我写的一个工具类,用来做连接及心跳检查用的。

到此所有的代码完毕,接下来测试

测试

打开两个标签页,以不同的用户进入。

图片

图片

输入对方用户名发送消息

图片

图片

成功了,简单的websocket。我们生产环境还就这么完的,8g内存跑了6w的用户。

相关文章

JavaScript2024新功能:Object.groupBy、正则表达式v标志
PHP trim 函数对多字节字符的使用和限制
新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
为React 19做准备:WordPress 6.6用户指南
如何删除WordPress中的所有评论

发布评论