基于Akka的WebSocket端点浏览器端授权的惯用实现方法及令牌过期处理咨询
这确实是基于Akka构建WebSocket服务时非常常见的授权场景,毕竟浏览器端的WebSocket API确实没法在握手阶段自定义授权头,只能在连接建立后发送令牌。我来分享几个惯用的实现思路,不用写太多冗余代码就能搞定初始授权和令牌过期刷新的需求。
一、初始授权:连接后先验证令牌再放行业务流量
Akka Streams提供的prefixAndTail操作符刚好能满足这个需求——它可以先捕获客户端发送的第一条消息(也就是授权令牌),验证通过后再把后续的消息接入真正的业务处理Flow;如果验证失败或者超时未收到令牌,直接终止连接即可。
这里有个简洁的实现示例:
import akka.stream.scaladsl.Flow import akka.http.scaladsl.model.ws.Message import akka.http.scaladsl.model.ws.TextMessage def createAuthorizedWebSocketFlow(authService: AuthService): Flow[Message, Message, _] = { Flow[Message].prefixAndTail(1).flatMapConcat { case (firstMsgBatch, restOfMessages) => firstMsgBatch.headOption match { // 处理客户端发来的第一条令牌消息 case Some(TextMessage.Strict(token)) => authService.validateToken(token).map { isValid => if (isValid) { // 授权成功:先返回成功通知,再把后续消息接入业务Flow val businessFlow = createBusinessProcessingFlow(token) Source.single(TextMessage.Strict("AUTH_SUCCESS")) ++ restOfMessages.via(businessFlow) } else { // 授权失败:返回错误后终止连接 Source.single(TextMessage.Strict("AUTH_FAILED")).concat(Source.empty) } } // 第一条消息不是有效格式,直接终止 case _ => Source.single(TextMessage.Strict("INVALID_AUTH_FORMAT")).concat(Source.empty) } } } // 你的业务处理Flow,这里只是示例 def createBusinessProcessingFlow(validToken: String): Flow[Message, Message, _] = { Flow[Message].map { case TextMessage.Strict(content) => TextMessage.Strict(s"Processed: $content (token: ${validToken.take(8)}...)") case _ => TextMessage.Strict("Unsupported message type") } }
如果需要处理客户端发送的是流式消息(不是Strict的),可以先把消息转为Strict再处理,避免遗漏内容。
二、令牌过期处理:支持刷新并暂停/恢复业务流程
对于令牌过期的情况,我们可以在业务Flow里加入状态管理,跟踪当前令牌的有效性:当令牌即将过期时,通知客户端需要刷新;在等待刷新期间,拒绝所有业务请求,直到收到有效的新令牌;如果超时未收到,直接终止连接。
这里推荐用statefulMapConcat来实现状态流转(未授权/已授权/等待刷新),结合Akka的定时器来处理超时:
import akka.stream.scaladsl.Flow import akka.http.scaladsl.model.ws.Message import akka.http.scaladsl.model.ws.TextMessage import akka.actor.Scheduler import scala.concurrent.duration._ import scala.concurrent.ExecutionContext def createTokenRefreshableFlow(authService: AuthService, tokenExpiry: FiniteDuration, refreshTimeout: FiniteDuration)(implicit scheduler: Scheduler, ec: ExecutionContext): Flow[Message, Message, _] = { Flow[Message].statefulMapConcat { () => var currentToken: Option[String] = None var isWaitingForRefresh = false var refreshTimeoutTask: Option[Cancellable] = None msg => msg match { // 处理初始授权或刷新令牌的消息 case TextMessage.Strict(rawMsg) if rawMsg.startsWith("REFRESH_TOKEN:") || currentToken.isEmpty => val token = if (currentToken.isEmpty) rawMsg else rawMsg.replace("REFRESH_TOKEN:", "") authService.validateToken(token).map { isValid => if (isValid) { currentToken = Some(token) isWaitingForRefresh = false refreshTimeoutTask.foreach(_.cancel()) // 调度下一次令牌过期提醒 refreshTimeoutTask = Some(scheduler.scheduleOnce(tokenExpiry) { isWaitingForRefresh = true }) List(TextMessage.Strict(if (currentToken.isEmpty) "AUTH_SUCCESS" else "REFRESH_SUCCESS")) } else { // 令牌无效,终止连接 List(TextMessage.Strict(if (currentToken.isEmpty) "AUTH_FAILED" else "REFRESH_FAILED")) ++ List.empty[Message] } }.getOrElse(List(TextMessage.Strict("AUTH_ERROR"))) // 等待刷新期间收到业务消息,拒绝 case _ if isWaitingForRefresh => List(TextMessage.Strict("ERROR: TOKEN_EXPIRED, PLEASE SEND REFRESH TOKEN")) // 正常处理业务消息 case businessMsg => currentToken.map { token => handleBusinessMessage(businessMsg, token) }.getOrElse(List(TextMessage.Strict("ERROR: UNAUTHORIZED"))) } }.watchTermination() { (_, done) => // 清理定时器资源 done.onComplete(_ => refreshTimeoutTask.foreach(_.cancel())) } } // 业务消息处理逻辑示例 def handleBusinessMessage(msg: Message, token: String): List[Message] = { msg match { case TextMessage.Strict(content) => List(TextMessage.Strict(s"Processed request: $content (valid token)")) case _ => List(TextMessage.Strict("Unsupported message type")) } }
如果需要更复杂的状态流转,也可以自定义GraphStage来实现,这样能更精细地控制流的暂停、恢复和终止逻辑,适合对性能或状态控制要求更高的场景。
额外提示
- 一定要处理超时:不管是初始授权超时,还是刷新令牌的超时,都要及时终止空闲连接,避免资源浪费。
- 客户端要配合实现:比如收到
TOKEN_EXPIRED通知后,立即发送刷新后的令牌;如果收到AUTH_FAILED或REFRESH_FAILED,要关闭连接并提示用户重新登录。
内容的提问来源于stack exchange,提问作者Nebehr Gudahtt




