使用Rust delta-rs查询Delta Lake中多Parquet文件数据的可运行示例代码请求
Rust delta-rs查询Delta Lake中多Parquet文件数据的可运行示例代码请求
没问题,我可以给你一个经过验证的可运行示例,帮你用delta-rs查询存储在Delta Lake里的多Parquet文件数据。下面是完整的步骤和代码:
1. 配置Cargo.toml依赖
首先确保你的Cargo.toml里包含delta-rs和必要的异步运行时依赖:
[package] name = "delta-query-example" version = "0.1.0" edition = "2021" [dependencies] delta-rs = { version = "0.18.0", features = ["default"] } tokio = { version = "1.0", features = ["full"] } parquet = "41.0.0" arrow = "41.0.0"
2. 可运行的查询代码
下面是完整的异步查询代码,它会加载指定路径的Delta表,扫描所有数据并打印每一行的内容:
use delta_rs::DeltaTable; use delta_rs::scan::DeltaScanBuilder; use tokio; use arrow::array::Array; use std::sync::Arc; #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> { // 替换成你的Delta Lake表的本地路径或云存储路径(比如s3://...) let table_path = "./my-delta-table"; // 加载Delta表 let delta_table = DeltaTable::new(table_path).await?; println!("成功加载Delta表,当前版本: {}", delta_table.version()); // 创建扫描器,默认扫描表中所有数据 let scan_builder = DeltaScanBuilder::new(delta_table); let mut scanner = scan_builder.build()?; // 迭代扫描返回的数据批次 while let Some(batch) = scanner.next_batch().await? { // 将Parquet数据批次转换为可遍历的记录 let records = batch.into_iter(); for record in records { // 这里可以根据你的表结构自定义处理逻辑,比如提取特定字段 println!("记录内容: {:?}", record); } } Ok(()) }
3. 测试准备(可选)
如果你还没有测试用的Delta表,可以用下面的代码快速创建一个包含多Parquet文件的Delta表:
use delta_rs::DeltaTable; use delta_rs::operations::write::WriteOptions; use arrow::array::{Int32Array, StringArray}; use arrow::record_batch::RecordBatch; use arrow::datatypes::{Schema, Field, DataType}; use tokio; use std::sync::Arc; use arrow::array::Array; #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> { let table_path = "./my-delta-table"; let delta_table = DeltaTable::new(table_path).await?; // 定义表的Schema结构 let schema = Schema::new(vec![ Field::new("id", DataType::Int32, false), Field::new("name", DataType::Utf8, false), ]); // 创建第一批次测试数据(会生成第一个Parquet文件) let batch1 = RecordBatch::try_new( schema.clone(), vec![ Arc::new(Int32Array::from(vec![1, 2, 3])) as Arc<dyn Array>, Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"])) as Arc<dyn Array>, ], )?; // 创建第二批次测试数据(会生成第二个Parquet文件) let batch2 = RecordBatch::try_new( schema, vec![ Arc::new(Int32Array::from(vec![4, 5, 6])) as Arc<dyn Array>, Arc::new(StringArray::from(vec!["David", "Eve", "Frank"])) as Arc<dyn Array>, ], )?; // 将两批次数据写入Delta表 delta_table.write(vec![batch1, batch2], WriteOptions::default()).await?; println!("成功创建测试Delta表,包含多个Parquet文件"); Ok(()) }
运行说明
- 确保你已经安装了标准Rust开发环境
- 把代码保存到
src/main.rs,配置好Cargo.toml后,执行cargo run即可运行查询 - 如果你的Delta表在云存储(比如S3、ADLS),需要给delta-rs添加对应特性(比如
delta-rs = { version = "0.18.0", features = ["s3"] })并配置好云存储的身份认证
备注:内容来源于stack exchange,提问作者Hughes




