分布式系统登录优化实战

一、项目背景

在对现有TSP系统的Redis进行性能优化时,发现当前分布式系统中,用户登录态通过 Redis 来管理:以键值对形式存储用户的 Token 与用户信息(UserDto),并结合 WebSocket、SSE、Kafka 组件,实现实时通信、消息推送及统一认证(SSO)。

随着业务的发展,发现原有的存储设计存在以下问题:

  1. Key 命名零散、扫描效率低
  2. 多处基于通配符 (KEYS STOKENS_{userId}_\*) 的扫描逻辑冗余、性能不佳
  3. WebSocket 端无根据 Token 快速反查用户 ID的逻辑,通过scans进行全量查询
  4. 接口逻辑侵入了大量“查旧 Key、删旧 Key”的代码,且可维护性差
  5. 随着redis数据量的增多,执行scans需要耗费的时间逐步递增,使用体验差

image-20250801142924064

二、改造思路

将 Redis 存储从多 string Key改造为Hash 结构,并在登录、登出、校验、WebSocket 握手、SSE 推送、Kafka 同步等流程中,做出对应改造,彻底简化业务逻辑、提升性能与可维护性

1.Redis Key 结构升级
  • StringHash,方便聚合和局部读写
  • 设计双向映射
    • HSET STOKENS_{userId} {token} UserDto
    • SET STOKENS_{token} {userId}
2.SESSION 管理
  • 登录、登出及 WebSocket 握手,都写入/删除 SESSION_{userId} 中对应 field
3.简化“重复登录”检查逻辑
  • 只需 HKEYS STOKENS_{userId} 得到当前所有 Token
  • 新旧 Token 比对,决定是否踢出、确认或拒绝登录
4.最小化改动范围
  • 在原有的 saveObjectToHash()方法,(保存 STOKENS_{userId})逻辑后,写入 SET STOKENS_{token}
  • 所有涉及登录态的接口(/login/logout/validateToken/user/resetPassWord/user/editUser、SSE 推送、Kafka 同步等)逐一替换原有的 String 操作为 Hash 操作

三、关键部分改造示例

1.登录接口防重复登录校验

改造前:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* 判断是否有其他用户登陆,并且是否确认踢掉其他用户
*
* @param dto
* @param confirmLoginFlg
* @param result
* @throws JsonProcessingException
*/
private boolean isSaveTokenToredis(UserInfoDto dto, String confirmLoginFlg, HttpCommandResultWithData result)
throws JsonProcessingException {
// 生成token
String token = StringUtil.getUUID();
dto.setToken(token);
// redis token key:STOKENS_+UserId+"_"+token
String keys = Constants.TOKEN_KEY + dto.getUserId() + Constants.SYMBO_UNDERLINE;
String sessionKeys = Constants.SESSION_KEY + dto.getUserId() + Constants.SYMBO_UNDERLINE;
// 校验是否确认登陆,并踢掉其他用户,如果返回false返回前端确认登陆提示,否则踢掉之前用户并登陆,并清掉redis
if (!checkConfirmLogin(keys + Constants.SYMBO_NUMBERSIGN,
sessionKeys + Constants.SYMBO_NUMBERSIGN, confirmLoginFlg, result)) {
return false;
}
// 存入redis
redisService.saveObjectToJson(keys + token, dto, tokenLiveTime);
return true;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
private boolean checkConfirmLogin(String keys, String sessionKeys, String confirmLoginFlg, HttpCommandResultWithData result) {
boolean isConfirmLogin = true;
// 遍历redis记录
Set<String> tb_set = redisService.getKeys(keys);
Set<String> session_set = redisService.getKeys(sessionKeys);
// 判空
if (!StringUtil.isEmpty(session_set)) {
if (!StringUtil.isEmpty(tb_set)) {
for (String key : tb_set) {
// 已经登陆的用户信息
UserInfoDto dto = redisService.getJson(key, UserInfoDto.class);
LogOutCommand command = new LogOutCommand();
// 如果已经有用户
if (dto != null) {
// 其他用户token
String token = dto.getToken();
// 设置登出参数
command.setToken(token);
// 如果确认登陆踢掉之前的用户,并发送消息推送并登陆
if (StringUtil.isEq(Constants.ConfirmLoginFlgEnum.CONFIRM_LOGIN.getCode(), confirmLoginFlg)) {
// 登出其他用户
this.logout(command);
// 消息推送给其他登陆的用户,踢掉其他登陆的用户
this.pushMessage(token);
isConfirmLogin = true;
continue;
}
// 否则返回失败并设置提示用户已在其他地方登陆的提示信息,提示用户是否确认登陆
else {
// 返回前端是否确认登陆提示
result.fillResult(ReturnCode.CONFIRM_LOGIN);
isConfirmLogin = false;
break;
}
} else {
this.logout(command);
isConfirmLogin = true;
}
}
}
}
return isConfirmLogin;
}

改造后:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* 判断是否有其他用户登陆,并且是否确认踢掉其他用户
*
* @param dto
* @param confirmLoginFlg
* @param result
* @throws JsonProcessingException
*/
private boolean isSaveTokenToredis(UserInfoDto dto, String confirmLoginFlg, HttpCommandResultWithData result)
throws JsonProcessingException {
String keys = Constants.TOKEN_KEY;
String sessionKeys = Constants.SESSION_KEY;
// 校验是否确认登陆,并踢掉其他用户,如果返回false返回前端确认登陆提示,否则踢掉之前用户并登陆,并清掉redis
// STOKENS_UserId, SESSION_UserId
if (!checkConfirmLogin(keys + dto.getUserId(), sessionKeys + dto.getUserId(), confirmLoginFlg, result)) {
return false;
}
// 生成token
String token = StringUtil.getUUID();
dto.setToken(token);
// 存入redis
// key1:STOKENS_UserId key2:token value:UserInfoDto
redisServiceNew.saveObjectToHash(keys + dto.getUserId(), token, dto, tokenLiveTime);
// key:STOKENS_token value:UserId
redisService.saveObjectToJson(keys + token, dto.getUserId());
return true;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* 如果有其他用户登陆时校验是否确认登陆
*
* @param keys
* @param confirmLoginFlg
* @param result
* @return
*/
private boolean checkConfirmLogin(String keys, String sessionKeys, String confirmLoginFlg, HttpCommandResultWithData result) {
// 遍历redis记录
String session = redisService.getValueByKey(sessionKeys);
if (session == null) {
return true;
}
// 取出已有会话对应的用户信息
UserInfoDto dto = redisService.getJson(keys, session, UserInfoDto.class);
if (dto == null) {
return true;
}
// 如果前端确认要踢掉旧会话
if (Constants.ConfirmLoginFlgEnum.CONFIRM_LOGIN.getCode().equals(confirmLoginFlg)) {
LogOutCommand cmd = new LogOutCommand();
cmd.setToken(dto.getToken());
cmd.setUserInfor(dto);
this.logout(cmd);
this.pushMessage(dto.getToken());
return true;
}
// 返回前端是否确认登陆提示
result.fillResult(ReturnCode.CONFIRM_LOGIN);
return false;
}
2.WebSocket逻辑改造

改造前:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
@ServerEndpoint(value = "/push/pushSocket")
@Component
@Scope("prototype")
public class PushSocket {
IPushWatchedService pushWatchedService;


private static Log logger = LogFactory.getLog(PushSocket.class);

public PushSocket() {
this.pushWatchedService =
(IPushWatchedService) Application.getApplicationContext().getBean("pushWatchedService");
}

@OnOpen
public void open(Session session) {
try {
String token = session.getQueryString();
if (StringUtil.isNotEmpty(token)) {
token = token.indexOf("=") == -1 ? "" : token.split("=")[1];
if (StringUtil.isNotEmpty(token)) {
WebSocketContainer.register(token, session);
// 使用观察者模式进行服务注册.
pushWatchedService.addPushSocket(session, this);
String userID = pushWatchedService.findUserIDByTokenKey(token);
logger.info("redis保存与session绑定的key:" + Constants.SESSION_KEY + userID +Constants.SYMBO_UNDERLINE + token);
pushWatchedService.addSessionKey(token, userID);
logger.debug("与客户端建立连接成功,连接token is : " + token);
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}

/**
* 接收到客户端的消息
*
* @param session
* @param text
*/
@OnMessage
public void onMessage(Session session, String text) {
try {
session.getBasicRemote().sendText(text);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}

/**
* 向客户端发送消息
*
* @param session
* @param text
*/
public void sendMessage(Session session, String text) {
String token = session.getQueryString();
token = token.indexOf("=") == -1 ? "" : token.split("=")[1];

logger.debug("向客户端发送消息,连接token is : " + token + "发送的消息为:" + text);
try {
session.getBasicRemote().sendText(text);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}

@OnClose
public void close(Session session) {
try {
String token = session.getQueryString();
token = token.indexOf("=") == -1 ? "" : token.split("=")[1];
logger.debug("关闭的连接token is : " + token);

//清空缓存
String userID = pushWatchedService.findUserIDByTokenKey(token);
logger.info("redis删除key:" + Constants.SESSION_KEY + userID + Constants.SYMBO_UNDERLINE + token);

pushWatchedService.findAndDelKey(token, userID);
// 清空session
WebSocketContainer.destory(session);

pushWatchedService.removePushSocket(session, this);

} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}

@OnError
public void onError(Throwable e, Session session) {
try {
if (session != null) {
String token = session.getQueryString();
token = token.indexOf("=") == -1 ? "" : token.split("=")[1];
logger.debug("错误时关闭的连接token is : " + token);

session.close();

//清空缓存
String userID = pushWatchedService.findUserIDByTokenKey(token);
logger.info("redis删除key:" + Constants.SESSION_KEY + userID + Constants.SYMBO_UNDERLINE + token);
pushWatchedService.findAndDelKey(token, userID);
// 清空session
WebSocketContainer.destory(session);
pushWatchedService.removePushSocket(session, this);
}
} catch (Exception e1) {
logger.error(e.getMessage(), e);
}
}
}

改造后:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
@ServerEndpoint(value = "/push/pushSocket")
@Component
@Scope("prototype")
public class PushSocket {
IPushWatchedService pushWatchedService;


private static Log logger = LogFactory.getLog(PushSocket.class);

public PushSocket() {
this.pushWatchedService =
(IPushWatchedService) Application.getApplicationContext().getBean("pushWatchedService");
}

@OnOpen
public void open(Session session) {
try {
String token = session.getQueryString();
if (StringUtil.isNotEmpty(token)) {
token = token.indexOf("=") == -1 ? "" : token.split("=")[1];
if (StringUtil.isNotEmpty(token)) {
WebSocketContainer.register(token, session);
// 使用观察者模式进行服务注册.
pushWatchedService.addPushSocket(session, this);
String userID = pushWatchedService.findUserIDByTokenKey(token);
pushWatchedService.addSessionKey(token, userID);
logger.debug("与客户端建立连接成功,连接token is : " + token);
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}

/**
* 接收到客户端的消息
*
* @param session
* @param text
*/
@OnMessage
public void onMessage(Session session, String text) {
try {
session.getBasicRemote().sendText(text);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}

/**
* 向客户端发送消息
*
* @param session
* @param text
*/
public void sendMessage(Session session, String text) {
String token = session.getQueryString();
token = token.indexOf("=") == -1 ? "" : token.split("=")[1];

logger.debug("向客户端发送消息,连接token is : " + token + "发送的消息为:" + text);
try {
session.getBasicRemote().sendText(text);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}

@OnClose
public void close(Session session) {
try {
String token = session.getQueryString();
token = token.indexOf("=") == -1 ? "" : token.split("=")[1];
logger.debug("关闭的连接token is : " + token);

//清空缓存
String userID = pushWatchedService.findUserIDByTokenKey(token);
pushWatchedService.findAndDelKey(Constants.SESSION_KEY + userID);
pushWatchedService.findAndDelKey(Constants.TOKEN_KEY + token);
pushWatchedService.findAndDelKey(Constants.TOKEN_KEY + userID, token);
logger.info("redis删除key:" + Constants.SESSION_KEY + userID);
logger.info("redis删除key:" + Constants.TOKEN_KEY + token);
logger.info("redis删除key:" + Constants.TOKEN_KEY + userID+ "-" + token);
// 清空session
WebSocketContainer.destory(session);

pushWatchedService.removePushSocket(session, this);

} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}

@OnError
public void onError(Throwable e, Session session) {
try {
if (session != null) {
String token = session.getQueryString();
token = token.indexOf("=") == -1 ? "" : token.split("=")[1];
logger.debug("错误时关闭的连接token is : " + token);

session.close();

//清空缓存
String userID = pushWatchedService.findUserIDByTokenKey(token);
pushWatchedService.findAndDelKey(Constants.SESSION_KEY + userID);
pushWatchedService.findAndDelKey(Constants.TOKEN_KEY + token);
pushWatchedService.findAndDelKey(Constants.TOKEN_KEY + userID, token);
logger.info("redis删除key:" + Constants.SESSION_KEY + userID);
logger.info("redis删除key:" + Constants.TOKEN_KEY + token);
logger.info("redis删除key:" + Constants.TOKEN_KEY + userID+ "-" + token);
// 清空session
WebSocketContainer.destory(session);
pushWatchedService.removePushSocket(session, this);
}
} catch (Exception e1) {
logger.error(e.getMessage(), e);
}
}
}
3.Redis存入结构改造

改造前:

image-20250801144900190

改造后:

image-20250801144914581

4.握手拦截器获取 UserId
1
2
3
4
5
6
7
8
9
10
11
public class AuthHandshakeInterceptor implements HandshakeInterceptor {
@Override
public boolean beforeHandshake(..., Map<String,Object> attrs) {
String token = extractToken(request);
String userId = redisTemplate.opsForValue().get("STOKENS_" + token);
if (userId == null) return false;
attrs.put("userId", userId);
return true;
}
// ...
}
5.Kafka 同步与下游推送

userCenterSynUserToTsp:从 STOKENS_{userId} Hash 读所有 Token,同步到 TSP 系统

logoutDisableUser:调用登出接口,清理 Hash 中对应 field 后,通知各节点下线

四、效果展示

改造前:

image-20250801160957920

改造后

image-20250801161044867

优化前后耗时对比
阶段 耗时
优化前 1.47 s
优化后 0.281 s (281 ms)

从表中可以明显看出,耗时由 1.47 s 降到了 281 ms,速度提升了 ≈ 520%