求TensorFlow多GPU推理Python示例:实习工作场景需求
TensorFlow多GPU推理的Python示例
嗨,刚好之前做推理服务时踩过类似的坑,给你分享几个实用的单机多GPU推理方案,都是Python实现的,很适合你实习的服务场景:
1. 使用tf.distribute.MirroredStrategy(官方推荐)
这是TensorFlow官方针对单机多GPU场景提供的分布式策略,能自动把推理任务分发到各个GPU上,代码改动很小,适合快速上手。
示例代码:
import tensorflow as tf import numpy as np # 初始化多GPU策略 strategy = tf.distribute.MirroredStrategy() # 在策略范围内加载模型 with strategy.scope(): # 替换成你的预训练模型/自定义模型即可 model = tf.keras.applications.ResNet50(weights='imagenet') # 准备测试数据(形状为[全局batch_size, 高度, 宽度, 通道数]) batch_size_per_gpu = 32 global_batch_size = batch_size_per_gpu * strategy.num_replicas_in_sync test_data = np.random.rand(global_batch_size, 224, 224, 3).astype(np.float32) # 定义分布式推理函数 @tf.function def distributed_inference(inputs): return strategy.run(lambda x: model(x), args=(inputs,)) # 执行推理 results = distributed_inference(test_data) # 合并各GPU的结果(转换为numpy数组) concat_results = tf.concat(results.values, axis=0).numpy() print(f"推理结果形状: {concat_results.shape}")
2. 手动分配GPU并行推理(细粒度控制)
如果需要更灵活地控制每个GPU的任务(比如把不同请求分配到不同GPU),可以手动指定GPU设备实现并行推理。
示例代码:
import tensorflow as tf import numpy as np from concurrent.futures import ThreadPoolExecutor # 定义单GPU推理函数 def inference_on_gpu(gpu_id, model, data): with tf.device(f'/GPU:{gpu_id}'): return model(data) # 加载模型(手动管理设备时无需strategy.scope) model = tf.keras.applications.ResNet50(weights='imagenet') model.trainable = False # 推理模式下固定权重,提升性能 # 准备数据并拆分到各个GPU num_gpus = len(tf.config.list_physical_devices('GPU')) total_batch_size = 32 * num_gpus test_data = np.random.rand(total_batch_size, 224, 224, 3).astype(np.float32) data_splits = np.array_split(test_data, num_gpus) # 用线程池实现多GPU并行推理 with ThreadPoolExecutor(max_workers=num_gpus) as executor: futures = [ executor.submit(inference_on_gpu, gpu_id, model, split_data) for gpu_id, split_data in enumerate(data_splits) ] # 收集所有GPU的推理结果 results = [future.result() for future in futures] # 合并结果 concat_results = np.concatenate(results, axis=0) print(f"推理结果形状: {concat_results.shape}")
3. 集成到推理服务(以FastAPI为例)
如果要把多GPU推理做成可调用的服务,可以结合FastAPI这类轻量框架,下面是一个简单的示例:
from fastapi import FastAPI import tensorflow as tf import numpy as np from PIL import Image import io app = FastAPI() # 初始化多GPU策略并加载模型 strategy = tf.distribute.MirroredStrategy() with strategy.scope(): model = tf.keras.applications.ResNet50(weights='imagenet') preprocess_input = tf.keras.applications.resnet50.preprocess_input decode_predictions = tf.keras.applications.resnet50.decode_predictions @app.post("/predict") async def predict(image: bytes): # 预处理上传的图片 img = Image.open(io.BytesIO(image)).resize((224, 224)) x = np.array(img)[np.newaxis, ...] x = preprocess_input(x) # 多GPU推理 @tf.function def distributed_predict(inputs): return strategy.run(lambda x: model(x), args=(inputs,)) preds = distributed_predict(x) preds = tf.concat(preds.values, axis=0).numpy() # 解析并返回结果 decoded = decode_predictions(preds, top=3)[0] return { "predictions": [{"class": cls, "probability": float(prob)} for (_, cls, prob) in decoded] }
实用注意事项
- 推理前记得设置
model.trainable = False,关闭梯度计算,能大幅提升推理速度 - 自定义模型建议保存为SavedModel格式,这样在分布式策略下加载不会出现兼容性问题
MirroredStrategy适合高吞吐量的批量推理场景,自动拆分数据、合并结果;手动分配GPU的方式更适合处理小批量或异步请求- 如果GPU显存不足,可以调小单GPU的batch size,或者启用TensorFlow的显存动态分配:
tf.config.experimental.set_memory_growth(gpu, True)
内容的提问来源于stack exchange,提问作者Jiang Wenbo




