SpringBoot集成WebSocket(支持分布式)
轮询的效率低,非常浪费资源,因为必须不停连接,或者 HTTP 连接始终打开。WebSocket的最大特点就是,服务器可以主动向客户端推送信息,客户端也可以主动向服务器发送信息,是真正的双向平等对话。访问前端页面,填写一个cid用于当前身份的标识,然后点击建立连接,这里会建立一个socket连接通道,然后访问后端地址。后端获得请求后,会进入onOpen然后调用sendToOne方法,给前端发送一个消
参考
SpringBoot 集成WebSocket详解_springboot websocket-CSDN博客
为什么需要WebSocket
HTTP 是基于请求响应式的,即通信只能由客户端发起,服务端做出响应,无状态,无连接。
- 无状态:每次连接只处理一个请求,请求结束后断开连接。
- 无连接:对于事务处理没有记忆能力,服务器不知道客户端是什么状态。
通过HTTP实现即时通讯,只能是页面轮询向服务器发出请求,服务器返回查询结果。轮询的效率低,非常浪费资源,因为必须不停连接,或者 HTTP 连接始终打开。
WebSocket的最大特点就是,服务器可以主动向客户端推送信息,客户端也可以主动向服务器发送信息,是真正的双向平等对话。
WebSocket特点
- 建立在 TCP 协议之上,服务器端的实现比较容易。
- 与 HTTP 协议有着良好的兼容性。默认端口也是80和443,并且握手阶段采用 HTTP 协议,因此握手时不容易屏蔽,能通过各种 HTTP 代理服务器。
- 可以发送文本,也可以发送二进制数据。
- 没有同源限制,客户端可以与任意服务器通信。
- 协议标识符是ws(如果加密,则为wss),服务器网址就是 URL。
搭建websocket服务端
创建SpringBoot项目,引入 WebSocket依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<!-- SpringBoot Boot Redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
application.yml
server:
port: ${SERVER_PORT:9206}
# Spring
spring:
application:
# 应用名称
name: ruoyi-websocket
redis:
host: localhost
port: 6379
password: 123456
配置类WebSocketConfig
启用 WebSocket的支持也是很简单
/**
* WebSocket配置类。开启WebSocket的支持
*/
@Configuration
public class WebSocketConfig {
/**
* bean注册:会自动扫描带有@ServerEndpoint注解声明的Websocket Endpoint(端点),注册成为Websocket bean。
* 要注意,如果项目使用外置的servlet容器,而不是直接使用springboot内置容器的话,就不要注入ServerEndpointExporter,
* 因为它将由容器自己提供和管理。
*/
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
新增WebSocketServer
这里就是重点了,核心都在这里
- 因为WebSocket是类似客户端服务端的形式(
采用ws协议
),那么这里的WebSocketServer其实就相当于一个ws协议的Controller 直接@ServerEndpoint("/websocket/{sid}
") 、@Component启用即可
,然后在里面实现@OnOpen开启连接,@onClose关闭连接,@onMessage接收消息等方法。- 新建一个
ConcurrentHashMap用于接收当前userId的WebSocket或者Session信息
,方便IM之间对userId进行推送消息。 - 集群版(多个ws节点)还需要借助 MySQL或者 Redis等进行订阅广播方式处理,改造对应的 sendMessage方法即可。
- 前端html页面与之关联的接口
- var reqUrl = "http://ip:port/websocket/" + cid;
- socket = new WebSocket(reqUrl.replace("http", "ws"));
/**
* WebSocket的操作类
* html页面与之关联的接口
* var reqUrl = "http://ip:port/websocket/" + sid;
* socket = new WebSocket(reqUrl.replace("http", "ws"));
*/
@Component
@Slf4j
@ServerEndpoint("/websocket/{sid}")
public class WebSocketServer {
//存放所有在线的客户端
private static final ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
//连接sid和连接会话
private String sid;
private Session session;
/**
* 连接建立成功调用的方法。
* 由前端<code>new WebSocket</code>触发
*
* @param session 与某个客户端的连接会话,需要通过它来给客户端发送消息
* @param sid 每次页面建立连接时传入到服务端的id,比如用户id等。可以自定义。
*/
@OnOpen
public void onOpen(Session session,@PathParam("sid") String sid) {
webSocketMap.put(sid, this);
this.sid = sid;
this.session = session;
sendSingleMessage(sid, "连接成功,sid="+sid);
log.info("连接建立成功,当前在线数为:{} ==> 开始监听新连接:sessionId = {}, sid = {}。", webSocketMap.size(), session.getId(), sid);
}
/**
* 连接关闭调用的方法。
* 由前端<code>socket.close()</code>触发
*
* @param session
* @param sid
*/
@OnClose
public void onClose(Session session,@PathParam("sid") String sid) {
// 从 Map中移除
webSocketMap.remove(sid);
log.info("连接关闭成功,当前在线数为:{} ==> 关闭该连接信息:sessionId = {}, sid = {}。", webSocketMap.size(), session.getId(), sid);
}
/**
* 发生错误调用的方法
*
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("WebSocket发生错误,错误信息为:" + error.getMessage());
error.printStackTrace();
}
/**
* 收到客户端消息后调用的方法。
* 由前端<code>socket.send</code>触发
* html界面传递来得数据格式,可以自定义.{"sid":"user-1","message":"hello websocket"}
*
* @param message
* @param session
*/
@OnMessage
public void onMessage(String message, Session session) {
JSONObject jsonObject = JSON.parseObject(message);
String toSid = jsonObject.getString("sid");
String msg = jsonObject.getString("message");
log.info("服务端收到客户端消息 ==> fromSid = {}, toSid = {}, message = {}", sid, toSid, message);
//模拟约定:如果未指定sid信息,则群发,否则就单独发送
if (toSid == null || toSid.equals("") || "".equalsIgnoreCase(toSid)) {
sendToAll(msg);
} else {
sendToOne(toSid, msg);
}
}
//群发消息
private void sendToAll(String message) {
// 遍历在线map集合
webSocketMap.forEach((toSid, webSocketServer) -> {
webSocketServer.sendToOne(toSid,message);
});
}
//指定发送消息
private void sendToOne(String toSid, String message){
// 排除掉自己
if (sid.equalsIgnoreCase(toSid)) {
log.info("不能给自己发消息 ==> sid = {}, toSid = {}, message = {}", sid, toSid, message);
return;
}
sendMessage(toSid,message); //分布式
// sendSingleMessage(toSid,message); //单机
}
//分布式使用 服务端给redis发送消息,需要指定channel
public void sendMessage(@NotNull String toSid,String message){
try {
String newMessage = new String(message.getBytes(StandardCharsets.UTF_8), StandardCharsets.UTF_8);
Map<String,String> map = new HashMap<>();
map.put(Constants.REDIS_MESSAGE_KEY, toSid);
map.put(Constants.REDIS_MESSAGE_VALUE, newMessage);
log.info("服务端给redis队列发消息 ==> sid = {}, toSid = {}, message = {}", sid, toSid, message);
StringRedisTemplate template = SpringUtils.getBean(StringRedisTemplate.class);
template.convertAndSend(Constants.REDIS_CHANNEL, JSON.toJSONString(map));
}catch (Exception e){
e.printStackTrace();
log.info("服务端给redis队列发消息失败 ==> sid = {}, toSid = {}, message = {}", sid, toSid, message);
}
}
//单机使用 通过指定的客户id向该客户推送消息
//当服务端执行toSession.getAsyncRemote().sendText(xxx)后,前端的socket.onmessage得到监听
public void sendSingleMessage(@NotNull String toSid, String message) {
WebSocketServer webSocketServer = webSocketMap.get(toSid);
if (!StringUtils.isNull(webSocketServer)) {
try {
webSocketServer.session.getBasicRemote().sendText(message);
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
配置redis进行监听
配置监听器
/**
* 消息监听器,接收订阅消息
*/
@Component
public class RedisReceiver implements MessageListener {
Logger log = LoggerFactory.getLogger(this.getClass());
@Autowired
private WebSocketServer webSocketServer;
/**
* 监听redis的channel信息
* 这里只对channel=REDIS_CHANNEL的信息进行处理
*/
@Override
public void onMessage(Message message, byte[] pattern) {
String channel = new String(message.getChannel());// 订阅的频道名称
String msg = "";
try
{
msg = new String(message.getBody(), StandardCharsets.UTF_8);//注意与发布消息编码一致,否则会乱码
if (!StringUtils.isEmpty(msg)){
if (Constants.REDIS_CHANNEL.endsWith(channel)) {
JSONObject jsonObject = JSON.parseObject(msg);
log.info("redis队列监听消息 ==> msg = {}", msg);
webSocketServer.sendSingleMessage(
jsonObject.get(Constants.REDIS_MESSAGE_KEY).toString()
,jsonObject.get(Constants.REDIS_MESSAGE_VALUE).toString()
);
}else{
//TODO 其他订阅的消息处理
}
}else{
log.info("消息内容为空,不处理。");
}
}catch (Exception e) {
log.error("处理消息异常:"+e.toString());
e.printStackTrace();
}
}
}
添加到redis配置类里
/**
* redis配置
*/
@Configuration
public class RedisConfig {
private static final RedisSerializer<Object> SERIALIZER = createSerializer();
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
// 创建 RedisTemplate 对象
RedisTemplate<String, Object> template = new RedisTemplate<>();
// 设置 RedisConnection 工厂。😈 它就是实现多种 Java Redis 客户端接入的秘密工厂。感兴趣的胖友,可以自己去撸下。
template.setConnectionFactory(factory);
// 使用 String 序列化方式,序列化 KEY 。
template.setKeySerializer(RedisSerializer.string());
template.setHashKeySerializer(RedisSerializer.string());
// 使用 JSON 序列化方式(库是 Jackson ),序列化 VALUE 。
template.setValueSerializer(SERIALIZER);
template.setHashValueSerializer(SERIALIZER);
return template;
}
private static RedisSerializer<Object> createSerializer() {
ObjectMapper mapper = new ObjectMapper();
mapper.registerModules(new JavaTimeModule());
// 此项必须配置,否则会报java.lang.ClassCastException: java.util.LinkedHashMap cannot be cast to XXX
mapper.activateDefaultTyping(mapper.getPolymorphicTypeValidator(), ObjectMapper.DefaultTyping.NON_FINAL);
return new GenericJackson2JsonRedisSerializer(mapper);
}
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory factory, RedisReceiver receiver) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(factory);
// 可以添加多个 messageListener,配置不同的交换机,订阅最新消息频道
container.addMessageListener(receiver, new PatternTopic(Constants.REDIS_CHANNEL));
return container;
}
}
常量类
/**
* @description: 常量类
*/
public class Constants {
/** redis 订阅消息通道标识*/
public final static String REDIS_CHANNEL = "REDIS_CHANNEL";
/** 消息体的key*/
public final static String REDIS_MESSAGE_KEY = "KEY";
/** 消息体的值*/
public final static String REDIS_MESSAGE_VALUE = "VALUE";
}
搭建websocket客户端
新建一个springboot项目,放到 static 目录下面。页面简单使用js代码调用WebSocket
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- SpringBoot Actuator -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
server:
port: ${SERVER_PORT:9208}
# Spring
spring:
application:
# 应用名称
name: ruoyi-websocket-client
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<title>websocket客户端</title>
</head>
<body>
<div style="margin-bottom: 15px;">
<label>连接sid:</label>
<input type="text" id="sid" title="只能输入英文和数字" required>
<button type="button" id="connectBtn" onclick="initConnection()">建立连接</button>
</div>
<p>【toSid】:
<div><input id="toSid" name="toSid" type="text" value="user-1"></div>
<p>【contentText】:
<div><input id="contentText" type="text" value="hello websocket"></div>
<p>【操作】:
<div>
<button type="button" onclick="sendMessage()">发送消息</button>
</div>
</body>
<script type="text/javascript">
let socket;
function sendMessage() {
if (typeof (WebSocket) == "undefined") {
console.log("您的浏览器不支持WebSocket");
} else {
var toSid = document.getElementById('toSid').value;
var contentText = document.getElementById('contentText').value;
var msg = '{"sid":"' + toSid + '","message":"' + contentText + '"}';
console.log(msg);
socket.send(msg);
}
}
function initConnection() {
const sid = document.getElementById('sid').value;
// 原有WebSocket初始化逻辑
if (typeof (WebSocket) == "undefined") {
console.log("您的浏览器不支持WebSocket");
} else {
//实现化WebSocket对象,指定要连接的服务器地址与端口 建立连接
var reqUrl = "http://localhost:9206/websocket/" + sid;
socket = new WebSocket(reqUrl.replace("http", "ws"));
//打开事件
socket.onopen = function () {
console.log("Socket 已打开");
const connectBtn = document.getElementById('connectBtn');
connectBtn.disabled = true;
connectBtn.textContent = "已连接";
//socket.send("这是来自客户端的消息" + location.href + new Date());
};
//获得消息事件
socket.onmessage = function (msg) {
console.log("onmessage--" + msg.data);
//发现消息进入 开始处理前端触发逻辑
};
//关闭事件
socket.onclose = function () {
console.log("Socket已关闭");
};
//发生了错误事件
socket.onerror = function () {
alert("Socket发生了错误");
//此时可以尝试刷新页面
}
}
}
</script>
</html>
单机发送
我们在客户端创建两个页面
两个页面都向9206的服务端注册,相当于服务端只有一个
1、前端向后端建立连接
打开两个页面,填写一个sid用于当前身份的标识,然后点击建立连接,这里会建立一个socket连接通道,然后访问后端地址
2、后端onOpen方法接受连接
后端获得请求后,会进入onOpen然后在给前端发送一个消息
3、前端收到消息
前端如果收到后端的信息,会进入onmessage方法
至此完成了websocket前后端通信过程
4、前端给后端发送消息
AAA给BBB发送给消息
后端收到消息,在给前端发送消息,这里进入单机方法,然后从当前服务端获取session
单体版局限
在集群环境下(多个WS节点部署),当前配置会存在以下问题:
1、连接状态隔离问题
每个节点的onlineSessionClientMap
独立存储Session,导致:
- 用户A连接到节点1,其Session只存储在节点1
- 用户B连接到节点2,其Session只存储在节点2
- 当A发送消息给B时,A在节点1无法找到B的Session,导致推送失败
2、消息路由失效sendToOne(sid, message)
方法仅能向当前节点连接的客户端发送消息,无法跨节点通信,需引入中间件实现节点间消息同步:
分布式发送
切换到分布式发送方法
这里会往redis队列发送消息,然后redis监听方法会监听队列,然后往往每隔服务端都发送消息,每个服务端检查当前是否有该节点的session,如果有则发送给对应的客户端
创建两个服务节点,端口9206和9207
客户端开启两个页面分别连接9206和9207
建立连接
AAA注册节点9206,BBB注册节点9207
然后AAA往BBB发送消息
BBB成功接收到信息
更多推荐
所有评论(0)