如何用Python实现Flask请求至多端点的转发调度?
Python实现Flask OCR服务的负载均衡转发方案
当然可以用Python实现这个需求!作为刚接触Web开发的数据科学家,用熟悉的Python工具搭建负载均衡转发服务完全没问题,不用急着去啃NGINX或HAProxy的配置。下面给你两种实用方案,分别对应轮询策略和基于端点空闲状态的动态转发,都是用Python生态里的工具就能搞定:
方案一:Flask + Requests 实现简单轮询负载均衡
这个方案最适合快速上手,核心思路是在4999端口的Flask服务里维护一个端点列表,每次收到请求就按顺序转发到下一个端点,循环往复。
代码示例
from flask import Flask, request, jsonify import requests app = Flask(__name__) # 配置要转发的三个OCR端点 ocr_endpoints = [ "http://0.0.0.0:5000/get_ocr_api", "http://0.0.0.0:5001/get_ocr", "http://0.0.0.0:5002/get_ocr_api" ] current_index = 0 # 轮询索引,记录当前要转发的端点位置 @app.route('/', defaults={'path': ''}) @app.route('/<path:path>', methods=['GET', 'POST']) def forward_request(path): global current_index # 获取当前要转发的目标端点 target_endpoint = ocr_endpoints[current_index] # 更新索引,实现轮询循环 current_index = (current_index + 1) % len(ocr_endpoints) try: # 根据请求方法转发请求(支持GET和POST,包括文件上传场景) if request.method == 'GET': response = requests.get(target_endpoint, params=request.args) elif request.method == 'POST': # 兼容JSON和文件上传的请求格式 if request.is_json: response = requests.post(target_endpoint, json=request.get_json()) else: response = requests.post(target_endpoint, files=request.files, data=request.form) # 将目标端点的响应原封不动返回给客户端 return jsonify(response.json()), response.status_code except requests.exceptions.RequestException as e: return jsonify({"error": f"转发请求失败: {str(e)}"}), 503 if __name__ == '__main__': app.run(host='0.0.0.0', port=4999, debug=False)
优缺点
- ✅ 优点:代码简单易懂,无需学习新工具,快速搭建测试环境
- ❌ 缺点:不检测端点是否可用,即使某个端点挂了仍会转发请求;无法感知端点负载情况
方案二:Aiohttp 实现异步+基于空闲状态的动态转发
如果需要更健壮的生产级方案,推荐用aiohttp(异步HTTP库)实现。这个方案会实时检测所有端点的健康状态和响应速度,自动选择当前最空闲(响应最快)的端点转发,容错性更好。
代码示例
from aiohttp import web, ClientSession import asyncio # 配置要转发的三个OCR端点 ocr_endpoints = [ "http://0.0.0.0:5000/get_ocr_api", "http://0.0.0.0:5001/get_ocr", "http://0.0.0.0:5002/get_ocr_api" ] async def check_endpoint_health(endpoint): """检测端点是否可用,返回响应时间(毫秒),不可用则返回None""" try: start_time = asyncio.get_event_loop().time() async with ClientSession() as session: # 用HEAD请求快速检测端点状态,避免占用过多资源 async with session.head(endpoint, timeout=2): response_time = (asyncio.get_event_loop().time() - start_time) * 1000 return response_time except Exception: return None async def forward_request(request): # 并行检测所有端点的健康状态,提升效率 health_tasks = [check_endpoint_health(endpoint) for endpoint in ocr_endpoints] health_results = await asyncio.gather(*health_tasks) # 筛选出可用的端点,并按响应时间排序(最快的在前) available_endpoints = [ (endpoint, rt) for endpoint, rt in zip(ocr_endpoints, health_results) if rt is not None ] if not available_endpoints: return web.json_response({"error": "所有OCR端点均不可用"}, status=503) # 选择响应最快的端点(即当前最空闲的) target_endpoint, _ = sorted(available_endpoints, key=lambda x: x[1])[0] # 转发请求到目标端点 async with ClientSession() as session: try: if request.method == 'GET': async with session.get(target_endpoint, params=request.query) as resp: data = await resp.json() return web.json_response(data, status=resp.status) elif request.method == 'POST': # 兼容JSON和文件上传请求 if request.content_type == 'application/json': body = await request.json() async with session.post(target_endpoint, json=body) as resp: data = await resp.json() return web.json_response(data, status=resp.status) else: # 处理文件上传的multipart请求 form_data = await request.multipart() async with session.post(target_endpoint, data=form_data) as resp: data = await resp.json() return web.json_response(data, status=resp.status) except Exception as e: return web.json_response({"error": f"转发请求失败: {str(e)}"}, status=503) app = web.Application() # 匹配所有路径的请求,统一转发 app.add_routes([web.route('*', '/{tail:.*}', forward_request)]) if __name__ == '__main__': web.run_app(app, host='0.0.0.0', port=4999)
优缺点
- ✅ 优点:异步性能高,能动态避开故障端点;自动选择最空闲的端点,提升整体服务效率
- ❌ 缺点:代码稍复杂,需要了解异步编程的基本概念,但对于数据科学家来说,异步处理也是很实用的技能
额外注意事项
- 日志记录:生产环境中建议添加日志模块,记录每次转发的端点、请求状态和响应时间,方便排查问题。
- 端点容错:如果某个端点频繁故障,可以考虑给它设置权重,减少转发次数,或者暂时从列表中移除。
- 性能优化:如果处理大文件上传,建议调整请求超时时间和内存限制,避免服务崩溃。
内容的提问来源于stack exchange,提问作者user2630711




