本实践指南旨在演示如何利用 LAS 端到端地构建一个基于私有 PDF 文档的 RAG(Retrieval-Augmented Generation)知识库。整个流程覆盖了从原始 PDF 文档的上传、智能解析(图文分离)、多模态向量化,到最终数据入湖至 ByteHouse 向量数据库,并提供了通过代码进行高效向量检索的完整实现。
本文将详细拆解环境准备、数据入湖 Pipeline 开发、批量任务执行及向量检索验证的全过程,为用户搭建企业级知识库提供一套完整的、可复现的操作流程。
在企业运营和行业研究中,海量的知识通常以 PDF、Word 等非结构化文档形式沉淀。如何高效地从这些文档中提取价值,并将其与大语言模型结合,以问答、摘要、分析等形式提供智能服务,是当前技术领域的一大热点。RAG 技术通过“检索”外部知识库来“增强”大模型的生成能力,是解决这一问题的关键。
构建一个高效的 RAG 系统,需要克服以下挑战:
LAS 数据湖服务通过提供一站式的多模态数据处理能力和集成开发环境,极大地简化了上述流程。
本解决方案包含两大核心阶段:数据入湖和向量检索。
PDFParse 算子,对 PDF 进行深度解析,分离出纯文本内容和图片文件。ChunkTextSentenceSplitter 算子,将解析出的长文本切分为固定大小且有重叠的语义块。DoubaoEmbeddingVision 算子,分别对文本块和图片进行向量化,生成能够表征其语义的 Embedding 向量。ClickhouseWriter UDF,将处理后的文本块、图片路径及其对应的向量数据一同写入 ByteHouse 的指定表中。DoubaoEmbeddingVision 算子,将用户的文本查询(Query)转化为向量。cosineDistance),找出与查询向量最相似的 Top-K 个文本或图片向量。已开通 AI 数据湖服务、对象存储 TOS 和 ByteHouse 企业版。
las-rag-bj。test。16vCPU 64GiB,单副本;节点数:2。100GiB,不开启冷热分层。pdf_rag_db。pdf_tos_path*字段名称 | *类型 | 描述(可选) |
|---|---|---|
pdf_tos_path | String | 原始 PDF 的 TOS 路径 |
modality | String | 数据模态 |
content | String | 文本块内容 |
image_tos_path | String | 图片的 TOS 路径 |
parsed_markdown_tos_path | String | 解析后 Markdown 文件的 TOS 路径 |
embedding | Array(Float32) | 向量数据 |
注意
建表时,默认使用 HaMergeTree 引擎,无法支持唯一键。如果有唯一键需求,务必选择 HaUniqueMergeTree 引擎,并设置"唯一键",后续无法修改。
bh_host 从 Bytehouse 企业版 > 集群管理 > 基本信息" 的网关获取bh_port 默认是 8123bh_user 从 Bytehouse 企业版 > 集群管理 > 连接集群"获取bh_password 从账号管理获取密码.pub 文件内容) 粘贴到配置中。本实践基本操作流程如下所示:
步骤一:数据入湖 Pipeline 开发与执行
步骤二:向量数据检索
pdf_to_bh.py),编写数据处理 Pipeline。from __future__ import annotations from clickhouse_connect.driver import create_client import daft from daft import col, lit from daft.las.functions.ark_llm import DoubaoEmbeddingVision from daft.las.functions.doc import PDFParse from daft.las.functions.text import PreSignUrlForTos from daft.las.functions.text.chunk_text_sentence_splitter import ChunkTextSentenceSplitter from daft.las.functions.udf import las_udf from daft.las.io.tos import TOSConfig import os import ray # 打成任务管理使用的镜像时加,开发机调试无需此2行 ray.init() daft.context.set_runner_ray("ray://127.0.0.1:10001") @daft.udf(return_dtype=str) class ClickhouseWriter: def __init__(self): self.bh_client = create_client( host=os.getenv("bh_host", "host"), port=os.getenv("bh_port", "8123"), user=os.getenv("bh_user", "user"), password=os.getenv("bh_pass", "pass"), database=os.getenv("bh_database", "database"), compression="gzip", ) self.table_name = os.getenv("bh_table", "bh_table") def __call__(self, *columns): try: data_to_insert = [col.to_pylist() for col in columns] rows = list(zip(*data_to_insert)) self.bh_client.insert( table=self.table_name, data=rows, ) return ["success"] * len(rows) except Exception as e: print(f"Error: {e}") return ["error"] def run_pipeline(input_tos_dir, output_tos_path): input_s3_dir = input_tos_dir.replace("tos://", "s3://", 1) tos_config = TOSConfig.from_env() IO_CONFIG = daft.io.IOConfig(s3=tos_config.to_s3_config()) df = daft.from_glob_path( f"{input_s3_dir}/*.pdf", io_config=IO_CONFIG, ) df = df.with_column( "pdf_tos_path", col("path").apply( lambda p: p.replace("s3://", "tos://", 1), return_dtype=daft.DataType.string(), ), ) # pre sign url for tos df = df.with_column( "input_pdf_url", las_udf( PreSignUrlForTos, construct_args={ "expires": 3600, }, )(col("pdf_tos_path")), ) # get filename for PDFParse df = df.with_column( "filename", col("pdf_tos_path").apply(lambda p: p.split("/")[-1], return_dtype=daft.DataType.string()) ) # pdf parse df = df.with_column( "result", las_udf( PDFParse, construct_args={ "input_type": "url", "output_tos_path": output_tos_path, "qps": 1, }, concurrency=1, batch_size=1, )(col("input_pdf_url"), col("filename")), ) df = df.with_column("plain_text", col("result").struct.get("parsed_plain_text")) df = df.with_column("image_filenames", col("result").struct.get("parsed_image_filenames")) df = df.with_column("parsed_markdown_tos_path", col("result").struct.get("parsed_file_path")) df.collect() # image embedding df_image = df.select("pdf_tos_path", "image_filenames", "parsed_markdown_tos_path").explode("image_filenames") df_image = df_image.with_column("image_tos_path", output_tos_path + "/images/" + col("image_filenames")) df_image = df_image.with_column( "image_http_path", las_udf( PreSignUrlForTos, construct_args={ "expires": 3600, }, )(col("image_tos_path")), ) df_image = df_image.with_column( "embedding", las_udf( DoubaoEmbeddingVision, construct_args={ "version": "250328", "image_format": "jpeg", }, concurrency=1, )(col("image_http_path")), ) df_image = ( df_image.with_column("modality", lit("image")) .with_column("content", lit("")) .select("pdf_tos_path", "modality", "content", "image_tos_path", "parsed_markdown_tos_path", "embedding") ) # text chunking and embedding df_text = df.with_column( "chunks", las_udf( ChunkTextSentenceSplitter, construct_args={ "content_type": "md", "chunk_size": 512, "chunk_overlap": 50, }, concurrency=1, )(col("plain_text")), ) df_text = df_text.select("pdf_tos_path", "chunks", "parsed_markdown_tos_path").explode("chunks") df_text = df_text.with_column( "embedding", las_udf( DoubaoEmbeddingVision, construct_args={ "version": "250328", "multimodal_type": "text", }, concurrency=1, )(col("chunks")), ) df_text = ( df_text.select("pdf_tos_path", "chunks", "embedding", "parsed_markdown_tos_path") .with_column("modality", lit("text")) .with_columns_renamed({"chunks": "content"}) .with_column("image_tos_path", lit("")) .select("pdf_tos_path", "modality", "content", "image_tos_path", "parsed_markdown_tos_path", "embedding") ) df_merged = df_text.concat(df_image) df_merged = df_merged.filter(col("embedding").list.length() > 0) # write to bytehouse df_merged = df_merged.with_column( "write_to_bytehouse", ClickhouseWriter(*df_merged.columns), ) print(df_merged.filter(col("write_to_bytehouse") == "error").count_rows()) if __name__ == "__main__": input_tos_dir = os.getenv("input_tos_dir", "input_tos_dir") output_tos_path = os.getenv("output_tos_path", "output_tos_path") run_pipeline(input_tos_dir, output_tos_path)
在开发机调试脚本。
在开发机终端中,通过 export 命令设置所有需要的环境变量,然后执行脚本进行调试。示例:
#pdf 解析参数 export TOS_ENDPOINT="https://tos-cn-beijing.volces.com" export ACCESS_KEY="xxx" export SECRET_KEY="xx=" export LAS_ACCESS_KEY="xxx" export LAS_ACCOUND_ID="xx" export DAFT_RUNNER="xxx" #bh写入参数 export bh_host="xx" export bh_port="8123" export bh_user="xx" export bh_pass="xx" export bh_database="xx" export bh_table="xx" #文件入口参数 export input_tos_dir="xx" export output_tos_path="xx" #启动命令 unset LAS_BASE_URL python pdf_2_bh_new.py
保存为自定义镜像。
调试成功后,在 LAS 控制台 > 开发机 页面,找到对应的开发机,点击 保存镜像。为镜像命名并创建,等待镜像构建完成,可以进入 镜像管理 > 自定义镜像 查看镜像构建进度。
创建并执行批量入湖任务。
进入 数据处理 > 任务管理 页面,点击 创建任务。
在 镜像 选项中,选择“自定义镜像”,并找到上一步保存的镜像。
在 参数设置 中,以键值对的形式填入所有需要的环境变量(同调试时 export 的变量)。
任务创建后,在任务列表中点击 执行,选择一个计算队列并配置资源,启动任务。
任务将自动拉取镜像,在指定的计算队列上批量处理 input_tos_dir 中的所有 PDF 文件,并将结果写入 ByteHouse。
再次登录开发机,创建一个新的 Python 脚本(例如 bh_query.py)用于执行向量检索。
from __future__ import annotations from clickhouse_connect import get_client from daft import DataType, col, from_pydict from daft.las.functions.ark_llm import DoubaoEmbeddingVision from daft.las.functions.udf import las_udf import os def make_query_func(k=3, modality=None): def query_fn(embedding): if not embedding: return [] bh_client = get_client( host=os.getenv("bh_host", "host"), port=os.getenv("bh_port", "8123"), user=os.getenv("bh_user", "user"), password=os.getenv("bh_pass", "pass"), connect_timeout=3000000, send_receive_timeout=300000, ) database =os.getenv("bh_database", "database") collection =os.getenv("bh_table", "bh_table") metric = "cosine" modality_filter = f"WHERE modality = '{modality}'" if modality else "" query = f""" SELECT pdf_tos_path, modality, content, image_tos_path FROM {database}.{collection} {modality_filter} ORDER BY {metric}Distance(embedding, {embedding!s}) LIMIT {k} settings enable_new_ann=1, hnsw_ef_s=200 """ try: return bh_client.query(query).result_rows except Exception as e: print(f"ClickHouse query failed: {e}") return [] return query_fn if __name__ == "__main__": query_fn_text = make_query_func(k=2, modality="text") query_fn_image = make_query_func(k=2, modality="image") # text example text_list = ["春天来了", "科技在各个领域都展现出了强大的影响力"] df = from_pydict({"text": text_list}) df = df.with_column( "embedding", las_udf( DoubaoEmbeddingVision, construct_args={ "version": "250328", "multimodal_type": "text", }, )(col("text")), ) df = df.with_column( "topk_text_results", df["embedding"].apply(query_fn_text, return_dtype=DataType.python()) ).with_column("topk_image_results", df["embedding"].apply(query_fn_image, return_dtype=DataType.python())) df.show()
同样地,先 export ByteHouse 相关的环境变量,然后执行检索脚本。示例:
#bh读取参数 export bh_host="xxx" export bh_port="xx" export bh_user="xxx" export bh_pass="xxx" export bh_database="xx" export bh_table="xx" # 启动命令 python bh_query.py
您将在终端看到一个表格,展示了每个查询文本所检索到的最相关的 Top-K 个文本片段和图片路径。如需了解更多技术细节或定制化需求,请联系技术支持团队。