参考

SpringBoot 集成WebSocket详解_springboot websocket-CSDN博客

为什么需要WebSocket

HTTP 是基于请求响应式的,即通信只能由客户端发起,服务端做出响应,无状态,无连接。

  • 无状态:每次连接只处理一个请求,请求结束后断开连接。
  • 无连接:对于事务处理没有记忆能力,服务器不知道客户端是什么状态。

通过HTTP实现即时通讯,只能是页面轮询向服务器发出请求,服务器返回查询结果。轮询的效率低,非常浪费资源,因为必须不停连接,或者 HTTP 连接始终打开。

WebSocket的最大特点就是,服务器可以主动向客户端推送信息,客户端也可以主动向服务器发送信息,是真正的双向平等对话。


WebSocket特点

  • 建立在 TCP 协议之上,服务器端的实现比较容易。
  • 与 HTTP 协议有着良好的兼容性。默认端口也是80和443,并且握手阶段采用 HTTP 协议,因此握手时不容易屏蔽,能通过各种 HTTP 代理服务器。
  • 可以发送文本,也可以发送二进制数据。
  • 没有同源限制,客户端可以与任意服务器通信。
  • 协议标识符是ws(如果加密,则为wss),服务器网址就是 URL。


搭建websocket服务端

创建SpringBoot项目,引入 WebSocket依赖

         <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>

        <!-- SpringBoot Boot Redis -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

application.yml

server:
  port: ${SERVER_PORT:9206}

# Spring
spring:
  application:
    # 应用名称
    name: ruoyi-websocket
  redis:
    host: localhost
    port: 6379
    password: 123456

配置类WebSocketConfig

启用 WebSocket的支持也是很简单

/**
 * WebSocket配置类。开启WebSocket的支持
 */
@Configuration
public class WebSocketConfig {

    /**
     * bean注册:会自动扫描带有@ServerEndpoint注解声明的Websocket Endpoint(端点),注册成为Websocket bean。
     * 要注意,如果项目使用外置的servlet容器,而不是直接使用springboot内置容器的话,就不要注入ServerEndpointExporter,
     * 因为它将由容器自己提供和管理。
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }

}

新增WebSocketServer

这里就是重点了,核心都在这里

  • 因为WebSocket是类似客户端服务端的形式(采用ws协议),那么这里的WebSocketServer其实就相当于一个ws协议的Controller
  • 直接@ServerEndpoint("/websocket/{sid}") 、@Component启用即可,然后在里面实现@OnOpen开启连接,@onClose关闭连接,@onMessage接收消息等方法。
  • 新建一个ConcurrentHashMap用于接收当前userId的WebSocket或者Session信息,方便IM之间对userId进行推送消息。
  • 集群版(多个ws节点)还需要借助 MySQL或者 Redis等进行订阅广播方式处理,改造对应的 sendMessage方法即可。
  • 前端html页面与之关联的接口
  1. var reqUrl = "http://ip:port/websocket/" + cid;
  2. socket = new WebSocket(reqUrl.replace("http", "ws"));
/**
 * WebSocket的操作类
 * html页面与之关联的接口
 * var reqUrl = "http://ip:port/websocket/" + sid;
 * socket = new WebSocket(reqUrl.replace("http", "ws"));
 */
@Component
@Slf4j
@ServerEndpoint("/websocket/{sid}")
public class WebSocketServer {

    //存放所有在线的客户端
    private static final ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>();

    //连接sid和连接会话
    private String sid;
    private Session session;

    /**
     * 连接建立成功调用的方法。
     * 由前端<code>new WebSocket</code>触发
     *
     * @param session 与某个客户端的连接会话,需要通过它来给客户端发送消息
     * @param sid     每次页面建立连接时传入到服务端的id,比如用户id等。可以自定义。
     */
    @OnOpen
    public void onOpen(Session session,@PathParam("sid") String sid) {
        webSocketMap.put(sid, this);
        this.sid = sid;
        this.session = session;
        sendSingleMessage(sid, "连接成功,sid="+sid);
        log.info("连接建立成功,当前在线数为:{} ==> 开始监听新连接:sessionId = {}, sid = {}。", webSocketMap.size(), session.getId(), sid);
    }

    /**
     * 连接关闭调用的方法。
     * 由前端<code>socket.close()</code>触发
     *
     * @param session
     * @param sid
     */
    @OnClose
    public void onClose(Session session,@PathParam("sid") String sid) {
        // 从 Map中移除
        webSocketMap.remove(sid);
        log.info("连接关闭成功,当前在线数为:{} ==> 关闭该连接信息:sessionId = {}, sid = {}。", webSocketMap.size(), session.getId(), sid);
    }

    /**
     * 发生错误调用的方法
     *
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("WebSocket发生错误,错误信息为:" + error.getMessage());
        error.printStackTrace();
    }

    /**
     * 收到客户端消息后调用的方法。
     * 由前端<code>socket.send</code>触发
     * html界面传递来得数据格式,可以自定义.{"sid":"user-1","message":"hello websocket"}
     *
     * @param message
     * @param session
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        JSONObject jsonObject = JSON.parseObject(message);
        String toSid = jsonObject.getString("sid");
        String msg = jsonObject.getString("message");
        log.info("服务端收到客户端消息 ==> fromSid = {}, toSid = {}, message = {}", sid, toSid, message);

        //模拟约定:如果未指定sid信息,则群发,否则就单独发送
        if (toSid == null || toSid.equals("") || "".equalsIgnoreCase(toSid)) {
            sendToAll(msg);
        } else {
            sendToOne(toSid, msg);
        }
    }

    //群发消息
    private void sendToAll(String message) {
        // 遍历在线map集合
        webSocketMap.forEach((toSid, webSocketServer) -> {
            webSocketServer.sendToOne(toSid,message);
        });
    }

    //指定发送消息
    private void sendToOne(String toSid, String message){
        // 排除掉自己
        if (sid.equalsIgnoreCase(toSid)) {
            log.info("不能给自己发消息 ==> sid = {}, toSid = {}, message = {}", sid, toSid, message);
            return;
        }
        sendMessage(toSid,message);         //分布式
//        sendSingleMessage(toSid,message); //单机
    }

    //分布式使用  服务端给redis发送消息,需要指定channel
    public void sendMessage(@NotNull String toSid,String message){
        try {
            String newMessage = new String(message.getBytes(StandardCharsets.UTF_8), StandardCharsets.UTF_8);
            Map<String,String> map = new HashMap<>();
            map.put(Constants.REDIS_MESSAGE_KEY, toSid);
            map.put(Constants.REDIS_MESSAGE_VALUE, newMessage);

            log.info("服务端给redis队列发消息 ==> sid = {}, toSid = {}, message = {}", sid, toSid, message);
            StringRedisTemplate template = SpringUtils.getBean(StringRedisTemplate.class);
            template.convertAndSend(Constants.REDIS_CHANNEL, JSON.toJSONString(map));
        }catch (Exception e){
            e.printStackTrace();
            log.info("服务端给redis队列发消息失败 ==> sid = {}, toSid = {}, message = {}", sid, toSid, message);
        }

    }


    //单机使用  通过指定的客户id向该客户推送消息
    //当服务端执行toSession.getAsyncRemote().sendText(xxx)后,前端的socket.onmessage得到监听
    public void sendSingleMessage(@NotNull String toSid, String message) {
        WebSocketServer webSocketServer = webSocketMap.get(toSid);
        if (!StringUtils.isNull(webSocketServer)) {
            try {
                webSocketServer.session.getBasicRemote().sendText(message);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

配置redis进行监听

配置监听器

/**
 * 消息监听器,接收订阅消息
 */
@Component
public class RedisReceiver implements MessageListener {
    Logger log = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private WebSocketServer webSocketServer;

    /**
     * 监听redis的channel信息
     * 这里只对channel=REDIS_CHANNEL的信息进行处理
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        String channel = new String(message.getChannel());// 订阅的频道名称
        String msg = "";
        try
        {
            msg = new String(message.getBody(), StandardCharsets.UTF_8);//注意与发布消息编码一致,否则会乱码
            if (!StringUtils.isEmpty(msg)){
                if (Constants.REDIS_CHANNEL.endsWith(channel)) {
                    JSONObject jsonObject = JSON.parseObject(msg);
                    log.info("redis队列监听消息 ==> msg = {}", msg);
                    webSocketServer.sendSingleMessage(
                            jsonObject.get(Constants.REDIS_MESSAGE_KEY).toString()
                            ,jsonObject.get(Constants.REDIS_MESSAGE_VALUE).toString()
                    );
                }else{
                    //TODO 其他订阅的消息处理
                }
            }else{
                log.info("消息内容为空,不处理。");
            }
        }catch (Exception e) {
            log.error("处理消息异常:"+e.toString());
            e.printStackTrace();
        }
    }
}

添加到redis配置类里

/**
 * redis配置
 */
@Configuration
public class RedisConfig {
    private static final RedisSerializer<Object> SERIALIZER = createSerializer();

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        // 创建 RedisTemplate 对象
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        // 设置 RedisConnection 工厂。😈 它就是实现多种 Java Redis 客户端接入的秘密工厂。感兴趣的胖友,可以自己去撸下。
        template.setConnectionFactory(factory);
        // 使用 String 序列化方式,序列化 KEY 。
        template.setKeySerializer(RedisSerializer.string());
        template.setHashKeySerializer(RedisSerializer.string());
        // 使用 JSON 序列化方式(库是 Jackson ),序列化 VALUE 。
        template.setValueSerializer(SERIALIZER);
        template.setHashValueSerializer(SERIALIZER);
        return template;
    }

    private static RedisSerializer<Object> createSerializer() {
        ObjectMapper mapper = new ObjectMapper();
        mapper.registerModules(new JavaTimeModule());
        // 此项必须配置,否则会报java.lang.ClassCastException: java.util.LinkedHashMap cannot be cast to XXX
        mapper.activateDefaultTyping(mapper.getPolymorphicTypeValidator(), ObjectMapper.DefaultTyping.NON_FINAL);
        return new GenericJackson2JsonRedisSerializer(mapper);
    }


    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory factory, RedisReceiver receiver) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(factory);
        // 可以添加多个 messageListener,配置不同的交换机,订阅最新消息频道
        container.addMessageListener(receiver, new PatternTopic(Constants.REDIS_CHANNEL));
        return container;
    }

}

常量类

/**
 * @description: 常量类
 */
public class Constants {

    /** redis 订阅消息通道标识*/
    public final static String REDIS_CHANNEL = "REDIS_CHANNEL";
    /** 消息体的key*/
    public final static String REDIS_MESSAGE_KEY = "KEY";
    /** 消息体的值*/
    public final static String REDIS_MESSAGE_VALUE = "VALUE";

}

搭建websocket客户端

新建一个springboot项目,放到 static 目录下面。页面简单使用js代码调用WebSocket

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- SpringBoot Actuator -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
server:
  port: ${SERVER_PORT:9208}

# Spring
spring:
  application:
    # 应用名称
    name: ruoyi-websocket-client
<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta charset="UTF-8">
    <title>websocket客户端</title>
</head>
<body>
<div style="margin-bottom: 15px;">
    <label>连接sid:</label>
    <input type="text" id="sid" title="只能输入英文和数字" required>
    <button type="button" id="connectBtn" onclick="initConnection()">建立连接</button>
</div>
<p>【toSid】:
<div><input id="toSid" name="toSid" type="text" value="user-1"></div>
<p>【contentText】:
<div><input id="contentText"  type="text" value="hello websocket"></div>
<p>【操作】:
<div>
    <button type="button" onclick="sendMessage()">发送消息</button>
</div>
</body>

<script type="text/javascript">
    let socket;

    function sendMessage() {
        if (typeof (WebSocket) == "undefined") {
            console.log("您的浏览器不支持WebSocket");
        } else {
            var toSid = document.getElementById('toSid').value;
            var contentText = document.getElementById('contentText').value;
            var msg = '{"sid":"' + toSid + '","message":"' + contentText + '"}';
            console.log(msg);
            socket.send(msg);
        }
    }

    function initConnection() {
        const sid = document.getElementById('sid').value;
        // 原有WebSocket初始化逻辑
        if (typeof (WebSocket) == "undefined") {
            console.log("您的浏览器不支持WebSocket");
        } else {
            //实现化WebSocket对象,指定要连接的服务器地址与端口  建立连接
            var reqUrl = "http://localhost:9206/websocket/" + sid;
            socket = new WebSocket(reqUrl.replace("http", "ws"));

            //打开事件
            socket.onopen = function () {
                console.log("Socket 已打开");
                const connectBtn = document.getElementById('connectBtn');
                connectBtn.disabled = true;
                connectBtn.textContent = "已连接";
                //socket.send("这是来自客户端的消息" + location.href + new Date());
            };
            //获得消息事件
            socket.onmessage = function (msg) {
                console.log("onmessage--" + msg.data);
                //发现消息进入    开始处理前端触发逻辑
            };
            //关闭事件
            socket.onclose = function () {
                console.log("Socket已关闭");
            };
            //发生了错误事件
            socket.onerror = function () {
                alert("Socket发生了错误");
                //此时可以尝试刷新页面
            }
        }
    }
</script>
</html>


单机发送

我们在客户端创建两个页面

两个页面都向9206的服务端注册,相当于服务端只有一个

1、前端向后端建立连接

打开两个页面,填写一个sid用于当前身份的标识,然后点击建立连接,这里会建立一个socket连接通道,然后访问后端地址

2、后端onOpen方法接受连接

后端获得请求后,会进入onOpen然后在给前端发送一个消息

3、前端收到消息

前端如果收到后端的信息,会进入onmessage方法

至此完成了websocket前后端通信过程 

4、前端给后端发送消息

AAA给BBB发送给消息

后端收到消息,在给前端发送消息,这里进入单机方法,然后从当前服务端获取session

单体版局限

在集群环境下(多个WS节点部署),当前配置会存在以下问题:

1‌、连接状态隔离问题
每个节点的onlineSessionClientMap独立存储Session,导致:

  • 用户A连接到节点1,其Session只存储在节点1
  • 用户B连接到节点2,其Session只存储在节点2
  • 当A发送消息给B时,A在节点1无法找到B的Session,导致推送失败

‌2、消息路由失效
sendToOne(sid, message)方法仅能向‌当前节点‌连接的客户端发送消息,无法跨节点通信,需引入中间件实现节点间消息同步:


分布式发送

切换到分布式发送方法

这里会往redis队列发送消息,然后redis监听方法会监听队列,然后往往每隔服务端都发送消息,每个服务端检查当前是否有该节点的session,如果有则发送给对应的客户端

创建两个服务节点,端口9206和9207

客户端开启两个页面分别连接9206和9207

建立连接

AAA注册节点9206,BBB注册节点9207

然后AAA往BBB发送消息

BBB成功接收到信息

Logo

更多推荐