websocket存储session(适合聊天,转发,广播,服务器主动发送消息)
此篇文章主要分享的是websocket存储session信息,在网上经常看到有websocket的例子,此篇文章只是将websocket中的session提取并存储起来,便于用户之间的互动与查找,其实就是解耦合
此实现采用的是springboot的spring封装的websocket框架,原有的tomcat其实也可以完成
首先,spring框架封装的websocket有几种对应
onopen对应afterConnectionEstablished
onclose对应afterConnectionClosed
onMessage对应handleTextMessage
以下是处理消息类
@Component
public class DoubleTeachWSHandler extends TextWebSocketHandler {
private static final Logger LOG = LoggerFactory
.getLogger(DoubleTeachWSHandler.class);
/**
* 创建连接
*
* @param session 用户session
* @throws Exception
* @version v1.0
* @autho 张瑞
* @date 2017年12月18日 下午3:48:34
*/
@Override
public void afterConnectionEstablished(WebSocketSession session) {
System.out.println("afterConnectionEstablished");
try {
//获取用户信息
SocketUser socketUser = DoubleTeachUserService
.getSocketUser(session);
if (null == socketUser) {
return;
}
//加入redis
DoubleTeachUserService.addRedis(socketUser);
//创建房间并加入session
DoubleTeachSocketRoom.creatRoom(session, socketUser);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
}
/**
* 关闭链接
*
* @param session 用户session
* @param status 状态
* @throws Exception
* @version v1.0
* @autho 张瑞
* @date 2017年12月18日 下午3:49:04
*/
@Override
public void afterConnectionClosed(WebSocketSession session,
CloseStatus status) {
System.out.println("afterConnectionClosed");
try {
//获取用户信息
SocketUser socketUser = DoubleTeachUserService
.getSocketUser(session);
if (null == socketUser) {
return;
}
//删除缓存
DoubleTeachUserService.deleteRedis(socketUser);
//退出房间
DoubleTeachSocketRoom.removeUserSession(socketUser);
//是否删除房间
DoubleTeachSocketRoom.deleteRoom(socketUser.getRoomID());
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
}
/**
* 处理消息
*
* @param session 用户session
* @param message message消息
* @throws Exception
* @version v1.0
* @autho 张瑞
* @date 2017年12月18日 下午3:55:18
*/
@Override
protected void handleTextMessage(WebSocketSession session,
TextMessage message) {
try {
//获取用户信息
SocketUser socketUser = DoubleTeachUserService
.getSocketUser(session);
if (null == socketUser) {
return;
}
String str = message.getPayload();
if (StringUtils.isNull(str)) {
//空消息不做操作
return;
}
//进行解析
SocketMessage socketMessage = JSONUtils.parse2Bean(str,
SocketMessage.class);
DoubleTeachSendMsg.mainSendMessage(socketMessage,socketUser);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
}
}
此处是重点,将session统一存储到此类中,用map存储起来,并自定义key值,value即存储websocket使用的session
@Component
public class DoubleTeachSocketRoom {
private static final Logger LOG = LoggerFactory
.getLogger(DoubleTeachSocketRoom.class);
//整体管理room的map
private static ConcurrentHashMap<String, SocketRoom> roomMap = new ConcurrentHashMap<String, SocketRoom>(
10000);
/**
* 获取用户session
*
* @param socketUser 用户实体
* @return 用户session
* @version v1.0
* @author 张瑞
* @date 2018年1月24日 上午10:51:11
*/
public static WebSocketSession getUserSession(SocketUser socketUser) {
try {
if (null == socketUser) {
return null;
}
SocketRoom socketRoom = roomMap.get(socketUser.getRoomID());
if (null == socketRoom) {
return null;
}
WebSocketSession session = socketRoom.getSessionMap()
.get(socketUser.getSessionID());
return session;
} catch (Exception e) {
LOG.error(e.getMessage(), e);
return null;
}
}
/**
* 移除用户session
*
* @param socketUser 用户实体
* @version v1.0
* @author 张瑞
* @date 2018年1月24日 上午10:51:40
*/
public static void removeUserSession(SocketUser socketUser) {
try {
if (null == socketUser) {
return;
}
SocketRoom socketRoom = roomMap.get(socketUser.getRoomID());
if (null == socketRoom) {
//不需要做操作
return;
}
socketRoom.getSessionMap().remove(socketUser.getSessionID());
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
}
/**
* 获取当前房间用户总体
*
* @param roomID 房间ID
* @return 房间实体
* @version v1.0
* @author 张瑞
* @date 2018年1月24日 上午10:52:24
*/
public static SocketRoom getSocketRoom(String roomID) {
try {
if (StringUtils.isNull(roomID)) {
return null;
}
SocketRoom socketRoom = roomMap.get(roomID);
return socketRoom;
} catch (Exception e) {
LOG.error(e.getMessage(), e);
return null;
}
}
/**
* 添加用户进入房间并是否创建房间
*
* @param session 用户session
* @param socketUser 用户
* @version v1.0
* @author 张瑞
* @date 2017年12月19日 下午3:44:27
*/
public static synchronized void creatRoom(WebSocketSession session,
SocketUser socketUser) {
SocketRoom socketRoom = roomMap.get(socketUser.getRoomID());
if (null == socketRoom) {
socketRoom = new SocketRoom();
roomMap.put(socketUser.getRoomID(), socketRoom);
}
socketRoom.getSessionMap().put(session.getId(), session);
}
/**
* 删除房间
*
* @param roomID
* @version v1.0
* @author 张瑞
* @date 2017年12月18日 下午3:54:13
*/
public static synchronized void deleteRoom(String roomID) {
SocketRoom socketRoom = roomMap.get(roomID);
if (null == socketRoom) {
return;
}
if (socketRoom.getSessionMap().size() == 0) {
roomMap.remove(roomID);
}
}
}
聊天室专用的存储表
public class SocketRoom implements Serializable {
private ConcurrentHashMap<String, WebSocketSession> sessionMap;
public SocketRoom() {
sessionMap = new ConcurrentHashMap<String, WebSocketSession>(10000);
}
public SocketRoom(int size) {
sessionMap = new ConcurrentHashMap<String, WebSocketSession>(size);
}
public ConcurrentHashMap<String, WebSocketSession> getSessionMap() {
return sessionMap;
}
public void setSessionMap(
ConcurrentHashMap<String, WebSocketSession> sessionMap) {
this.sessionMap = sessionMap;
}
}
用户存储表
public class SocketUser implements Serializable {
private String token;//用户登录唯一验证
private String sessionID;//websocketID
private String roomID;//房间号
private String liveid;//直播号
public String getToken() {
return token;
}
public void setToken(String token) {
this.token = token;
}
public String getSessionID() {
return sessionID;
}
public void setSessionID(String sessionID) {
this.sessionID = sessionID;
}
public String getRoomID() {
return roomID;
}
public void setRoomID(String roomID) {
this.roomID = roomID;
}
public String getLiveid() {
return liveid;
}
public void setLiveid(String liveid) {
this.liveid = liveid;
}
}
上述步骤主要将处理消息和session分离开,唯一要注意的是移动端的websocket在退出时服务端可能接收不到,一定要注意将session清空。否则session会越寸越大导致内存溢出。
附加一个发送消息类
@Component
public class DoubleTeachSendMsg {
private static final Logger LOG = LoggerFactory
.getLogger(DoubleTeachSendMsg.class);
/**
* 发送消息给用户端的人
*
* @param socketMessage 消息
* @param socketUser 用户
* @throws IOException
* @version v1.0
* @author 张瑞
* @date 2017年12月19日 下午5:15:56
*/
private static void sendToClient(SocketMessage socketMessage,
SocketUser socketUser)
throws IOException {
//获取当前端内用户
List<String> list = Jedis.lrange(
JedisKeys.userlist,
socketUser.getLiveid() + ":" + socketUser.getCnum() + ":"
+ socketMessage.getTo(),
null, null);
if (null == list || list.isEmpty()) {
return;
}
//添加返回的json格式的字符串
String jsonStr = JSONUtils.getJSON(socketMessage);
//TODO
LOG.error("to:socketMessage:" + jsonStr);
for (String key : list) {
//获取用户
SocketUser socketUserThis = JSONUtils.parse2Bean(Jedis
.get(JedisKeys.USERINFO, key),
SocketUser.class);
if (null == socketUserThis) {
continue;
}
WebSocketSession session = DoubleTeachSocketRoom
.getUserSession(socketUserThis);
if (null != session) {
Jedis.lpush(
JedisKeys.SENDMESSAGE_QUEUE,
session.getId(), jsonStr);
sendMessage(session);
}
}
}
/**
* 发送消息(每个用户有自己的发送消息锁)
*
* @param session
* @version v1.0
* @author 张瑞
* @date 2018年1月6日 下午1:55:40
*/
public static void sendMessage(WebSocketSession session) {
String msg = "";
try {
Long lock = Jedis.setnx(
JedisKeys.SENDMESSAGE_LOCK,
session.getId(), "1");
if (lock == 1) {
//循环发送消息
while (true) {
msg = Jedis.rpop(
JedisKeysEnum.SENDMESSAGE_QUEUE,
session.getId());
if (StringUtils.isNull(msg)) {
//删除锁
Jedis.delete(
JedisKeysEnum.SENDMESSAGE_LOCK,
session.getId());
return;
}
//判断链接是否存在
TextMessage textMessage = new TextMessage(msg);
session.sendMessage(textMessage);
//服务器可能判断isopen不准,简化步骤,不采用,遇到问题直接结束websocket链接
/*if (session.isOpen()) {
synchronized (session) {
session.sendMessage(new TextMessage(msg));
}
} else {
Jedis.delete(
JedisKeys.SENDMESSAGE_LOCK,
session.getId());
Jedis.delete(
JedisKeysEnum.SENDMESSAGE_QUEUE,
session.getId());
//调用结束逻辑
session.close();
}*/
}
}
} catch (Exception e) {
//由于客户端断开服务器监听不到,在客户端简历重连机制,发送失败即断开
//删除锁
Jedis.delete(JedisKeys.SENDMESSAGE_LOCK,
session.getId());
Jedis.delete(
JedisKeys.SENDMESSAGE_QUEUE,
session.getId());
LOG.error("error_msg_is:" + msg);
LOG.error(e.getMessage(), e);
try {
session.close();
} catch (Exception ex) {
LOG.error(ex.getMessage(), ex);
}
}
}
}
此为分离websocket的session的实现,有些代码涉及到了业务,进行了删除,理解了整体思路即可。这样,独立出了session信息,在服务器层遇到接口只要能获取存储session的key值,便可发送消息,实现主动发送消息给客户端
————————————————
版权声明:本文为CSDN博主「ZERO_No1」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/zero_no1/article/details/79290566