You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

如何用Python实现Flask请求至多端点的转发调度?

Python实现Flask OCR服务的负载均衡转发方案

当然可以用Python实现这个需求!作为刚接触Web开发的数据科学家,用熟悉的Python工具搭建负载均衡转发服务完全没问题,不用急着去啃NGINX或HAProxy的配置。下面给你两种实用方案,分别对应轮询策略基于端点空闲状态的动态转发,都是用Python生态里的工具就能搞定:

方案一:Flask + Requests 实现简单轮询负载均衡

这个方案最适合快速上手,核心思路是在4999端口的Flask服务里维护一个端点列表,每次收到请求就按顺序转发到下一个端点,循环往复。

代码示例

from flask import Flask, request, jsonify
import requests

app = Flask(__name__)

# 配置要转发的三个OCR端点
ocr_endpoints = [
    "http://0.0.0.0:5000/get_ocr_api",
    "http://0.0.0.0:5001/get_ocr",
    "http://0.0.0.0:5002/get_ocr_api"
]
current_index = 0  # 轮询索引,记录当前要转发的端点位置

@app.route('/', defaults={'path': ''})
@app.route('/<path:path>', methods=['GET', 'POST'])
def forward_request(path):
    global current_index
    # 获取当前要转发的目标端点
    target_endpoint = ocr_endpoints[current_index]
    # 更新索引,实现轮询循环
    current_index = (current_index + 1) % len(ocr_endpoints)
    
    try:
        # 根据请求方法转发请求(支持GET和POST,包括文件上传场景)
        if request.method == 'GET':
            response = requests.get(target_endpoint, params=request.args)
        elif request.method == 'POST':
            # 兼容JSON和文件上传的请求格式
            if request.is_json:
                response = requests.post(target_endpoint, json=request.get_json())
            else:
                response = requests.post(target_endpoint, files=request.files, data=request.form)
        
        # 将目标端点的响应原封不动返回给客户端
        return jsonify(response.json()), response.status_code
    except requests.exceptions.RequestException as e:
        return jsonify({"error": f"转发请求失败: {str(e)}"}), 503

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=4999, debug=False)

优缺点

  • ✅ 优点:代码简单易懂,无需学习新工具,快速搭建测试环境
  • ❌ 缺点:不检测端点是否可用,即使某个端点挂了仍会转发请求;无法感知端点负载情况

方案二:Aiohttp 实现异步+基于空闲状态的动态转发

如果需要更健壮的生产级方案,推荐用aiohttp(异步HTTP库)实现。这个方案会实时检测所有端点的健康状态和响应速度,自动选择当前最空闲(响应最快)的端点转发,容错性更好。

代码示例

from aiohttp import web, ClientSession
import asyncio

# 配置要转发的三个OCR端点
ocr_endpoints = [
    "http://0.0.0.0:5000/get_ocr_api",
    "http://0.0.0.0:5001/get_ocr",
    "http://0.0.0.0:5002/get_ocr_api"
]

async def check_endpoint_health(endpoint):
    """检测端点是否可用,返回响应时间(毫秒),不可用则返回None"""
    try:
        start_time = asyncio.get_event_loop().time()
        async with ClientSession() as session:
            # 用HEAD请求快速检测端点状态,避免占用过多资源
            async with session.head(endpoint, timeout=2):
                response_time = (asyncio.get_event_loop().time() - start_time) * 1000
                return response_time
    except Exception:
        return None

async def forward_request(request):
    # 并行检测所有端点的健康状态,提升效率
    health_tasks = [check_endpoint_health(endpoint) for endpoint in ocr_endpoints]
    health_results = await asyncio.gather(*health_tasks)
    
    # 筛选出可用的端点,并按响应时间排序(最快的在前)
    available_endpoints = [
        (endpoint, rt) for endpoint, rt in zip(ocr_endpoints, health_results) if rt is not None
    ]
    if not available_endpoints:
        return web.json_response({"error": "所有OCR端点均不可用"}, status=503)
    
    # 选择响应最快的端点(即当前最空闲的)
    target_endpoint, _ = sorted(available_endpoints, key=lambda x: x[1])[0]
    
    # 转发请求到目标端点
    async with ClientSession() as session:
        try:
            if request.method == 'GET':
                async with session.get(target_endpoint, params=request.query) as resp:
                    data = await resp.json()
                    return web.json_response(data, status=resp.status)
            elif request.method == 'POST':
                # 兼容JSON和文件上传请求
                if request.content_type == 'application/json':
                    body = await request.json()
                    async with session.post(target_endpoint, json=body) as resp:
                        data = await resp.json()
                        return web.json_response(data, status=resp.status)
                else:
                    # 处理文件上传的multipart请求
                    form_data = await request.multipart()
                    async with session.post(target_endpoint, data=form_data) as resp:
                        data = await resp.json()
                        return web.json_response(data, status=resp.status)
        except Exception as e:
            return web.json_response({"error": f"转发请求失败: {str(e)}"}, status=503)

app = web.Application()
# 匹配所有路径的请求,统一转发
app.add_routes([web.route('*', '/{tail:.*}', forward_request)])

if __name__ == '__main__':
    web.run_app(app, host='0.0.0.0', port=4999)

优缺点

  • ✅ 优点:异步性能高,能动态避开故障端点;自动选择最空闲的端点,提升整体服务效率
  • ❌ 缺点:代码稍复杂,需要了解异步编程的基本概念,但对于数据科学家来说,异步处理也是很实用的技能

额外注意事项

  1. 日志记录:生产环境中建议添加日志模块,记录每次转发的端点、请求状态和响应时间,方便排查问题。
  2. 端点容错:如果某个端点频繁故障,可以考虑给它设置权重,减少转发次数,或者暂时从列表中移除。
  3. 性能优化:如果处理大文件上传,建议调整请求超时时间和内存限制,避免服务崩溃。

内容的提问来源于stack exchange,提问作者user2630711

火山引擎 最新活动