You need to enable JavaScript to run this app.
导航
多模态数据集:图文混排
最近更新时间:2025.05.20 11:49:18首次发布时间:2025.05.20 11:49:18
我的收藏
有用
有用
无用
无用

在本教程中,您将了解在火山引擎机器学习平台(veMLP)上如何基于使用 LAS-SDK 管理数据集并构建 VLM/MLLM 训练所需的数据。

  • 了解使用 LAS,构建数据集
  • 通过 LAS-SDK 构建 VLM/MLLM 训练所需数据

功能简介

在多模态模型的研发过程中,VLM/MLLM 的训练数据存储与计算方案是关键技术环节。合理且高效的训练数据方案,不仅能够确保模型在多样化的多模态数据上进行有效学习,提升模型的泛化能力和性能表现,还能为模型的可扩展性和可维护性奠定坚实基础。通过科学地规划数据的存储结构、数据的预处理流程以及数据的分发策略等方面,能够最大程度地发挥多模态数据的价值,满足不同应用场景下对模型的需求,进而推动多模态人工智能技术的发展和应用落地。

优势

LAS-SDK 是火山引擎 LAS 服务的 SDK 客户端,它提供了多种能力,包括不限于 LAS 数据集访问、自定义数据源、自定义数据处理算子等。LAS-SDK 底层同时支持 Pandas、Spark、Ray 三种计算引擎,方便用户根据自己的场景选择。通过 LAS-SDK 图文混排的优势有:

  • 多模态数据统一存储:LAS 数据集提供 Lance 湖格式,图像等多模态数据存成湖格式的一列,不需与标签等数据分开管理。
  • 分布式处理框架:内置预置算子与自定义算子双模式,支持 PB 级数据转换与特征工程
  • 高效访问机制:基于 Lance 列式存储格式,通过元数据索引实现智能数据跳读
  • 全链路安全体系:支持数据加密、访问控制、审计日志等安全机制,保障数据生命周期安全

涉及组件

前提条件

在进行 VLM/MLLM 训练前,需提前在 LAS 数据集平台上完成数据集的初始化工作,操作指南详见创建 AI 数据集

环境配置

在机器学习平台创建开发机,选择好队列计算资源配置、访问配置、挂载共享文件系统后,即可创建。

  • 需要用户在镜像入口采用镜像 URL 的方式:并根据您所在的 region 修改 URL,例:
    • 华北2: vemlp-cn-beijing.cr.volces.com/preset-images/las-dataset:v0.0.1

登录到机器学习平台开发机 WebIDE,打开终端,新建 Jupyter Notebook 交互式开发环境,并选择合适的 Python 环境。

# 查看当前 python 的环境路径,推荐选择 `/usr/bin/python`
which python

volc cli 环境变量配置

volc cli 为机器学习平台的命令行工具,可以以命令行的方式便捷的进行任务提交,任务管理等操作。预置镜像已经安装 volc 命令行工具,进行升级操作。

# 升级volc cli
! volc upgrade
# 查看volc cli版本
! volc v

可通过以下操作配置好 volc cli 和 jupyter notebook 需要的的环境依赖。
如果您不知道您的 AK/SK,可以通过 API 访问密钥获得您当前身份的密钥对。

# 火山引擎认证配置(替换为你的 AK/SK)
VOLC_ACCESS_KEY_ID = ''
VOLC_SECRET_ACCESS_KEY = ''

# 默认使用开发机所在的 region
import os
VOLC_REGION = os.environ['MLP_REGION']

# 一次性设置所有环境变量(Jupyter魔法命令)
%set_env VOLC_ACCESS_KEY_ID = {VOLC_ACCESS_KEY_ID}
%set_env VOLC_SECRET_ACCESS_KEY = {VOLC_SECRET_ACCESS_KEY}
%set_env VOLC_REGION = {VOLC_REGION}

# 配置火山命令行工具(自动读取已设置的环境变量)
! volc configure --ak $VOLC_ACCESS_KEY_ID --sk $VOLC_SECRET_ACCESS_KEY --region $VOLC_REGION

# 验证配置文件(可选)
! echo "volc config:" && cat ${HOME}/.volc/config
! echo "volc credentials:" && cat ${HOME}/.volc/credentials

数据准备

本实践重点初始化两个数据集:

  • 其一为图片数据集,专门用于存放训练所需的图片数据。
  • 其二为文本数据集,用于存放训练所使用的文本数据。

图片数据集准备

将图片文件夹上传至 TOS 上,然后创建数据集,选择图片数据集,填写 TOS 路径即可。

#得到数据集ID :
DATASET_ID_IMAGE = ''

文本数据集准备

首先构造训练数据,数据格式为 CSV,数据结构参考:

字段

类型

解释

docid

varchar

文档ID,标识一次请求的唯一ID

text

varchar

Prompt单轮对话

image_id1

varchar

第一个引用的图片ID

image_id2

varchar

第二个引用的图片ID

image_id3

varchar

第三个引用的图片ID

构造完成数据上传至 TOS,并在 LAS 中使用,创建一个数据集。

#得到数据集ID : 
DATASET_ID_TEXT = ''

图文混排复现流程

原理如图所示:在 LAS 系统中存储着两个不同的数据集。我们采用 JOIN 的方式,将这两个数据集进行关联整合,从而形成一个同时包含文字与图片的综合性数据集。后续,对这个新数据集进行一系列的数据处理,以此为基础开展 VLM/MLLM 模型的训练工作。
Image

配置 LAS-SDK 集成环境

在 Python 程序运行环境中,LAS - SDK 集成环境的相关配置。

配置 .env 文件

LAS_AK = '' # 用户AK
LAS_SK = '' # 用户SK
LAS_REGION = '' # 地域名称,例如cn-north-1

初始化程序

以下展示 LAS-SDK 的初始化方法。该 SDK 支持多种不同的计算引擎,用户能够依据自身的具体需求,灵活且精准地挑选最为合适的计算引擎。在此处我们以 Spark 作为示例进行演示,但用户若有其他需求,只需调用 set_engine_pandas()set_engine_ray() 方法,即可轻松实现向其他引擎的切换。

import os

from las.data import set_engine_spark
from las.infra.las_settings import LasSettings
from pyspark.sql import SparkSession

def initLasSdk():
    """
    Initialize LAS SDK with Spark integration.
    
    This function sets up a Spark session with specific configurations for:
    - Proton FileSystem integration
    - LanceDB catalog integration
    - TOS (TikTok Object Storage) credentials
    - Hive Metastore configuration
    
    Returns:
        None
    """
    # Clean up existing Spark and Hadoop environment variables to ensure a fresh configuration
    if "PYSPARK_PYTHON" in os.environ:
        del os.environ["PYSPARK_PYTHON"]
    if "SPARK_HOME" in os.environ:
        del os.environ["SPARK_HOME"]
    if "SPARK_CONF_DIR" in os.environ:
        del os.environ["SPARK_CONF_DIR"]
    if "HADOOP_HOME" in os.environ:
        del os.environ["HADOOP_HOME"]
    if "HADOOP_CONF_DIR" in os.environ:
        del os.environ["HADOOP_CONF_DIR"]
    
    # Load configuration settings
    settings = LasSettings.new_settings()
    
    # Get TOS credentials from settings
    ak = settings.tos_ak if not settings.tos_ak else settings.tos_ak
    sk = settings.tos_sk if not settings.tos_sk else settings.tos_sk

    # Create Spark session with Proton and LanceDB configurations
    spark_session = SparkSession.builder.appName("test").master("local") \
        .config("spark.sql.extensions", 
                "io.proton.spark.ProtonSparkSessionExtension,org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
        .config("spark.hadoop.mapreduce.outputcommitter.class", "io.proton.commit.Committer") \
        .config("spark.sql.sources.commitProtocolClass", "org.apache.spark.internal.io.cloud.EMROutputCommitProtocol") \
        .config("spark.hadoop.fs.tos.access-key-id", ak) \
        .config("spark.hadoop.fs.tos.fs.tos.secret-access-key", sk) \
        .config("spark.hadoop.fs.tos.impl", "io.proton.fs.ProtonFileSystem") \
        .config("spark.hadoop.fs.tos.endpoint", settings.tos_endpoint) \
        .config("spark.sql.catalog.lance", "com.lancedb.lance.spark.LanceCatalog") \
        .config("spark.sql.catalog.lance.type", "hive") \
        .config("spark.sql.catalog.lance.access_key_id", ak) \
        .config("spark.sql.catalog.lance.secret_access_key", sk) \
        .config("spark.sql.catalog.lance.aws_region", settings.tos_region) \
        .config("spark.sql.catalog.lance.virtual_hosted_style_request", "true") \
        .config("spark.sql.catalog.lance.hive.hms.client.is.public.cloud", "true") \
        .config("spark.sql.catalog.lance.hive.metastore.uris", settings.catalog_uri) \
        .config("spark.sql.catalog.lance.hive.client.las.region.name", "cn-beijing") \
        .config("spark.sql.catalog.lance.hive.client.las.ak", ak) \
        .config("spark.sql.catalog.lance.hive.client.las.sk", sk) \
        .config("spark.sql.caseSensitive", "true") \
        .getOrCreate()
    
    # Set up the Spark engine
    spark_session = set_engine_spark()
    
    # Execute data operations
    # doSomething()
    
    # Clean up resources
    spark_session.stop()

读取数据

通过 LAS-SDK 读取数据集中的数据。
以下代码聚焦于数据获取方法的展示。将详细介绍两种获取方式,一是按照正常顺序获取数据,能让用户按既定顺序获取所需信息;二是采用 shuffle 方式获取,可随机打乱数据顺序,满足不同场景下的训练数据使用需求。

正常顺序读取

from pyspark.sql import DataFrame
import las.data as ld

def getDataFromLasDataset(dataset_id) -> DataFrame:
    """
    Load data from a LAS dataset.

    Args:
        dataset_id (str): The identifier of the LAS dataset.
    Returns:
        DataFrame: The loaded data as a PySpark DataFrame.
    """
# Load the LAS dataset using the provided dataset ID
dataset = ld.load_las_dataset(dataset_id)
# Convert the loaded LAS dataset to a PySpark DataFrame
df = dataset.to_spark_df()
return df

打乱顺序获取数据

def getDataFromLasDatasetShuffle(dataset_id) -> DataFrame:
    """
    Load data from a LAS dataset shuffle.

    Args:
        dataset_id (str): The identifier of the LAS dataset.
    Returns:
        DataFrame: The loaded data as a PySpark DataFrame.
    """
    # Load the LAS dataset using the provided dataset ID
    dataset = ld.load_las_dataset(dataset_id)
    # Convert the dataset to a PySpark DataFrame
    df = dataset.shuffle().to_spark_df()
    return df

图文混排

将读取到的文本与图片数据进行图文混排。
以下代码的主要功能是将读取到的文本与图片数据进行图文混排连接。它会对文本和图片数据进行简单处理,按照一定的规则和格式将二者结合,以实现图文混排的演示效果,为后续的展示与模型训练提供合适的数据结构。

from pyspark.sql import functions as F
    
def joinVisionData() -> DataFrame:
    """
    Join text and image data from LAS datasets.
    
    This function performs the following operations:
    1. Loads text data from DATASET_ID_TEXT dataset
    2. Loads image data from DATASET_ID_IMAGE dataset
    3. Joins image data with text data based on image IDs
    4. Returns a DataFrame with text and corresponding images
    
    Returns:
        DataFrame: Combined DataFrame with text and image data
    """
    # Load text and image datasets
    df_text = getDataFromLasDataset(DATASET_ID_TEXT)
    df_image = getDataFromLasDataset(DATASET_ID_IMAGE)

    # Join first image data
    df_text = df_text.join(
        df_image.select(F.col("__data_item_id").alias("image_id1"), F.col("image")),
        on="image_id1",
        how="left"
    ).withColumnRenamed("image", "image1_byte")  # Rename to avoid column name conflicts

    # Join second image data
    df_text = df_text.join(
        df_image.select(F.col("__data_item_id").alias("image_id2"), F.col("image")),
        on="image_id2",
        how="left"
    ).withColumnRenamed("image", "image2_byte")

    # Join third image data
    df_text = df_text.join(
        df_image.select(F.col("__data_item_id").alias("image_id3"), F.col("image")),
        on="image_id3",
        how="left"
    ).withColumnRenamed("image", "image3_byte")

    # Select final columns and rename image columns
    vision_df = df_text.select(
        "docid",  # Document identifier
        "text",   # Text content
        F.col("image1_byte").alias("image1"),  # First image data
        F.col("image2_byte").alias("image2"),  # Second image data
        F.col("image3_byte").alias("image3")   # Third image data
    )
    return vision_df

构建 VLM/MLLM 训练的数据结构

利用图文混排后的数据构建 VLM/MLLM 训练的数据结构。
以下代码展示了如何将图文混排后的数据转换为 OpenAI Vision 训练格式的函数。它接收包含文本和图像数据的分区数据,通过 DataFrame 结构为每条数据生成一个标准化的训练样本。

def formatToTrainingData(df: DataFrame) -> DataFrame:
    """
    Convert text-image pairs from DataFrame into OpenAI VLM training format
    
    Args:
        df: Spark DataFrame containing text and image data
            Expected columns:
            - 'text': Text content
            - 'image1': First image data
            - 'image2': Second image data
            - 'image3': Third image data
    
    Returns:
        DataFrame with formatted prompts for VLM/MLLM training
    """
    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType

    def format_prompt(text, image1, image2, image3):
        """
        Format the prompt for VLM/MLLM training.

        Args:
            text: Text content
            image1: First image data
            image2: Second image data
            image3: Third image data
        Returns:
            Formatted prompt as a JSON string
        """
        images = []
        if image1:
            images.append(image1)
        if image2:
            images.append(image2)
        if image3:
            images.append(image3)
        
        alpaca_format = {
            "instruction": text,
            "input": images,
            "output": ""
        }
        import json
        return json.dumps(alpaca_format)

    format_prompt_udf = udf(format_prompt, StringType())

    result_str = df.withColumn("formatted_prompt", format_prompt_udf("text", "image1", "image2", "image3"))

    return result_str

基于多模态数据构建 PyTorch Dataset 及集成 DataLoader 的方法说明

以下代码展示了基于 PyTorch 框架构建多模态数据处理管道,实现文本与关联图像的标准化加载,兼容 TorchVision 预处理链。适配 LAS 数据湖的 DataFrame 结构,支持分布式训练场景下的高效数据并行加载,为视觉语言模型提供端到端的数据支撑方案。

from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from PIL import Image
import torch
from torchvision import transforms
import io

class VLMDataset(Dataset):
    """
    Custom dataset for VLM training.

    Args:
        df: A Spark DataFrame that contains text and image data.
        transform: An optional transform to be applied to a sample.
    """
    def __init__(self, df, transform=None):
        """
        Initialize the dataset.

        Args:
            df: A Spark DataFrame that contains text and image data.
            transform: An optional transform to be applied to a sample.
        """
        self.rows = df.collect()
        self.transform = transform or transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor()
        ])
    def __len__(self):
        """
        Get the total number of samples in the dataset.

        Returns:
            int: The total number of samples in the dataset.
        """
        return len(self.rows)
    def __getitem__(self, idx):
        """
        Get a sample from the dataset.

        Args:
            idx: The index of the sample.

        Returns:
            tuple: A tuple containing text and a list of image tensors.
        """
        row = self.rows[idx]
        text = row.text
        images_data = [row.image1, row.image2, row.image3]
        images = []
        
        for img_data in images_data:
            if img_data is not None:
                try:
                    image = Image.open(io.BytesIO(img_data))
                    if self.transform:
                        image = self.transform(image)
                    images.append(image)
                except Exception as e:
                    images.append(torch.zeros(3, 224, 224))  # Placeholder for missing images
        
        while len(images) < 3:
            images.append(torch.zeros(3, 224, 224))
            
        return text, torch.stack(images)

# 示例用法
if __name__ == "__main__":
    # 假设你已经有了一个 DataFrame df
    df = joinVisionData()
    # 创建数据集和数据加载器
    dataset = VLMDataset(df)
    # 数据加载器
    dataloader = DataLoader(dataset, batch_size=8, shuffle=False)

更多调用操作及参数说明可参考 AI 多模态数据湖服务文档