Springboot+websocket基于Redis订阅发布实现集群化

Springboot+websocket基于Redis订阅发布实现集群化

前面讲了单机版的websocket如何使用发送群聊(2020-03-24-springboot快速集成websocket实现群聊),那么要是部署多个服务实现集群话怎么实现呢?

由于websocket是长连接,session保持在一个server中,所以在不同server在使用websocket推送消息时就需要获取对应的session进行推送,在分布式系统中就无法获取到所有session,这里就需要使用一个中间件将消息推送到各个系统中,在这里使用的redis,使用redis的sub/pub功能。

实现步骤

  • Redis要配置消息监听容器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package cn.pconline.pcloud.admin.config;

import cn.pconline.pcloud.admin.mq.ChatMessageListener;
import cn.pconline.pcloud.base.util.RedisKey;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;

/**
* @Description 消息订阅配置类
* @Author jie.zhao
* @Date 2020/3/31 13:54
*/
@Configuration
public class RedisSubscriberConfig {
/**
* 消息监听适配器,注入接受消息方法
*
* @param receiver
* @return
*/
@Bean
public MessageListenerAdapter messageListenerAdapter(ChatMessageListener receiver) {
return new MessageListenerAdapter(receiver);
}

/**
* 创建消息监听容器
*
* @param redisConnectionFactory
* @param messageListenerAdapter2
* @return
*/
@Bean
public RedisMessageListenerContainer getRedisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory, MessageListenerAdapter messageListenerAdapter) {
RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
redisMessageListenerContainer.addMessageListener(messageListenerAdapter, new PatternTopic(RedisKey.REDIS_MQ_CHAT));
return redisMessageListenerContainer;
}
}
  • Redis消息处理类

收到消息后,给当前Server的session发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package cn.pconline.pcloud.admin.mq;

import cn.pconline.framework.util.StringUtils;
import cn.pconline.pcloud.admin.ws.ChatWebsocketEndpoint;
import cn.pconline.pcloud.base.dto.MessageDto;
import cn.pconline.pcloud.base.util.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.stereotype.Component;

/**
* @Description 集群聊天消息监听器
* @Author jie.zhao
* @Date 2020/3/29 15:07
*/
@Component
public class ChatMessageListener implements MessageListener {

private Logger logger = LoggerFactory.getLogger(getClass());

@Autowired
private StringRedisTemplate redisTemplate;

@Autowired
private ChatWebsocketEndpoint websocketEndpoint;

@Override
public void onMessage(Message message, byte[] pattern) {
RedisSerializer<String> valueSerializer = redisTemplate.getStringSerializer();
String value = valueSerializer.deserialize(message.getBody());

if (StringUtils.isNotBlank(value)) {
MessageDto dto = JsonUtils.jsonToPojo(value, MessageDto.class);
logger.info("监听集群websocket消息--- {}", value);
//集群模式 推送消息
websocketEndpoint.sendClusterWebsocketMessage(dto);
}
}

}
  • websoketEndpoint修改

原先直接发消息的地方,改为发送Rdis订阅消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
package cn.pconline.pcloud.admin.ws;

import cn.pconline.pcloud.base.dto.MessageDto;
import cn.pconline.pcloud.base.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

/**
* @Description 群聊websocket改造为集群模式
* @Author jie.zhao
* @Date 2020/3/29 15:07
*/
@Component
@ServerEndpoint("/groupChat/{groupNo}/{uuid}/{name}")
public class ChatWebsocketEndpoint {

private Logger logger = LoggerFactory.getLogger(getClass());

private StringRedisTemplate stringRedisTemplate = SpringBeanUtils.getBean(StringRedisTemplate.class);

private RedisTemplateUtil redisTemplateUtil = SpringBeanUtils.getBean(RedisTemplateUtil.class);

/**
* 保存 组id->组成员 的映射关系
* 之所以使用ConcurrentHashMap因为这个是线程安全的
*/
private static ConcurrentHashMap<String, List<Session>> groupMemberInfoMap = new ConcurrentHashMap<>();

/**
* 收到消息调用的方法,群成员发送消息
*
* @param groupNo
* @param uuid
* @param message
*/
@OnMessage
public void onMessage(Session session, @PathParam("groupNo") String groupNo, @PathParam("uuid") String uuid,
@PathParam("name") String name, String message) {
//前端心跳检测
if("PING".equals(message)){
try {
session.getBasicRemote().sendText("PING");
} catch (Exception e) {
logger.info("会话异常!" + e.getMessage());
e.printStackTrace();
}
}else{
//message过滤html、script、css标签
message = HtmlUtil.htmlEncode(message);
MessageDto messageDto = new MessageDto(groupNo, MessageDto.MessageTypeEnum.CHAT, uuid, name, message, DateUtil.format(new Date()));
this.sendClusterMessage(messageDto);
this.writeRedisChatList(messageDto);
}
}

/**
* 建立连接调用的方法,群成员加入
*
* @param session 会话
* @param groupNo 群id
*/
@OnOpen
public void onOpen(Session session, @PathParam("groupNo") String groupNo, @PathParam("uuid") String uuid, @PathParam("name") String name) {
List<Session> sessionList = groupMemberInfoMap.get(groupNo);
if (sessionList == null) {
sessionList = new ArrayList<>();
groupMemberInfoMap.put(groupNo, sessionList);
}
sessionList.add(session);
this.addClusterSessionCount(groupNo, uuid);
long onlineNum = this.getClusterSessionCount(groupNo);
MessageDto messageDto = new MessageDto(groupNo, MessageDto.MessageTypeEnum.SYSTEM, uuid, name, "连接建立", DateUtil.format(new Date()), onlineNum);
this.sendClusterMessage(messageDto);
logger.info("连接建立");
logger.info("直播房间号: {},当前集群在线人数:{} ", groupNo, onlineNum);
}

/**
* 关闭连接调用的方法,群成员退出
*
* @param session
* @param groupNo
*/
@OnClose
public void onClose(Session session, @PathParam("groupNo") String groupNo, @PathParam("uuid") String uuid, @PathParam("name") String name) {
List<Session> sessionList = groupMemberInfoMap.get(groupNo);
sessionList.remove(session);
this.removeClusterSessionCount(groupNo, uuid);
long onlineNum = this.getClusterSessionCount(groupNo);
MessageDto messageDto = new MessageDto(groupNo, MessageDto.MessageTypeEnum.SYSTEM, uuid, name, "连接关闭", DateUtil.format(new Date()), onlineNum);
this.sendClusterMessage(messageDto);
logger.info("连接关闭");
logger.info("直播房间号: {},当前集群在线人数:{} ", groupNo, onlineNum);
}

/**
* 传输消息错误调用的方法
*
* @param error
*/
@OnError
public void OnError(Throwable error) {
logger.info("连接出错:{}", error.getMessage());
}

/**
* 发布集群的消息
*
* @param dto
*/
private void sendClusterMessage(MessageDto dto) {
stringRedisTemplate.convertAndSend(RedisKey.REDIS_MQ_CHAT, JsonUtils.objectToJson(dto));
}

/**
* 添加集群的在线人数
*
* @param groupNo
* @param uuid
* @return
*/
private void addClusterSessionCount(String groupNo, String uuid) {
redisTemplateUtil.sSet(RedisKey.REDIS_MQ_CHAT_SESSION_ID + groupNo, uuid);
}

/**
* 移除集群的在线人数
*
* @param groupNo
* @param uuid
* @return
*/
private void removeClusterSessionCount(String groupNo, String uuid) {
redisTemplateUtil.setRemove(RedisKey.REDIS_MQ_CHAT_SESSION_ID + groupNo, uuid);
}

/**
* 获取集群的在线人数
*
* @param groupNo
* @return
*/
private long getClusterSessionCount(String groupNo) {
return redisTemplateUtil.sGetSetSize(RedisKey.REDIS_MQ_CHAT_SESSION_ID + groupNo);
}

/**
* 将聊天内容写入redis,定时同步mysql
*
* @param dto
* @return
*/
private void writeRedisChatList(MessageDto dto) {
if (dto != null) {
redisTemplateUtil.lSet(RedisKey.REDIS_CHAT_LIST + dto.getGroupNo(), JsonUtils.objectToJson(dto));
//双写模式 同时写入历史消息
redisTemplateUtil.lSet(RedisKey.REDIS_CHAT_LIST_HISTORY + dto.getGroupNo(), JsonUtils.objectToJson(dto));
}
}

/**
* 发布websocket消息
*
* @param dto
* @return
*/
public void sendClusterWebsocketMessage(MessageDto dto) {
if (dto != null) {
//得到当前群的所有会话,也就是所有用户
List<Session> sessionList = groupMemberInfoMap.get(dto.getGroupNo());

if (sessionList != null && sessionList.size() > 0) {
// 遍历Session集合给每个会话发送文本消息
sessionList.forEach(item -> {
try {
item.getBasicRemote().sendText(JsonUtils.objectToJson(dto));
} catch (Exception e) {
logger.info("会话异常!" + e.getMessage());
e.printStackTrace();
}
});
}
}
}
}

至此,websocket基于Redis订阅发布实现集群化改造完成。

-------------已经触及底线 感谢您的阅读-------------

本文标题:Springboot+websocket基于Redis订阅发布实现集群化

文章作者:趙小傑~~

发布时间:2020年04月05日 - 16:54:00

最后更新:2020年04月06日 - 01:47:18

原始链接:https://cnsyear.com/posts/5de5f2c0.html

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。

0%