在本教程中,您将了解在火山引擎机器学习平台(veMLP)上如何基于使用 LAS-SDK 管理数据集并构建 VLM/MLLM 训练所需的数据。
在多模态模型的研发过程中,VLM/MLLM 的训练数据存储与计算方案是关键技术环节。合理且高效的训练数据方案,不仅能够确保模型在多样化的多模态数据上进行有效学习,提升模型的泛化能力和性能表现,还能为模型的可扩展性和可维护性奠定坚实基础。通过科学地规划数据的存储结构、数据的预处理流程以及数据的分发策略等方面,能够最大程度地发挥多模态数据的价值,满足不同应用场景下对模型的需求,进而推动多模态人工智能技术的发展和应用落地。
LAS-SDK 是火山引擎 LAS 服务的 SDK 客户端,它提供了多种能力,包括不限于 LAS 数据集访问、自定义数据源、自定义数据处理算子等。LAS-SDK 底层同时支持 Pandas、Spark、Ray 三种计算引擎,方便用户根据自己的场景选择。通过 LAS-SDK 图文混排的优势有:
在进行 VLM/MLLM 训练前,需提前在 LAS 数据集平台上完成数据集的初始化工作,操作指南详见创建 AI 数据集。
在机器学习平台创建开发机,选择好队列计算资源配置、访问配置、挂载共享文件系统后,即可创建。
vemlp-cn-beijing.cr.volces.com/preset-images/las-dataset:v0.0.1
登录到机器学习平台开发机 WebIDE,打开终端,新建 Jupyter Notebook 交互式开发环境,并选择合适的 Python 环境。
# 查看当前 python 的环境路径,推荐选择 `/usr/bin/python` which python
volc cli 为机器学习平台的命令行工具,可以以命令行的方式便捷的进行任务提交,任务管理等操作。预置镜像已经安装 volc 命令行工具,进行升级操作。
# 升级volc cli ! volc upgrade # 查看volc cli版本 ! volc v
可通过以下操作配置好 volc cli 和 jupyter notebook 需要的的环境依赖。
如果您不知道您的 AK/SK,可以通过 API 访问密钥获得您当前身份的密钥对。
# 火山引擎认证配置(替换为你的 AK/SK) VOLC_ACCESS_KEY_ID = '' VOLC_SECRET_ACCESS_KEY = '' # 默认使用开发机所在的 region import os VOLC_REGION = os.environ['MLP_REGION'] # 一次性设置所有环境变量(Jupyter魔法命令) %set_env VOLC_ACCESS_KEY_ID = {VOLC_ACCESS_KEY_ID} %set_env VOLC_SECRET_ACCESS_KEY = {VOLC_SECRET_ACCESS_KEY} %set_env VOLC_REGION = {VOLC_REGION} # 配置火山命令行工具(自动读取已设置的环境变量) ! volc configure --ak $VOLC_ACCESS_KEY_ID --sk $VOLC_SECRET_ACCESS_KEY --region $VOLC_REGION # 验证配置文件(可选) ! echo "volc config:" && cat ${HOME}/.volc/config ! echo "volc credentials:" && cat ${HOME}/.volc/credentials
本实践重点初始化两个数据集:
将图片文件夹上传至 TOS 上,然后创建数据集,选择图片数据集,填写 TOS 路径即可。
#得到数据集ID : DATASET_ID_IMAGE = ''
首先构造训练数据,数据格式为 CSV,数据结构参考:
字段 | 类型 | 解释 |
---|---|---|
docid | varchar | 文档ID,标识一次请求的唯一ID |
text | varchar | Prompt单轮对话 |
image_id1 | varchar | 第一个引用的图片ID |
image_id2 | varchar | 第二个引用的图片ID |
image_id3 | varchar | 第三个引用的图片ID |
构造完成数据上传至 TOS,并在 LAS 中使用,创建一个数据集。
#得到数据集ID : DATASET_ID_TEXT = ''
原理如图所示:在 LAS 系统中存储着两个不同的数据集。我们采用 JOIN 的方式,将这两个数据集进行关联整合,从而形成一个同时包含文字与图片的综合性数据集。后续,对这个新数据集进行一系列的数据处理,以此为基础开展 VLM/MLLM 模型的训练工作。
在 Python 程序运行环境中,LAS - SDK 集成环境的相关配置。
.env
文件LAS_AK = '' # 用户AK LAS_SK = '' # 用户SK LAS_REGION = '' # 地域名称,例如cn-north-1
以下展示 LAS-SDK 的初始化方法。该 SDK 支持多种不同的计算引擎,用户能够依据自身的具体需求,灵活且精准地挑选最为合适的计算引擎。在此处我们以 Spark 作为示例进行演示,但用户若有其他需求,只需调用 set_engine_pandas()
或 set_engine_ray()
方法,即可轻松实现向其他引擎的切换。
import os from las.data import set_engine_spark from las.infra.las_settings import LasSettings from pyspark.sql import SparkSession def initLasSdk(): """ Initialize LAS SDK with Spark integration. This function sets up a Spark session with specific configurations for: - Proton FileSystem integration - LanceDB catalog integration - TOS (TikTok Object Storage) credentials - Hive Metastore configuration Returns: None """ # Clean up existing Spark and Hadoop environment variables to ensure a fresh configuration if "PYSPARK_PYTHON" in os.environ: del os.environ["PYSPARK_PYTHON"] if "SPARK_HOME" in os.environ: del os.environ["SPARK_HOME"] if "SPARK_CONF_DIR" in os.environ: del os.environ["SPARK_CONF_DIR"] if "HADOOP_HOME" in os.environ: del os.environ["HADOOP_HOME"] if "HADOOP_CONF_DIR" in os.environ: del os.environ["HADOOP_CONF_DIR"] # Load configuration settings settings = LasSettings.new_settings() # Get TOS credentials from settings ak = settings.tos_ak if not settings.tos_ak else settings.tos_ak sk = settings.tos_sk if not settings.tos_sk else settings.tos_sk # Create Spark session with Proton and LanceDB configurations spark_session = SparkSession.builder.appName("test").master("local") \ .config("spark.sql.extensions", "io.proton.spark.ProtonSparkSessionExtension,org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ .config("spark.hadoop.mapreduce.outputcommitter.class", "io.proton.commit.Committer") \ .config("spark.sql.sources.commitProtocolClass", "org.apache.spark.internal.io.cloud.EMROutputCommitProtocol") \ .config("spark.hadoop.fs.tos.access-key-id", ak) \ .config("spark.hadoop.fs.tos.fs.tos.secret-access-key", sk) \ .config("spark.hadoop.fs.tos.impl", "io.proton.fs.ProtonFileSystem") \ .config("spark.hadoop.fs.tos.endpoint", settings.tos_endpoint) \ .config("spark.sql.catalog.lance", "com.lancedb.lance.spark.LanceCatalog") \ .config("spark.sql.catalog.lance.type", "hive") \ .config("spark.sql.catalog.lance.access_key_id", ak) \ .config("spark.sql.catalog.lance.secret_access_key", sk) \ .config("spark.sql.catalog.lance.aws_region", settings.tos_region) \ .config("spark.sql.catalog.lance.virtual_hosted_style_request", "true") \ .config("spark.sql.catalog.lance.hive.hms.client.is.public.cloud", "true") \ .config("spark.sql.catalog.lance.hive.metastore.uris", settings.catalog_uri) \ .config("spark.sql.catalog.lance.hive.client.las.region.name", "cn-beijing") \ .config("spark.sql.catalog.lance.hive.client.las.ak", ak) \ .config("spark.sql.catalog.lance.hive.client.las.sk", sk) \ .config("spark.sql.caseSensitive", "true") \ .getOrCreate() # Set up the Spark engine spark_session = set_engine_spark() # Execute data operations # doSomething() # Clean up resources spark_session.stop()
通过 LAS-SDK 读取数据集中的数据。
以下代码聚焦于数据获取方法的展示。将详细介绍两种获取方式,一是按照正常顺序获取数据,能让用户按既定顺序获取所需信息;二是采用 shuffle 方式获取,可随机打乱数据顺序,满足不同场景下的训练数据使用需求。
from pyspark.sql import DataFrame import las.data as ld def getDataFromLasDataset(dataset_id) -> DataFrame: """ Load data from a LAS dataset. Args: dataset_id (str): The identifier of the LAS dataset. Returns: DataFrame: The loaded data as a PySpark DataFrame. """ # Load the LAS dataset using the provided dataset ID dataset = ld.load_las_dataset(dataset_id) # Convert the loaded LAS dataset to a PySpark DataFrame df = dataset.to_spark_df() return df
def getDataFromLasDatasetShuffle(dataset_id) -> DataFrame: """ Load data from a LAS dataset shuffle. Args: dataset_id (str): The identifier of the LAS dataset. Returns: DataFrame: The loaded data as a PySpark DataFrame. """ # Load the LAS dataset using the provided dataset ID dataset = ld.load_las_dataset(dataset_id) # Convert the dataset to a PySpark DataFrame df = dataset.shuffle().to_spark_df() return df
将读取到的文本与图片数据进行图文混排。
以下代码的主要功能是将读取到的文本与图片数据进行图文混排连接。它会对文本和图片数据进行简单处理,按照一定的规则和格式将二者结合,以实现图文混排的演示效果,为后续的展示与模型训练提供合适的数据结构。
from pyspark.sql import functions as F def joinVisionData() -> DataFrame: """ Join text and image data from LAS datasets. This function performs the following operations: 1. Loads text data from DATASET_ID_TEXT dataset 2. Loads image data from DATASET_ID_IMAGE dataset 3. Joins image data with text data based on image IDs 4. Returns a DataFrame with text and corresponding images Returns: DataFrame: Combined DataFrame with text and image data """ # Load text and image datasets df_text = getDataFromLasDataset(DATASET_ID_TEXT) df_image = getDataFromLasDataset(DATASET_ID_IMAGE) # Join first image data df_text = df_text.join( df_image.select(F.col("__data_item_id").alias("image_id1"), F.col("image")), on="image_id1", how="left" ).withColumnRenamed("image", "image1_byte") # Rename to avoid column name conflicts # Join second image data df_text = df_text.join( df_image.select(F.col("__data_item_id").alias("image_id2"), F.col("image")), on="image_id2", how="left" ).withColumnRenamed("image", "image2_byte") # Join third image data df_text = df_text.join( df_image.select(F.col("__data_item_id").alias("image_id3"), F.col("image")), on="image_id3", how="left" ).withColumnRenamed("image", "image3_byte") # Select final columns and rename image columns vision_df = df_text.select( "docid", # Document identifier "text", # Text content F.col("image1_byte").alias("image1"), # First image data F.col("image2_byte").alias("image2"), # Second image data F.col("image3_byte").alias("image3") # Third image data ) return vision_df
利用图文混排后的数据构建 VLM/MLLM 训练的数据结构。
以下代码展示了如何将图文混排后的数据转换为 OpenAI Vision 训练格式的函数。它接收包含文本和图像数据的分区数据,通过 DataFrame 结构为每条数据生成一个标准化的训练样本。
def formatToTrainingData(df: DataFrame) -> DataFrame: """ Convert text-image pairs from DataFrame into OpenAI VLM training format Args: df: Spark DataFrame containing text and image data Expected columns: - 'text': Text content - 'image1': First image data - 'image2': Second image data - 'image3': Third image data Returns: DataFrame with formatted prompts for VLM/MLLM training """ from pyspark.sql.functions import udf from pyspark.sql.types import StringType def format_prompt(text, image1, image2, image3): """ Format the prompt for VLM/MLLM training. Args: text: Text content image1: First image data image2: Second image data image3: Third image data Returns: Formatted prompt as a JSON string """ images = [] if image1: images.append(image1) if image2: images.append(image2) if image3: images.append(image3) alpaca_format = { "instruction": text, "input": images, "output": "" } import json return json.dumps(alpaca_format) format_prompt_udf = udf(format_prompt, StringType()) result_str = df.withColumn("formatted_prompt", format_prompt_udf("text", "image1", "image2", "image3")) return result_str
以下代码展示了基于 PyTorch 框架构建多模态数据处理管道,实现文本与关联图像的标准化加载,兼容 TorchVision 预处理链。适配 LAS 数据湖的 DataFrame 结构,支持分布式训练场景下的高效数据并行加载,为视觉语言模型提供端到端的数据支撑方案。
from torch.utils.data import Dataset from torch.utils.data import DataLoader from PIL import Image import torch from torchvision import transforms import io class VLMDataset(Dataset): """ Custom dataset for VLM training. Args: df: A Spark DataFrame that contains text and image data. transform: An optional transform to be applied to a sample. """ def __init__(self, df, transform=None): """ Initialize the dataset. Args: df: A Spark DataFrame that contains text and image data. transform: An optional transform to be applied to a sample. """ self.rows = df.collect() self.transform = transform or transforms.Compose([ transforms.Resize((224, 224)), transforms.ToTensor() ]) def __len__(self): """ Get the total number of samples in the dataset. Returns: int: The total number of samples in the dataset. """ return len(self.rows) def __getitem__(self, idx): """ Get a sample from the dataset. Args: idx: The index of the sample. Returns: tuple: A tuple containing text and a list of image tensors. """ row = self.rows[idx] text = row.text images_data = [row.image1, row.image2, row.image3] images = [] for img_data in images_data: if img_data is not None: try: image = Image.open(io.BytesIO(img_data)) if self.transform: image = self.transform(image) images.append(image) except Exception as e: images.append(torch.zeros(3, 224, 224)) # Placeholder for missing images while len(images) < 3: images.append(torch.zeros(3, 224, 224)) return text, torch.stack(images) # 示例用法 if __name__ == "__main__": # 假设你已经有了一个 DataFrame df df = joinVisionData() # 创建数据集和数据加载器 dataset = VLMDataset(df) # 数据加载器 dataloader = DataLoader(dataset, batch_size=8, shuffle=False)
更多调用操作及参数说明可参考 AI 多模态数据湖服务文档。