You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

如何在Flask中实现WebSocket请求到本地Unix套接字的双向流量转发

如何在Flask中实现WebSocket请求到本地Unix套接字的双向流量转发

我完全懂你要的是什么——不需要自己搞WebSocket服务端的业务逻辑,就想当个“透明桥梁”,把用户浏览器的WebSocket请求直接连到已经存在的Unix套接字上,让两边的流量自己跑,认证、权限检查这些前置工作还得复用现有Flask的代码对吧?刚好我之前做过几乎一模一样的需求,给你捋个完全贴合现有栈的方案:


核心思路

我们要做的是WebSocket代理桥接:用Flask-Sockets帮我们处理WebSocket的握手流程,拿到客户端的WebSocket连接对象后,直接和目标Unix套接字建立连接,然后用协程实现双向流量转发——Flask只负责前置校验,之后就彻底“隐身”,不碰任何业务数据。


具体实现步骤

1. 先装必要依赖

这些依赖都是轻量的,不会给现有栈加负担:

pip install flask-sockets gevent gevent-websocket
  • flask-sockets:帮我们快速在Flask里注册WebSocket路由,自动处理WebSocket握手(不用自己写复杂的协议逻辑)
  • gevent + gevent-websocket:用协程处理并发转发,比线程高效N倍,同时让Flask原生支持WebSocket

2. 集成到现有Flask应用

直接把下面的逻辑插你现有Flask代码里,复用你已有的认证、权限检查逻辑就行:

from flask import Flask, request, abort
from flask_sockets import Sockets
import socket
from gevent import spawn, wait
import os

# 直接用你现有的Flask app实例!不用新建
app = Flask(__name__)
sockets = Sockets(app)

# ------------------------------
# ↓↓↓ 这里直接复用你已有的业务逻辑 ↓↓↓
# ------------------------------
def verify_login_token(token):
    # 你的现有token验证逻辑:比如从DB查token对应的用户
    user = ...  # 替换成你的实际代码
    return user

def check_machine_permission(user, machine_name):
    # 你的现有权限检查逻辑:用户是否能访问该机器
    return True  # 替换成你的实际代码

def is_machine_connected(machine_name):
    # 检查机器是否在线(可以结合DB状态+套接字文件存在性)
    sock_path = f'/remote/vnc_websocket/{machine_name}/websocket.sock'
    return os.path.exists(sock_path)
# ------------------------------

def forward_ws_to_unix(ws, unix_sock):
    """把客户端WebSocket的流量转发到Unix套接字"""
    try:
        while True:
            # 接收客户端的WebSocket数据(VNC是二进制,拿到的是bytes)
            data = ws.receive()
            if not data:
                break  # 客户端断开连接
            # 兜底处理:防止偶尔收到文本格式的垃圾数据
            if isinstance(data, str):
                data = data.encode('utf-8')
            unix_sock.sendall(data)
    except Exception as e:
        app.logger.error(f"WS → Unix 转发失败: {str(e)}")
    finally:
        # 任一连接断开,立即关闭两边,避免资源泄漏
        ws.close()
        unix_sock.close()

def forward_unix_to_ws(unix_sock, ws):
    """把Unix套接字的流量转发到客户端WebSocket"""
    try:
        while True:
            # 从Unix套接字读二进制数据(VNC流量是纯二进制)
            data = unix_sock.recv(4096)
            if not data:
                break  # 远程机器断开连接
            # 必须用WebSocket的binary模式发送,否则VNC画面会乱码
            ws.send(data, binary=True)
    except Exception as e:
        app.logger.error(f"Unix → WS 转发失败: {str(e)}")
    finally:
        unix_sock.close()
        ws.close()

@sockets.route('/vnc/<machine_name>')
def vnc_proxy(ws, machine_name):
    # ------------------------------
    # 步骤1-4:复用你已有的校验逻辑
    # ------------------------------
    # 从请求头拿登录token(和你现有HTTP接口的逻辑完全一致)
    auth_header = request.headers.get('Authorization')
    if not auth_header or not auth_header.startswith('Bearer '):
        abort(401, description="缺少有效登录凭证")
    token = auth_header.split(' ')[1]
    
    # 验证token有效性,拿到用户信息
    user = verify_login_token(token)
    if not user:
        abort(401, description="登录凭证无效")
    
    # 检查机器状态和用户权限
    if not is_machine_connected(machine_name):
        abort(503, description="目标机器未在线")
    if not check_machine_permission(user, machine_name):
        abort(403, description="无权限访问该机器")
    
    # ------------------------------
    # 步骤5:核心桥接逻辑
    # ------------------------------
    unix_sock_path = f'/remote/vnc_websocket/{machine_name}/websocket.sock'
    # 建立到Unix套接字的连接
    unix_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    try:
        unix_sock.connect(unix_sock_path)
    except Exception as e:
        app.logger.error(f"无法连接到Unix套接字 {unix_sock_path}: {str(e)}")
        abort(503, description="无法连接到目标机器")
    
    # 用gevent协程处理双向转发(比线程高效,适合高并发)
    tasks = [
        spawn(forward_ws_to_unix, ws, unix_sock),
        spawn(forward_unix_to_ws, unix_sock, ws)
    ]
    # 等待两个协程完成(任一连接断开,协程就会自动退出)
    wait(tasks)

# ------------------------------
# 你的其他现有Flask路由完全不用动!
# ------------------------------
@app.route('/')
def index():
    return "你的现有首页"

if __name__ == '__main__':
    # 本地测试用的服务器(生产环境用gunicorn)
    from gevent.pywsgi import WSGIServer
    from geventwebsocket.handler import WebSocketHandler
    server = WSGIServer(('0.0.0.0', 5000), app, handler_class=WebSocketHandler)
    print("服务器启动在 http://0.0.0.0:5000")
    server.serve_forever()

3. 生产部署调整

如果你的Flask应用之前是用gunicorn运行的,只需要把worker改成支持WebSocket的就行,完全兼容你现有的HTTP接口:

# 替换成你的实际app模块名
gunicorn -k geventwebsocket.gunicorn.workers.GeventWebSocketWorker -w 4 your_app_module:app

为什么这个方案适合你?

  1. 100%复用现有栈:认证、权限检查、机器状态校验全用你已有的代码,不需要写独立服务,也不用重复造轮子
  2. 轻量高效:用gevent协程处理转发,比线程开销小一个数量级,能支持更多并发连接
  3. 完全透明转发:Flask只做前置校验,之后彻底不碰业务数据,完美符合你“get out of the way”的需求
  4. 异常处理完善:任一连接断开时都会自动关闭两边的连接,避免僵尸连接和资源泄漏

注意事项

  • 权限问题:确保运行Flask的进程对/remote/vnc_websocket/路径下的套接字文件有读写权限(可以把Flask进程加入对应组,或者调整套接字权限为775
  • 二进制处理:VNC流量是纯二进制的,代码里已经强制用WebSocket的binary模式转发,别改成文本模式,否则画面会乱码
  • 日志监控:可以在转发函数里加更多日志,方便排查连接中断、转发失败的问题
  • 并发优化:如果并发连接数特别多,可以调大gunicorn的worker数量,或者用Uvicorn作为服务器(需要稍微调整代码适配ASGI,但逻辑不变)

如果还有细节问题,比如特定异常场景的处理、并发性能优化,随时再问!

火山引擎 最新活动