You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

基于Akka的WebSocket端点浏览器端授权的惯用实现方法及令牌过期处理咨询

这确实是基于Akka构建WebSocket服务时非常常见的授权场景,毕竟浏览器端的WebSocket API确实没法在握手阶段自定义授权头,只能在连接建立后发送令牌。我来分享几个惯用的实现思路,不用写太多冗余代码就能搞定初始授权和令牌过期刷新的需求。

一、初始授权:连接后先验证令牌再放行业务流量

Akka Streams提供的prefixAndTail操作符刚好能满足这个需求——它可以先捕获客户端发送的第一条消息(也就是授权令牌),验证通过后再把后续的消息接入真正的业务处理Flow;如果验证失败或者超时未收到令牌,直接终止连接即可。

这里有个简洁的实现示例:

import akka.stream.scaladsl.Flow
import akka.http.scaladsl.model.ws.Message
import akka.http.scaladsl.model.ws.TextMessage

def createAuthorizedWebSocketFlow(authService: AuthService): Flow[Message, Message, _] = {
  Flow[Message].prefixAndTail(1).flatMapConcat { case (firstMsgBatch, restOfMessages) =>
    firstMsgBatch.headOption match {
      // 处理客户端发来的第一条令牌消息
      case Some(TextMessage.Strict(token)) =>
        authService.validateToken(token).map { isValid =>
          if (isValid) {
            // 授权成功:先返回成功通知,再把后续消息接入业务Flow
            val businessFlow = createBusinessProcessingFlow(token)
            Source.single(TextMessage.Strict("AUTH_SUCCESS")) ++ restOfMessages.via(businessFlow)
          } else {
            // 授权失败:返回错误后终止连接
            Source.single(TextMessage.Strict("AUTH_FAILED")).concat(Source.empty)
          }
        }
      // 第一条消息不是有效格式,直接终止
      case _ =>
        Source.single(TextMessage.Strict("INVALID_AUTH_FORMAT")).concat(Source.empty)
    }
  }
}

// 你的业务处理Flow,这里只是示例
def createBusinessProcessingFlow(validToken: String): Flow[Message, Message, _] = {
  Flow[Message].map {
    case TextMessage.Strict(content) => TextMessage.Strict(s"Processed: $content (token: ${validToken.take(8)}...)")
    case _ => TextMessage.Strict("Unsupported message type")
  }
}

如果需要处理客户端发送的是流式消息(不是Strict的),可以先把消息转为Strict再处理,避免遗漏内容。

二、令牌过期处理:支持刷新并暂停/恢复业务流程

对于令牌过期的情况,我们可以在业务Flow里加入状态管理,跟踪当前令牌的有效性:当令牌即将过期时,通知客户端需要刷新;在等待刷新期间,拒绝所有业务请求,直到收到有效的新令牌;如果超时未收到,直接终止连接。

这里推荐用statefulMapConcat来实现状态流转(未授权/已授权/等待刷新),结合Akka的定时器来处理超时:

import akka.stream.scaladsl.Flow
import akka.http.scaladsl.model.ws.Message
import akka.http.scaladsl.model.ws.TextMessage
import akka.actor.Scheduler
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext

def createTokenRefreshableFlow(authService: AuthService, tokenExpiry: FiniteDuration, refreshTimeout: FiniteDuration)(implicit scheduler: Scheduler, ec: ExecutionContext): Flow[Message, Message, _] = {
  Flow[Message].statefulMapConcat { () =>
    var currentToken: Option[String] = None
    var isWaitingForRefresh = false
    var refreshTimeoutTask: Option[Cancellable] = None

    msg =>
      msg match {
        // 处理初始授权或刷新令牌的消息
        case TextMessage.Strict(rawMsg) if rawMsg.startsWith("REFRESH_TOKEN:") || currentToken.isEmpty =>
          val token = if (currentToken.isEmpty) rawMsg else rawMsg.replace("REFRESH_TOKEN:", "")
          authService.validateToken(token).map { isValid =>
            if (isValid) {
              currentToken = Some(token)
              isWaitingForRefresh = false
              refreshTimeoutTask.foreach(_.cancel())
              // 调度下一次令牌过期提醒
              refreshTimeoutTask = Some(scheduler.scheduleOnce(tokenExpiry) {
                isWaitingForRefresh = true
              })
              List(TextMessage.Strict(if (currentToken.isEmpty) "AUTH_SUCCESS" else "REFRESH_SUCCESS"))
            } else {
              // 令牌无效,终止连接
              List(TextMessage.Strict(if (currentToken.isEmpty) "AUTH_FAILED" else "REFRESH_FAILED")) ++ List.empty[Message]
            }
          }.getOrElse(List(TextMessage.Strict("AUTH_ERROR")))
        
        // 等待刷新期间收到业务消息,拒绝
        case _ if isWaitingForRefresh =>
          List(TextMessage.Strict("ERROR: TOKEN_EXPIRED, PLEASE SEND REFRESH TOKEN"))
        
        // 正常处理业务消息
        case businessMsg =>
          currentToken.map { token =>
            handleBusinessMessage(businessMsg, token)
          }.getOrElse(List(TextMessage.Strict("ERROR: UNAUTHORIZED")))
      }
  }.watchTermination() { (_, done) =>
    // 清理定时器资源
    done.onComplete(_ => refreshTimeoutTask.foreach(_.cancel()))
  }
}

// 业务消息处理逻辑示例
def handleBusinessMessage(msg: Message, token: String): List[Message] = {
  msg match {
    case TextMessage.Strict(content) => List(TextMessage.Strict(s"Processed request: $content (valid token)"))
    case _ => List(TextMessage.Strict("Unsupported message type"))
  }
}

如果需要更复杂的状态流转,也可以自定义GraphStage来实现,这样能更精细地控制流的暂停、恢复和终止逻辑,适合对性能或状态控制要求更高的场景。

额外提示

  • 一定要处理超时:不管是初始授权超时,还是刷新令牌的超时,都要及时终止空闲连接,避免资源浪费。
  • 客户端要配合实现:比如收到TOKEN_EXPIRED通知后,立即发送刷新后的令牌;如果收到AUTH_FAILEDREFRESH_FAILED,要关闭连接并提示用户重新登录。

内容的提问来源于stack exchange,提问作者Nebehr Gudahtt

火山引擎 最新活动