AI 数据湖服务提供高效的多模态数据处理能力。其图像检索 Pipeline 解决方案,基于 Daft 分布式计算框架和 LAS 图像处理算子,旨在帮助用户从海量原始图像数据中高效地提取高质量的特征向量。该方案能够将格式各异的图像数据,转化为符合现代检索应用标准的、统一的图像向量数据集,为构建高级图像检索和多模态应用奠定坚实基础。
随着数字内容的爆炸式增长,图像已成为信息传递的核心媒介之一。传统的基于标签(Tags)或元数据(Metadata)的图像搜索方法,严重依赖人工标注,不仅成本高昂,而且无法理解图像深层次的语义内容,难以实现精准的“相似性”查找。
以图搜图、相似推荐等现代应用的核心,是语义检索,即通过理解图像内容本身来进行匹配。这依赖于将图像转化为能够表征其语义的数学表示——特征向量(Embeddings)。然而,构建一个高效的图像向量化流程面临诸多挑战:
LAS 的图像检索 Pipeline 解决方案,正是为了应对上述挑战而设计,提供了一套从图像预处理到向量化输出的端到端自动化流程。
本解决方案采用两阶段流水线架构,包含图像预处理和图像向量化两个核心环节。
阶段 | 功能描述 | 详解 | 算子列表 |
|---|---|---|---|
图像重采样 | 统一图像尺寸、分辨率和格式,标准化图像输入。 | 将不同尺寸和分辨率的输入图像,统一调整为适合视觉模型输入的标准化尺寸(如 224x224 像素),并采用高质量的 lanczos 方法进行重采样。 | ImageResample |
图像向量化 | 使用先进视觉模型提取高维图像特征向量。 | 借助 DINOv2-Large 等预训练 Vision Transformer 模型,从标准化后的图像中提取能够表征其全局语义的高维特征向量(如 1024 维)。 | ImageViTEmbedding |
流水线执行后,会生成一个结构化数据集,其中包含原始图像的位置、其唯一标识符、标准化/重采样图像的路径以及生成的高维特征向量:
image_id | image_tos_path | resampled_image_tos_path | image_embedding (Feature Vector) |
|---|---|---|---|
1 | tos://.../test_image/1.png | tos://.../resample_image/1_resample.png | [0.0119, -0.0407, -0.0320, ...] (1024 dimensions) |
10 | tos://.../test_image/10.png | tos://.../resample_image/10_resample.png | [0.0072, -0.0637, -0.0245, ...] (1024 dimensions) |
11 | tos://.../test_image/11.png | tos://.../resample_image/11_resample.png | [-0.0042, -0.0214, -0.0077, ...] (1024 dimensions) |
12 | tos://.../test_image/12.png | tos://.../resample_image/12_resample.png | [-0.0374, 0.0204, -0.0034, ...] (1024 dimensions) |
13 | tos://.../test_image/13.png | tos://.../resample_image/13_resample.png | [-0.0177, -0.0294, -0.0456, ...] (1024 dimensions) |
14 | tos://.../test_image/14.png | tos://.../resample_image/14_resample.png | [-0.0214, -0.0406, -0.0510, ...] (1024 dimensions) |
15 | tos://.../test_image/15.png | tos://.../resample_image/15_resample.png | [-0.0094, -0.0334, -0.0482, ...] (1024 dimensions) |
16 | tos://.../test_image/16.png | tos://.../resample_image/16_resample.png | [0.0033, -0.0347, -0.0520, ...] (1024 dimensions) |
17 | tos://.../test_image/17.png | tos://.../resample_image/17_resample.png | [-0.0119, -0.0439, -0.0630, ...] (1024 dimensions) |
18 | tos://.../test_image/18.png | tos://.../resample_image/18_resample.png | [-0.0109, -0.0371, 0.0124, ...] (1024 dimensions) |
19 | tos://.../test_image/19.png | tos://.../resample_image/19_resample.png | [-0.0062, -0.0085, -0.0566, ...] (1024 dimensions) |
2 | tos://.../test_image/2.png | tos://.../resample_image/2_resample.png | [-0.0152, -0.0316, -0.0306, ...] (1024 dimensions) |
20 | tos://.../test_image/20.png | tos://.../resample_image/20_resample.png | [0.0025, -0.0325, -0.0477, ...] (1024 dimensions) |
3 | tos://.../test_image/3.png | tos://.../resample_image/3_resample.png | [-0.0236, -0.0017, -0.0299, ...] (1024 dimensions) |
4 | tos://.../test_image/4.png | tos://.../resample_image/4_resample.png | [-0.0461, -0.0657, 0.0395, ...] (1024 dimensions) |
5 | tos://.../test_image/5.png | tos://.../resample_image/5_resample.png | [-0.0125, -0.0032, -0.0459, ...] (1024 dimensions) |
6 | tos://.../test_image/6.png | tos://.../resample_image/6_resample.png | [0.0016, 0.0033, -0.0614, ...] (1024 dimensions) |
7 | tos://.../test_image/7.png | tos://.../resample_image/7_resample.png | [-0.0086, -0.0223, -0.0530, ...] (1024 dimensions) |
8 | tos://.../test_image/8.png | tos://.../resample_image/8_resample.png | [-0.0287, -0.0412, -0.0445, ...] (1024 dimensions) |
9 | tos://.../test_image/9.png | tos://.../resample_image/9_resample.png | [0.0209, -0.0530, -0.0639, ...] (1024 dimensions) |
代码参考
以下是完整的 Pipeline 实现代码,展示了如何加载 TOS 上的图像数据,并依次进行重采样和向量化。
from __future__ import annotations import logging import daft from daft import col from daft.las.functions.image.embedding.image_vit_embedding import ImageViTEmbedding from daft.las.functions.image.image_resample import ImageResample from daft.las.functions.udf import las_udf from daft.las.io.tos import TOSConfig logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s", handlers=[ logging.StreamHandler(), ], ) logger = logging.getLogger(__name__) def run_pipeline(input_tos_dir: str): input_s3_dir = input_tos_dir.replace("tos://", "s3://", 1) tos_config = TOSConfig.from_env() IO_CONFIG = daft.io.IOConfig(s3=tos_config.to_s3_config()) model_path = "/opt/las/models" logger.info("图像检索pipeline开始运行...") df = daft.from_glob_path( f"{input_s3_dir}/*.png", io_config=IO_CONFIG, ) df = df.with_column( "image_tos_path", col("path").str.replace("s3://", "tos://") ) df = df.with_column( "image_id", col("path").str.split("/").list.get(-1).str.replace(".png", "") ) df = df.select("image_tos_path", "image_id") logger.info("图像文件加载完成: %d 条记录", df.count_rows()) logger.info("开始图像重采样...") df = df.with_column( "resampled_image", las_udf( ImageResample, construct_args={ "image_suffix": ".png", "tos_dir": "tos://tos_bucket/image_retrieval/resample_image/", "local_dir": "", "image_src_type": "image_url", "target_size": [224, 224], "target_dpi": [72, 72], "method": "lanczos", }, num_gpus=0, batch_size=32, concurrency=1, )(col("image_tos_path")), ) df.collect() logger.info("图像重采样完成: %d 条记录", df.count_rows()) logger.info("开始图像向量化...") df = df.with_column( "image_embedding", las_udf( ImageViTEmbedding, construct_args={ "image_src_type": "image_base64", "dtype": "float32", "batch_size": 32, "model_path": model_path, "model_name": "facebook/dinov2-large", "use_cls_token_embedding": True, "rank": 0, }, num_gpus=1, batch_size=32, concurrency=1, )(col("resampled_image")["base64"]), ) df.collect() logger.info("图像向量化完成: %d 条记录", df.count_rows()) df = df.with_column( "resampled_image_tos_path", col("resampled_image")["image_path"] ) df = df.exclude("resampled_image") df.show() if __name__ == "__main__": input_tos_dir = "tos://tos_bucket/image_retrieval/test_image" run_pipeline(input_tos_dir)
说明
代码物料:完整的 Pipeline 脚本可在此处下载:image_retrieval_pipeline_show.py。
执行 Pipeline 后,您将得到一个包含图像标识、原始路径、重采样后路径以及图像特征向量的 Daft DataFrame。
示例日志输出:
2025-08-21 07:30:10,INFO - 图像检索 Pipeline 开始运行... 2025-08-21 07:30:12,INFO - 图像文件加载完成: 150 条记录 2025-08-21 07:30:13,INFO - 开始图像重采样... 2025-08-21 07:31:05,INFO - 图像重采样完成: 150 条记录 2025-08-21 07:31:06,INFO - 开始图像向量化... 2025-08-21 07:32:30,INFO - 图像向量化完成: 150 条记录 2025-08-21 07:32:31,INFO - Pipeline 执行完毕,最终结果如下:
示例输出数据 (df.show()):
image_tos_path | image_id | image_embedding | resampled_image_tos_path |
|---|---|---|---|
tos://.../test_image/cat_01.png | cat_01 | [0.135, -0.244, ... (1024 dims)] | tos://.../resample_image/cat_01.png |
tos://.../test_image/dog_01.png | dog_01 | [-0.087, 0.512, ... (1024 dims)] | tos://.../resample_image/dog_01.png |
tos://.../test_image/car_01.png | car_01 | [0.641, 0.098, ... (1024 dims)] | tos://.../resample_image/car_01.png |
... | ... | ... | ... |
当处理管道在开发机上调试无误后,您可以将其保存为自定义镜像,并在 AI 数据湖服务 -> 任务管理 中创建批量任务。通过该任务,可以对存储在 TOS 上的数百万张图片进行规模化、自动化的向量化处理,为上层检索应用生产数据。