You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

求TensorFlow多GPU推理Python示例:实习工作场景需求

TensorFlow多GPU推理的Python示例

嗨,刚好之前做推理服务时踩过类似的坑,给你分享几个实用的单机多GPU推理方案,都是Python实现的,很适合你实习的服务场景:

1. 使用tf.distribute.MirroredStrategy(官方推荐)

这是TensorFlow官方针对单机多GPU场景提供的分布式策略,能自动把推理任务分发到各个GPU上,代码改动很小,适合快速上手。

示例代码:

import tensorflow as tf
import numpy as np

# 初始化多GPU策略
strategy = tf.distribute.MirroredStrategy()

# 在策略范围内加载模型
with strategy.scope():
    # 替换成你的预训练模型/自定义模型即可
    model = tf.keras.applications.ResNet50(weights='imagenet')

# 准备测试数据(形状为[全局batch_size, 高度, 宽度, 通道数])
batch_size_per_gpu = 32
global_batch_size = batch_size_per_gpu * strategy.num_replicas_in_sync
test_data = np.random.rand(global_batch_size, 224, 224, 3).astype(np.float32)

# 定义分布式推理函数
@tf.function
def distributed_inference(inputs):
    return strategy.run(lambda x: model(x), args=(inputs,))

# 执行推理
results = distributed_inference(test_data)

# 合并各GPU的结果(转换为numpy数组)
concat_results = tf.concat(results.values, axis=0).numpy()
print(f"推理结果形状: {concat_results.shape}")

2. 手动分配GPU并行推理(细粒度控制)

如果需要更灵活地控制每个GPU的任务(比如把不同请求分配到不同GPU),可以手动指定GPU设备实现并行推理。

示例代码:

import tensorflow as tf
import numpy as np
from concurrent.futures import ThreadPoolExecutor

# 定义单GPU推理函数
def inference_on_gpu(gpu_id, model, data):
    with tf.device(f'/GPU:{gpu_id}'):
        return model(data)

# 加载模型(手动管理设备时无需strategy.scope)
model = tf.keras.applications.ResNet50(weights='imagenet')
model.trainable = False  # 推理模式下固定权重,提升性能

# 准备数据并拆分到各个GPU
num_gpus = len(tf.config.list_physical_devices('GPU'))
total_batch_size = 32 * num_gpus
test_data = np.random.rand(total_batch_size, 224, 224, 3).astype(np.float32)
data_splits = np.array_split(test_data, num_gpus)

# 用线程池实现多GPU并行推理
with ThreadPoolExecutor(max_workers=num_gpus) as executor:
    futures = [
        executor.submit(inference_on_gpu, gpu_id, model, split_data)
        for gpu_id, split_data in enumerate(data_splits)
    ]
    # 收集所有GPU的推理结果
    results = [future.result() for future in futures]

# 合并结果
concat_results = np.concatenate(results, axis=0)
print(f"推理结果形状: {concat_results.shape}")

3. 集成到推理服务(以FastAPI为例)

如果要把多GPU推理做成可调用的服务,可以结合FastAPI这类轻量框架,下面是一个简单的示例:

from fastapi import FastAPI
import tensorflow as tf
import numpy as np
from PIL import Image
import io

app = FastAPI()

# 初始化多GPU策略并加载模型
strategy = tf.distribute.MirroredStrategy()
with strategy.scope():
    model = tf.keras.applications.ResNet50(weights='imagenet')
    preprocess_input = tf.keras.applications.resnet50.preprocess_input
    decode_predictions = tf.keras.applications.resnet50.decode_predictions

@app.post("/predict")
async def predict(image: bytes):
    # 预处理上传的图片
    img = Image.open(io.BytesIO(image)).resize((224, 224))
    x = np.array(img)[np.newaxis, ...]
    x = preprocess_input(x)
    
    # 多GPU推理
    @tf.function
    def distributed_predict(inputs):
        return strategy.run(lambda x: model(x), args=(inputs,))
    
    preds = distributed_predict(x)
    preds = tf.concat(preds.values, axis=0).numpy()
    
    # 解析并返回结果
    decoded = decode_predictions(preds, top=3)[0]
    return {
        "predictions": [{"class": cls, "probability": float(prob)} for (_, cls, prob) in decoded]
    }

实用注意事项

  • 推理前记得设置model.trainable = False,关闭梯度计算,能大幅提升推理速度
  • 自定义模型建议保存为SavedModel格式,这样在分布式策略下加载不会出现兼容性问题
  • MirroredStrategy适合高吞吐量的批量推理场景,自动拆分数据、合并结果;手动分配GPU的方式更适合处理小批量或异步请求
  • 如果GPU显存不足,可以调小单GPU的batch size,或者启用TensorFlow的显存动态分配:tf.config.experimental.set_memory_growth(gpu, True)

内容的提问来源于stack exchange,提问作者Jiang Wenbo

火山引擎 最新活动