Login Optimization in a Distributed System: A Practical Case Study
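xukun
I. Project Background
While tuning Redis performance for the existing TSP system, we found that login state in the current distributed system is managed through Redis: each user's Token and user information (UserDto) are stored as key-value pairs, and the WebSocket, SSE, and Kafka components build on them to provide real-time communication, message push, and single sign-on (SSO).
As the business grew, the original storage design showed the following problems:
- Key names are scattered, so scanning is inefficient
- Wildcard scans such as `KEYS STOKENS_{userId}_*` are duplicated in many places and perform poorly
- The WebSocket side has no way to look up a user ID directly from a Token, so it falls back to a full scan
- Interface logic is cluttered with "find the old key, delete the old key" code, which is hard to maintain
- As the amount of data in Redis grows, each scan takes longer and longer, which hurts the user experience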

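II. Refactoring Approach
Change the Redis storage from many String keys to a Hash structure, and adapt the login, logout, token validation, WebSocket handshake, SSE push, and Kafka sync flows accordingly, so that the business logic becomes simpler and both performance and maintainability improve.
1. Redis key structure upgrade
- Move from `String` to `Hash`, which makes it easy to aggregate a user's Tokens and to read or write a single field
- Design a two-way mapping:
HSET STOKENS_{userId} {token} UserDto
SET STOKENS_{token} {userId}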
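To make the two-way mapping concrete, here is a minimal in-memory sketch. It is not a Redis client: two Java maps stand in for the `STOKENS_{userId}` Hash and the `STOKENS_{token}` String keys, and the class and method names (`TokenStoreSketch`, `save`, `findUserId`, `tokensOf`, `remove`) are hypothetical, chosen only for illustration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** In-memory stand-in for the two Redis structures (illustration only). */
class TokenStoreSketch {
    // Models HSET STOKENS_{userId} {token} UserDto: userId -> (token -> serialized UserDto)
    private final Map<String, Map<String, String>> tokensByUser = new HashMap<>();
    // Models SET STOKENS_{token} {userId}: token -> userId
    private final Map<String, String> userByToken = new HashMap<>();

    /** Writes both directions of the mapping for one login. */
    void save(String userId, String token, String userDtoJson) {
        tokensByUser.computeIfAbsent(userId, k -> new HashMap<>()).put(token, userDtoJson);
        userByToken.put(token, userId);
    }

    /** Reverse lookup: one direct read instead of a wildcard scan. */
    String findUserId(String token) {
        return userByToken.get(token);
    }

    /** Equivalent of HKEYS STOKENS_{userId}. */
    Set<String> tokensOf(String userId) {
        Map<String, String> fields =
                tokensByUser.getOrDefault(userId, Collections.<String, String>emptyMap());
        return new HashSet<>(fields.keySet());
    }

    /** Removes one login from both directions so they never drift apart. */
    void remove(String token) {
        String userId = userByToken.remove(token);
        if (userId == null) {
            return;
        }
        Map<String, String> fields = tokensByUser.get(userId);
        if (fields != null) {
            fields.remove(token);
            if (fields.isEmpty()) {
                tokensByUser.remove(userId);
            }
        }
    }
}
```

The point of keeping both directions is that every write and delete has to touch both structures; if only one side is updated, the reverse lookup can return a user whose Hash no longer contains that Token.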
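2. SESSION management
- Login, logout, and the WebSocket handshake all write or delete the corresponding field in `SESSION_{userId}`
3. Simplified "duplicate login" check
- A single `HKEYS STOKENS_{userId}` returns all of the user's current Tokens
- Compare the new Token with the existing ones to decide whether to kick out the old session, ask for confirmation, or reject the login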
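The decision in step 3 can be sketched as a small pure function. This is an illustration under assumptions, not the project's code: `DuplicateLoginCheck`, `Decision`, and `decide` are hypothetical names, `existingTokens` stands for the result of `HKEYS STOKENS_{userId}`, and `confirmed` stands for the client having sent the confirm-login flag.

```java
import java.util.Set;

class DuplicateLoginCheck {
    enum Decision { ALLOW, KICK_OLD_AND_ALLOW, ASK_CONFIRMATION }

    /**
     * existingTokens: the user's current Tokens (what HKEYS would return).
     * confirmed: whether the client has already confirmed replacing the old login.
     */
    static Decision decide(Set<String> existingTokens, boolean confirmed) {
        if (existingTokens == null || existingTokens.isEmpty()) {
            return Decision.ALLOW;              // nobody is logged in as this user
        }
        return confirmed
                ? Decision.KICK_OLD_AND_ALLOW   // log out the old Tokens, then proceed
                : Decision.ASK_CONFIRMATION;    // tell the client to confirm first
    }
}
```

Keeping the decision separate from the Redis calls makes the three outcomes easy to test without a running Redis instance.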
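4. Minimal change footprint
- Where the existing `saveObjectToHash()` method is called to save `STOKENS_{userId}`, add the `SET STOKENS_{token}` write right after it
- For every interface that touches login state (`/login`, `/logout`, `/validateToken`, `/user/resetPassWord`, `/user/editUser`, SSE push, Kafka sync, and so on), replace the original String operations with Hash operations one by one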
III. Key Refactoring Examples
1. Duplicate-login check in the login interface
Before:
```java
private boolean isSaveTokenToredis(UserInfoDto dto, String confirmLoginFlg,
        HttpCommandResultWithData result) throws JsonProcessingException {
    String token = StringUtil.getUUID();
    dto.setToken(token);
    // Key prefixes per user; the suffix appended below turns them into lookup patterns
    String keys = Constants.TOKEN_KEY + dto.getUserId() + Constants.SYMBO_UNDERLINE;
    String sessionKeys = Constants.SESSION_KEY + dto.getUserId() + Constants.SYMBO_UNDERLINE;
    if (!checkConfirmLogin(keys + Constants.SYMBO_NUMBERSIGN, sessionKeys + Constants.SYMBO_NUMBERSIGN,
            confirmLoginFlg, result)) {
        return false;
    }
    // One String key per Token: {TOKEN_KEY}{userId}_{token} -> UserInfoDto
    redisService.saveObjectToJson(keys + token, dto, tokenLiveTime);
    return true;
}
```
```java
private boolean checkConfirmLogin(String keys, String sessionKeys, String confirmLoginFlg,
        HttpCommandResultWithData result) {
    boolean isConfirmLogin = true;
    // Two pattern lookups over the keyspace: one for Tokens, one for sessions
    Set<String> tb_set = redisService.getKeys(keys);
    Set<String> session_set = redisService.getKeys(sessionKeys);
    if (!StringUtil.isEmpty(session_set)) {
        if (!StringUtil.isEmpty(tb_set)) {
            for (String key : tb_set) {
                UserInfoDto dto = redisService.getJson(key, UserInfoDto.class);
                LogOutCommand command = new LogOutCommand();
                if (dto != null) {
                    String token = dto.getToken();
                    command.setToken(token);
                    if (StringUtil.isEq(Constants.ConfirmLoginFlgEnum.CONFIRM_LOGIN.getCode(), confirmLoginFlg)) {
                        // Login was confirmed: log the old session out and notify it
                        this.logout(command);
                        this.pushMessage(token);
                        isConfirmLogin = true;
                        continue;
                    } else {
                        // Not confirmed yet: ask the client to confirm
                        result.fillResult(ReturnCode.CONFIRM_LOGIN);
                        isConfirmLogin = false;
                        break;
                    }
                } else {
                    this.logout(command);
                    isConfirmLogin = true;
                }
            }
        }
    }
    return isConfirmLogin;
}
```
After:
```java
private boolean isSaveTokenToredis(UserInfoDto dto, String confirmLoginFlg,
        HttpCommandResultWithData result) throws JsonProcessingException {
    String keys = Constants.TOKEN_KEY;
    String sessionKeys = Constants.SESSION_KEY;
    // Exact keys per user, no pattern suffix
    if (!checkConfirmLogin(keys + dto.getUserId(), sessionKeys + dto.getUserId(), confirmLoginFlg, result)) {
        return false;
    }
    String token = StringUtil.getUUID();
    dto.setToken(token);
    // Hash: {TOKEN_KEY}{userId}, field = token, value = UserInfoDto
    redisServiceNew.saveObjectToHash(keys + dto.getUserId(), token, dto, tokenLiveTime);
    // Reverse index: {TOKEN_KEY}{token} -> userId
    redisService.saveObjectToJson(keys + token, dto.getUserId());
    return true;
}
```
```java
private boolean checkConfirmLogin(String keys, String sessionKeys, String confirmLoginFlg,
        HttpCommandResultWithData result) {
    // Direct read of the session key instead of a pattern lookup
    String session = redisService.getValueByKey(sessionKeys);
    if (session == null) {
        return true;
    }
    // Read a single field of the user's Hash
    UserInfoDto dto = redisService.getJson(keys, session, UserInfoDto.class);
    if (dto == null) {
        return true;
    }
    if (Constants.ConfirmLoginFlgEnum.CONFIRM_LOGIN.getCode().equals(confirmLoginFlg)) {
        // Login was confirmed: log the old session out and notify it
        LogOutCommand cmd = new LogOutCommand();
        cmd.setToken(dto.getToken());
        cmd.setUserInfor(dto);
        this.logout(cmd);
        this.pushMessage(dto.getToken());
        return true;
    }
    // Not confirmed yet: ask the client to confirm
    result.fillResult(ReturnCode.CONFIRM_LOGIN);
    return false;
}
```
2. WebSocket logic
Before:
```java
@ServerEndpoint(value = "/push/pushSocket")
@Component
@Scope("prototype")
public class PushSocket {
    IPushWatchedService pushWatchedService;

    private static Log logger = LogFactory.getLog(PushSocket.class);

    public PushSocket() {
        this.pushWatchedService = (IPushWatchedService) Application.getApplicationContext().getBean("pushWatchedService");
    }

    @OnOpen
    public void open(Session session) {
        try {
            String token = session.getQueryString();
            if (StringUtil.isNotEmpty(token)) {
                token = token.indexOf("=") == -1 ? "" : token.split("=")[1];
                if (StringUtil.isNotEmpty(token)) {
                    WebSocketContainer.register(token, session);
                    pushWatchedService.addPushSocket(session, this);
                    String userID = pushWatchedService.findUserIDByTokenKey(token);
                    logger.info("redis saved session-bound key: " + Constants.SESSION_KEY + userID + Constants.SYMBO_UNDERLINE + token);
                    pushWatchedService.addSessionKey(token, userID);
                    logger.debug("Connection established with client, token is : " + token);
                }
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }

    @OnMessage
    public void onMessage(Session session, String text) {
        try {
            session.getBasicRemote().sendText(text);
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }

    public void sendMessage(Session session, String text) {
        String token = session.getQueryString();
        token = token.indexOf("=") == -1 ? "" : token.split("=")[1];

        logger.debug("Sending message to client, token is : " + token + ", message: " + text);
        try {
            session.getBasicRemote().sendText(text);
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }

    @OnClose
    public void close(Session session) {
        try {
            String token = session.getQueryString();
            token = token.indexOf("=") == -1 ? "" : token.split("=")[1];
            logger.debug("Closed connection, token is : " + token);

            String userID = pushWatchedService.findUserIDByTokenKey(token);
            logger.info("redis deleted key: " + Constants.SESSION_KEY + userID + Constants.SYMBO_UNDERLINE + token);

            pushWatchedService.findAndDelKey(token, userID);
            WebSocketContainer.destory(session);

            pushWatchedService.removePushSocket(session, this);

        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }

    @OnError
    public void onError(Throwable e, Session session) {
        try {
            if (session != null) {
                String token = session.getQueryString();
                token = token.indexOf("=") == -1 ? "" : token.split("=")[1];
                logger.debug("Connection closed on error, token is : " + token);

                session.close();

                String userID = pushWatchedService.findUserIDByTokenKey(token);
                logger.info("redis deleted key: " + Constants.SESSION_KEY + userID + Constants.SYMBO_UNDERLINE + token);
                pushWatchedService.findAndDelKey(token, userID);
                WebSocketContainer.destory(session);
                pushWatchedService.removePushSocket(session, this);
            }
        } catch (Exception e1) {
            logger.error(e.getMessage(), e);
        }
    }
}
```
After:
```java
@ServerEndpoint(value = "/push/pushSocket")
@Component
@Scope("prototype")
public class PushSocket {
    IPushWatchedService pushWatchedService;

    private static Log logger = LogFactory.getLog(PushSocket.class);

    public PushSocket() {
        this.pushWatchedService = (IPushWatchedService) Application.getApplicationContext().getBean("pushWatchedService");
    }

    @OnOpen
    public void open(Session session) {
        try {
            String token = session.getQueryString();
            if (StringUtil.isNotEmpty(token)) {
                token = token.indexOf("=") == -1 ? "" : token.split("=")[1];
                if (StringUtil.isNotEmpty(token)) {
                    WebSocketContainer.register(token, session);
                    pushWatchedService.addPushSocket(session, this);
                    String userID = pushWatchedService.findUserIDByTokenKey(token);
                    pushWatchedService.addSessionKey(token, userID);
                    logger.debug("Connection established with client, token is : " + token);
                }
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }

    @OnMessage
    public void onMessage(Session session, String text) {
        try {
            session.getBasicRemote().sendText(text);
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }

    public void sendMessage(Session session, String text) {
        String token = session.getQueryString();
        token = token.indexOf("=") == -1 ? "" : token.split("=")[1];

        logger.debug("Sending message to client, token is : " + token + ", message: " + text);
        try {
            session.getBasicRemote().sendText(text);
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }

    @OnClose
    public void close(Session session) {
        try {
            String token = session.getQueryString();
            token = token.indexOf("=") == -1 ? "" : token.split("=")[1];
            logger.debug("Closed connection, token is : " + token);

            String userID = pushWatchedService.findUserIDByTokenKey(token);
            // Delete the session key, the reverse index, and the Token field in the user's Hash
            pushWatchedService.findAndDelKey(Constants.SESSION_KEY + userID);
            pushWatchedService.findAndDelKey(Constants.TOKEN_KEY + token);
            pushWatchedService.findAndDelKey(Constants.TOKEN_KEY + userID, token);
            logger.info("redis deleted key: " + Constants.SESSION_KEY + userID);
            logger.info("redis deleted key: " + Constants.TOKEN_KEY + token);
            logger.info("redis deleted key: " + Constants.TOKEN_KEY + userID + "-" + token);
            WebSocketContainer.destory(session);

            pushWatchedService.removePushSocket(session, this);

        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }

    @OnError
    public void onError(Throwable e, Session session) {
        try {
            if (session != null) {
                String token = session.getQueryString();
                token = token.indexOf("=") == -1 ? "" : token.split("=")[1];
                logger.debug("Connection closed on error, token is : " + token);

                session.close();

                String userID = pushWatchedService.findUserIDByTokenKey(token);
                // Same cleanup as in close(): session key, reverse index, Hash field
                pushWatchedService.findAndDelKey(Constants.SESSION_KEY + userID);
                pushWatchedService.findAndDelKey(Constants.TOKEN_KEY + token);
                pushWatchedService.findAndDelKey(Constants.TOKEN_KEY + userID, token);
                logger.info("redis deleted key: " + Constants.SESSION_KEY + userID);
                logger.info("redis deleted key: " + Constants.TOKEN_KEY + token);
                logger.info("redis deleted key: " + Constants.TOKEN_KEY + userID + "-" + token);
                WebSocketContainer.destory(session);
                pushWatchedService.removePushSocket(session, this);
            }
        } catch (Exception e1) {
            logger.error(e.getMessage(), e);
        }
    }
}
```
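The endpoint above repeats the expression `token.split("=")[1]` in four methods. As written, that expression throws `ArrayIndexOutOfBoundsException` for a query string such as `token=` (Java's `split` drops trailing empty strings), and `sendMessage`, `close`, and `onError` call `indexOf` on the query string without a null check. A small shared helper is one way to tighten this up. The sketch below is a suggestion rather than part of the original change, and it assumes the parameter is named `token`, which the source does not state; `QueryTokenSketch` and `extractToken` are hypothetical names.

```java
class QueryTokenSketch {
    /**
     * Returns the value of the "token" query parameter, or "" when the query
     * string is null, empty, or has no such parameter. The parameter name
     * "token" is an assumption made for this sketch.
     */
    static String extractToken(String queryString) {
        if (queryString == null || queryString.isEmpty()) {
            return "";
        }
        for (String pair : queryString.split("&")) {
            int eq = pair.indexOf('=');
            if (eq > 0 && "token".equals(pair.substring(0, eq))) {
                return pair.substring(eq + 1);   // may be "" for "token="
            }
        }
        return "";
    }
}
```

With such a helper, each handler could call `QueryTokenSketch.extractToken(session.getQueryString())` and skip the Redis cleanup when the result is empty.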
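3. Redis storage structure
Before: one String key per Token, `STOKENS_{userId}_{token}`, holding the serialized UserDto.
After: one Hash per user, `STOKENS_{userId}`, with one field per Token holding the UserDto, plus a String key `STOKENS_{token}` holding the userId.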
4. Reading the userId in the handshake interceptor
```java
public class AuthHandshakeInterceptor implements HandshakeInterceptor {
    @Override
    public boolean beforeHandshake(..., Map<String,Object> attrs) {
        // `request` is one of the elided parameters above
        String token = extractToken(request);
        // Reverse index STOKENS_{token} -> userId: a single GET instead of a scan
        String userId = redisTemplate.opsForValue().get("STOKENS_" + token);
        if (userId == null) return false;   // unknown Token: refuse the handshake
        attrs.put("userId", userId);
        return true;
    }
}
```
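5. Kafka sync and downstream push
userCenterSynUserToTsp: reads all Tokens from the `STOKENS_{userId}` Hash and syncs them to the TSP system
logoutDisableUser: calls the logout interface, removes the corresponding field from the Hash, then notifies each node to take the user offline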
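As a rough illustration of what disabling a user involves at the data level, the sketch below removes all of one user's logins from both structures and returns the removed Tokens so that the caller can notify the nodes holding those sessions. It works on plain Java maps rather than Redis, and it assumes that a disabled user loses every Token at once; `DisableUserSketch` and `logoutAll` are hypothetical names, not the project's API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class DisableUserSketch {
    /**
     * tokensByUser models the STOKENS_{userId} Hashes (userId -> token -> UserDto),
     * userByToken models the STOKENS_{token} reverse keys (token -> userId).
     * Returns the Tokens that were removed.
     */
    static List<String> logoutAll(Map<String, Map<String, String>> tokensByUser,
                                  Map<String, String> userByToken,
                                  String userId) {
        List<String> removed = new ArrayList<>();
        // Drop the whole Hash for this user
        Map<String, String> fields = tokensByUser.remove(userId);
        if (fields == null) {
            return removed;                 // user had no active logins
        }
        for (String token : fields.keySet()) {
            userByToken.remove(token);      // drop the matching reverse key
            removed.add(token);
        }
        return removed;
    }
}
```

Returning the removed Tokens keeps the notification step (SSE or WebSocket push to each node) outside the storage logic.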
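IV. Results
Elapsed time before and after optimization

| Stage | Elapsed time |
| --- | --- |
| Before optimization | 1.47 s |
| After optimization | 0.281 s (281 ms) |

As the table shows, elapsed time dropped from 1.47 s to 281 ms. The request now completes about 5.2 times as fast (1.47 / 0.281 ≈ 5.23), which is a reduction in elapsed time of roughly 81%.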
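The comparison can be checked by hand from the two measurements in the table, writing $t_{\text{before}} = 1.47\,\text{s}$ and $t_{\text{after}} = 0.281\,\text{s}$:

```latex
\[
\text{speedup} = \frac{t_{\text{before}}}{t_{\text{after}}}
               = \frac{1.47}{0.281} \approx 5.23,
\qquad
\text{time saved} = 1 - \frac{t_{\text{after}}}{t_{\text{before}}}
                  = 1 - \frac{0.281}{1.47} \approx 0.809 = 80.9\%
\]
```

Put differently, the speed is about 523% of the original, which is an increase of about 423%. These figures come from a single pair of measurements, so they indicate the size of the improvement rather than a benchmark result.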