You need to enable JavaScript to run this app.
导航
企业级 RAG 最佳实践:使用 LAS + ByteHouse 构建 PDF 知识库与向量检索系统
最近更新时间:2025.08.21 20:37:03首次发布时间:2025.08.21 17:24:35
复制全文
我的收藏
有用
有用
无用
无用

本实践指南旨在演示如何利用 LAS 端到端地构建一个基于私有 PDF 文档的 RAG(Retrieval-Augmented Generation)知识库。整个流程覆盖了从原始 PDF 文档的上传、智能解析(图文分离)、多模态向量化,到最终数据入湖至 ByteHouse 向量数据库,并提供了通过代码进行高效向量检索的完整实现。
本文将详细拆解环境准备、数据入湖 Pipeline 开发、批量任务执行及向量检索验证的全过程,为用户搭建企业级知识库提供一套完整的、可复现的操作流程。

背景信息

在企业运营和行业研究中,海量的知识通常以 PDF、Word 等非结构化文档形式沉淀。如何高效地从这些文档中提取价值,并将其与大语言模型结合,以问答、摘要、分析等形式提供智能服务,是当前技术领域的一大热点。RAG 技术通过“检索”外部知识库来“增强”大模型的生成能力,是解决这一问题的关键。
构建一个高效的 RAG 系统,需要克服以下挑战:

  • 复杂的文档解析:PDF 等文档通常包含文本、图片、表格等多种元素,需要进行精准的图文分离和内容提取。
  • 有效的数据分块与向量化:需要将长文本切分为语义完整的块(Chunks),并与图片一起转化为高质量的向量(Embeddings)。
  • 可靠的存储与检索:需要一个稳定、高效的向量数据库来存储和管理数十亿级别的向量,并提供低延迟的相似度检索能力。
  • 繁琐的基础设施搭建:整个流程涉及对象存储、计算资源、开发环境、数据库等多个组件的搭建与联通。

LAS 数据湖服务通过提供一站式的多模态数据处理能力和集成开发环境,极大地简化了上述流程。

工作原理

Image
本解决方案包含两大核心阶段:数据入湖向量检索

  1. 数据入湖
    1. 此阶段的目标是将原始 PDF 文件处理成可供检索的向量数据,并存入 ByteHouse。
      • PDF 解析:使用 PDFParse 算子,对 PDF 进行深度解析,分离出纯文本内容和图片文件。
      • 文本分块:使用 ChunkTextSentenceSplitter 算子,将解析出的长文本切分为固定大小且有重叠的语义块。
      • 多模态向量化:使用 DoubaoEmbeddingVision 算子,分别对文本块和图片进行向量化,生成能够表征其语义的 Embedding 向量。
      • 写入数据库:通过自定义的 ClickhouseWriter UDF,将处理后的文本块、图片路径及其对应的向量数据一同写入 ByteHouse 的指定表中。
  2. 向量检索
    1. 此阶段的目标是根据用户输入的查询,从 ByteHouse 中检索出最相关的文档片段。
      • 查询向量化:使用 DoubaoEmbeddingVision 算子,将用户的文本查询(Query)转化为向量。
      • 相似度检索:在 ByteHouse 中执行向量相似度查询(cosineDistance),找出与查询向量最相似的 Top-K 个文本或图片向量。
      • 返回结果:返回与 Top-K 向量对应的原始数据片段(文本内容或图片路径),这些内容可用于后续输入给大语言模型作为上下文。

前提条件

火山引擎账号准备

  1. 使用 LAS 控制台,需先注册火山引擎账号,详见账号注册
  2. 火山引擎账号注册成功后,您需完成相关认证工作,详见个人认证企业认证

环境与资源准备

已开通 AI 数据湖服务对象存储 TOSByteHouse 企业版

步骤一:准备对象存储(TOS)

  1. 登录对象存储 TOS 控制台
  2. 进入 桶列表 创建一个存储桶(Bucket),例如 las-rag-bj
  3. 在存储桶内创建存放数据文件的文件夹路径,例如 test
  4. 将准备好的 PDF 文档上传至该路径。

步骤二:创建并配置 ByteHouse 向量数据库

  1. 登录 ByteHouse 企业版控制台
  2. 集群管理 页面,选择 集群列表 点击 新建集群。等待集群创建完成。
    1. 配置计算资源规格:通用型(1:4) - 16vCPU 64GiB,单副本;节点数:2
    2. 配置存储资源:单节点本地存储:100GiB,不开启冷热分层。
    3. 配置网络资源:选择 VPC、子网、安全组。
    4. 关键配置:在“高级特性”中,务必开启 向量检索 功能。
  1. 进入 数据管理 页面,单击左侧 +新建 后选择 创建库,创建一个新的数据库,例如 pdf_rag_db
  2. 在该数据库下,选择右侧 创建表 创建一张表,用于存储向量数据。详细参数配置参见 新建数据库/表
    • 表结构示例:排序键/主键 设置可根据需求选择,例如 pdf_tos_path

*字段名称

*类型

描述(可选)

pdf_tos_path

String

原始 PDF 的 TOS 路径

modality

String

数据模态

content

String

文本块内容

image_tos_path

String

图片的 TOS 路径

parsed_markdown_tos_path

String

解析后 Markdown 文件的 TOS 路径

embedding

Array(Float32)

向量数据

注意

建表时,默认使用 HaMergeTree 引擎,无法支持唯一键。如果有唯一键需求,务必选择 HaUniqueMergeTree 引擎,并设置"唯一键",后续无法修改。

  1. 记录下 ByteHouse 集群的 连接信息,后续代码中将会使用。
    1. bh_host 从 Bytehouse 企业版 > 集群管理 > 基本信息" 的网关获取
    2. bh_port 默认是 8123
    3. bh_user 从 Bytehouse 企业版 > 集群管理 > 连接集群"获取
    4. bh_password 从账号管理获取密码

步骤三:创建 LAS 数据湖资源

  1. 登录 AI 数据湖服务控制台
  2. 资源管理 -> 队列管理 中,创建一个通用队列,用于运行开发机。
  3. 返回 AI 数据湖服务 控制台,在 数据处理 -> 开发机 中,点击 创建开发机。创建操作详情参见 创建开发机 及 开发机使用快速入门
    • 关联上一步创建的通用队列和负载均衡实例。
    • 将本地生成的 SSH 公钥 (.pub 文件内容) 粘贴到配置中。
  4. 等待开发机创建完成,并记录下负载均衡的公网 IP 和开发机的调用端口

操作流程

本实践基本操作流程如下所示:
步骤一:数据入湖 Pipeline 开发与执行
步骤二:向量数据检索

步骤一:数据入湖 Pipeline 开发与执行

  1. 开发数据入湖脚本
    通过 SSH 登录已创建的开发机,连接开发机只指南参考远程连接开发机,创建一个 Python 脚本(例如 pdf_to_bh.py),编写数据处理 Pipeline。
from __future__ import annotations

from clickhouse_connect.driver import create_client

import daft
from daft import col, lit
from daft.las.functions.ark_llm import DoubaoEmbeddingVision
from daft.las.functions.doc import PDFParse
from daft.las.functions.text import PreSignUrlForTos
from daft.las.functions.text.chunk_text_sentence_splitter import ChunkTextSentenceSplitter
from daft.las.functions.udf import las_udf
from daft.las.io.tos import TOSConfig
import os
import ray
# 打成任务管理使用的镜像时加,开发机调试无需此2行
ray.init()
daft.context.set_runner_ray("ray://127.0.0.1:10001")
@daft.udf(return_dtype=str)
class ClickhouseWriter:
    def __init__(self):
        self.bh_client = create_client(
            host=os.getenv("bh_host", "host"),
            port=os.getenv("bh_port", "8123"),
            user=os.getenv("bh_user", "user"),
            password=os.getenv("bh_pass", "pass"),
            database=os.getenv("bh_database", "database"),
            compression="gzip",
        )
        self.table_name = os.getenv("bh_table", "bh_table")

    def __call__(self, *columns):
        try:
            data_to_insert = [col.to_pylist() for col in columns]

            rows = list(zip(*data_to_insert))
            self.bh_client.insert(
                table=self.table_name,
                data=rows,
            )
            return ["success"] * len(rows)
        except Exception as e:
            print(f"Error: {e}")
            return ["error"]


def run_pipeline(input_tos_dir, output_tos_path):
    input_s3_dir = input_tos_dir.replace("tos://", "s3://", 1)
    tos_config = TOSConfig.from_env()
    IO_CONFIG = daft.io.IOConfig(s3=tos_config.to_s3_config())

    df = daft.from_glob_path(
        f"{input_s3_dir}/*.pdf",
        io_config=IO_CONFIG,
    )

    df = df.with_column(
        "pdf_tos_path",
        col("path").apply(
            lambda p: p.replace("s3://", "tos://", 1),
            return_dtype=daft.DataType.string(),
        ),
    )

    # pre sign url for tos
    df = df.with_column(
        "input_pdf_url",
        las_udf(
            PreSignUrlForTos,
            construct_args={
                "expires": 3600,
            },
        )(col("pdf_tos_path")),
    )

    # get filename for PDFParse
    df = df.with_column(
        "filename", col("pdf_tos_path").apply(lambda p: p.split("/")[-1], return_dtype=daft.DataType.string())
    )

    # pdf parse
    df = df.with_column(
        "result",
        las_udf(
            PDFParse,
            construct_args={
                "input_type": "url",
                "output_tos_path": output_tos_path,
                "qps": 1,
            },
            concurrency=1,
            batch_size=1,
        )(col("input_pdf_url"), col("filename")),
    )

    df = df.with_column("plain_text", col("result").struct.get("parsed_plain_text"))
    df = df.with_column("image_filenames", col("result").struct.get("parsed_image_filenames"))
    df = df.with_column("parsed_markdown_tos_path", col("result").struct.get("parsed_file_path"))
    df.collect()

    # image embedding
    df_image = df.select("pdf_tos_path", "image_filenames", "parsed_markdown_tos_path").explode("image_filenames")
    df_image = df_image.with_column("image_tos_path", output_tos_path + "/images/" + col("image_filenames"))

    df_image = df_image.with_column(
        "image_http_path",
        las_udf(
            PreSignUrlForTos,
            construct_args={
                "expires": 3600,
            },
        )(col("image_tos_path")),
    )

    df_image = df_image.with_column(
        "embedding",
        las_udf(
            DoubaoEmbeddingVision,
            construct_args={
                "version": "250328",
                "image_format": "jpeg",
            },
            concurrency=1,
        )(col("image_http_path")),
    )

    df_image = (
        df_image.with_column("modality", lit("image"))
        .with_column("content", lit(""))
        .select("pdf_tos_path", "modality", "content", "image_tos_path", "parsed_markdown_tos_path", "embedding")
    )

    # text chunking and embedding
    df_text = df.with_column(
        "chunks",
        las_udf(
            ChunkTextSentenceSplitter,
            construct_args={
                "content_type": "md",
                "chunk_size": 512,
                "chunk_overlap": 50,
            },
            concurrency=1,
        )(col("plain_text")),
    )

    df_text = df_text.select("pdf_tos_path", "chunks", "parsed_markdown_tos_path").explode("chunks")

    df_text = df_text.with_column(
        "embedding",
        las_udf(
            DoubaoEmbeddingVision,
            construct_args={
                "version": "250328",
                "multimodal_type": "text",
            },
            concurrency=1,
        )(col("chunks")),
    )

    df_text = (
        df_text.select("pdf_tos_path", "chunks", "embedding", "parsed_markdown_tos_path")
        .with_column("modality", lit("text"))
        .with_columns_renamed({"chunks": "content"})
        .with_column("image_tos_path", lit(""))
        .select("pdf_tos_path", "modality", "content", "image_tos_path", "parsed_markdown_tos_path", "embedding")
    )

    df_merged = df_text.concat(df_image)
    df_merged = df_merged.filter(col("embedding").list.length() > 0)
    # write to bytehouse
    df_merged = df_merged.with_column(
        "write_to_bytehouse",
        ClickhouseWriter(*df_merged.columns),
    )

    print(df_merged.filter(col("write_to_bytehouse") == "error").count_rows())


if __name__ == "__main__":
    input_tos_dir = os.getenv("input_tos_dir", "input_tos_dir")
    output_tos_path = os.getenv("output_tos_path", "output_tos_path")
    run_pipeline(input_tos_dir, output_tos_path)
  1. 在开发机调试脚本
    在开发机终端中,通过 export 命令设置所有需要的环境变量,然后执行脚本进行调试。示例:

    #pdf 解析参数
     export TOS_ENDPOINT="https://tos-cn-beijing.volces.com"
     export ACCESS_KEY="xxx"
     export SECRET_KEY="xx="
     export LAS_ACCESS_KEY="xxx"
     export LAS_ACCOUND_ID="xx"
     export DAFT_RUNNER="xxx"
     #bh写入参数
     export bh_host="xx"
     export bh_port="8123"
     export bh_user="xx"
     export bh_pass="xx"
     export bh_database="xx"
     export bh_table="xx"
     #文件入口参数
     export input_tos_dir="xx"
     export output_tos_path="xx"
     #启动命令
     unset  LAS_BASE_URL
     python pdf_2_bh_new.py
    
  2. 保存为自定义镜像
    调试成功后,在 LAS 控制台 > 开发机 页面,找到对应的开发机,点击 保存镜像。为镜像命名并创建,等待镜像构建完成,可以进入 镜像管理 > 自定义镜像 查看镜像构建进度。
    Image

  3. 创建并执行批量入湖任务

  4. 进入 数据处理 > 任务管理 页面,点击 创建任务

  5. 镜像 选项中,选择“自定义镜像”,并找到上一步保存的镜像。

  6. 参数设置 中,以键值对的形式填入所有需要的环境变量(同调试时 export 的变量)。

  7. 任务创建后,在任务列表中点击 执行,选择一个计算队列并配置资源,启动任务。

    1. 可选择 公共队列(后付费) 或者 资源管理-队列管理-计算队列 中创建的 专属队列(预付费)
    2. 启动时计算参数最小设置为:
      • WorkCPU:3
      • WorkMemory:24
  8. 任务将自动拉取镜像,在指定的计算队列上批量处理 input_tos_dir 中的所有 PDF 文件,并将结果写入 ByteHouse。

步骤二:向量数据检索

  1. 开发向量检索脚本

再次登录开发机,创建一个新的 Python 脚本(例如 bh_query.py)用于执行向量检索。

from __future__ import annotations

from clickhouse_connect import get_client

from daft import DataType, col, from_pydict
from daft.las.functions.ark_llm import DoubaoEmbeddingVision
from daft.las.functions.udf import las_udf
import os

def make_query_func(k=3, modality=None):
    def query_fn(embedding):
        if not embedding:
            return []

        bh_client = get_client(
            host=os.getenv("bh_host", "host"),
            port=os.getenv("bh_port", "8123"),
            user=os.getenv("bh_user", "user"),
            password=os.getenv("bh_pass", "pass"),
            connect_timeout=3000000,
            send_receive_timeout=300000,
        )
        database =os.getenv("bh_database", "database")
        collection =os.getenv("bh_table", "bh_table")
        metric = "cosine"
        modality_filter = f"WHERE modality = '{modality}'" if modality else ""

        query = f"""
            SELECT pdf_tos_path, modality, content, image_tos_path
            FROM {database}.{collection}
            {modality_filter}
            ORDER BY {metric}Distance(embedding, {embedding!s})
            LIMIT {k}
            settings enable_new_ann=1, hnsw_ef_s=200
        """

        try:
            return bh_client.query(query).result_rows
        except Exception as e:
            print(f"ClickHouse query failed: {e}")
            return []

    return query_fn


if __name__ == "__main__":
    query_fn_text = make_query_func(k=2, modality="text")
    query_fn_image = make_query_func(k=2, modality="image")

    # text example 
    text_list = ["春天来了", "科技在各个领域都展现出了强大的影响力"]
    df = from_pydict({"text": text_list})
    df = df.with_column(
        "embedding",
        las_udf(
            DoubaoEmbeddingVision,
            construct_args={
                "version": "250328",
                "multimodal_type": "text",
            },
        )(col("text")),
    )

    df = df.with_column(
        "topk_text_results", df["embedding"].apply(query_fn_text, return_dtype=DataType.python())
    ).with_column("topk_image_results", df["embedding"].apply(query_fn_image, return_dtype=DataType.python()))

    df.show()
  1. 在开发机执行检索

同样地,先 export ByteHouse 相关的环境变量,然后执行检索脚本。示例:

#bh读取参数
 export bh_host="xxx"
 export bh_port="xx"
 export bh_user="xxx"
 export bh_pass="xxx"
 export bh_database="xx"
 export bh_table="xx"
 
 # 启动命令 
 python bh_query.py

您将在终端看到一个表格,展示了每个查询文本所检索到的最相关的 Top-K 个文本片段和图片路径。如需了解更多技术细节或定制化需求,请联系技术支持团队。