LAS-SDK 是火山引擎 LAS 服务(Lake and AI Service)的 SDK 客户端,它提供如下能力(截至 2025 年 4 月 12 日):`map`、`filter`、`sort` 等常见数据表操作,以及类似于 Huggingface Dataset 的 `shuffle`、`split` 等面向 AI 场景的数据操作。

LAS-SDK 依赖一些其他包,目前尚未托管在官方 PyPI,需要手动安装。
```bash
conda create -n sdk-test python=3.11
conda activate sdk-test

# 准备 python 包
wget https://las-core.tos-cn-beijing.volces.com/packages/pylance-0.20.0-cp39-abi3-manylinux_2_28_x86_64.whl
wget https://las-core.tos-cn-beijing.volces.com/packages/pyiceberg-0.9.0.0.2-cp311-cp311-manylinux_2_28_x86_64.whl
wget https://las-core.tos-cn-beijing.volces.com/packages/ve_lascatalog_sdk-0.0.6.dev1-py3-none-any.whl
wget https://las-core.tos-cn-beijing.volces.com/packages/las_sdk-0.2.0.rc1-py3-none-any.whl
wget https://las-core.tos-cn-beijing.volces.com/packages/ray-2.44.0-cp311-cp311-manylinux2014_x86_64.whl
pip install *.whl -i "https://bytedpypi.byted.org/simple/"

# 准备 spark jar(如果需要 spark 的话)
wget https://las-core.tos-cn-beijing.volces.com/packages/arrow-c-data-12.0.1.jar
wget https://las-core.tos-cn-beijing.volces.com/packages/arrow-dataset-12.0.1.jar
wget https://las-core.tos-cn-beijing.volces.com/packages/jar-jni-1.1.1.jar
wget https://las-core.tos-cn-beijing.volces.com/packages/lance-core-0.20.0.jar
wget https://las-core.tos-cn-beijing.volces.com/packages/lance-spark-0.20.0.jar
wget https://las-core.tos-cn-beijing.volces.com/packages/lance-catalog-0.20.0.jar
wget https://las-core.tos-cn-beijing.volces.com/packages/proton-hadoop3-bundle-2.2.3.jar
wget https://las-core.tos-cn-beijing.volces.com/packages/proton-spark-3.5.1-1.0.jar
wget https://las-core.tos-cn-beijing.volces.com/packages/iceberg-spark-runtime-3.5_2.12-1.8.1.jar
wget https://las-core.tos-cn-beijing.volces.com/packages/hive-exec-2.3.9-1.5.0.1-LF2-RELEASE-core.jar
wget https://las-core.tos-cn-beijing.volces.com/packages/lf-client-2-1.5.0-RELEASE.jar

site_package_dir=$(python -c "import sysconfig; print(sysconfig.get_paths()['purelib'])")
mv *.jar $site_package_dir/pyspark/jars
# 删除 pyspark 自带的 hive-metastore jar
rm $site_package_dir/pyspark/jars/hive-metastore*

# 配置加解密包(某些操作系统版本不支持)
wget https://las-core.tos-cn-beijing.volces.com/libs/libcrypto.so.1.1
wget https://las-core.tos-cn-beijing.volces.com/libs/libssl.so.1.1
mkdir -p /opt/libs
mv lib* /opt/libs
export LD_LIBRARY_PATH=/opt/libs:$LD_LIBRARY_PATH
```
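安装完成后,可以先做一个简单的导入自检(示意,假设按上述步骤安装在当前 Python 环境中):

```python
# 能成功导入 las.data 即说明 whl 包安装完成
import las.data as ld

print("las-sdk 导入成功:", ld.__name__)
```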
LAS-SDK 从环境变量或者 `.env` 配置文件中获取相关配置信息,比如访问 TOS 的 AK、SK 等。下面是常用配置列表:
```python
las_ak: str = ""
las_sk: str = ""
las_region: str = "cn-beijing"                         # 默认 'cn-beijing'
top_service_name: str = "las"                          # 固定为 las
top_service_ak: str = <las_ak>                         # 同 las_ak
top_service_sk: str = <las_sk>                         # 同 las_sk
top_service_region: str = <las_region>                 # 默认 'cn-beijing'
tos_ak: str = <las_ak>                                 # 同 las_ak
tos_sk: str = <las_sk>                                 # 同 las_sk
tos_region: str = "cn-beijing"                         # 默认 'cn-beijing'
tos_endpoint: str = "tos-cn-beijing.volces.com"        # 如果是内网,volces.com 改为 ivolces.com
tos_s3_endpoint: str = "tos-s3-cn-beijing.volces.com"  # 如果是内网,volces.com 改为 ivolces.com
catalog_uri: str = ""  # 参考 https://www.volcengine.com/docs/6492/1264542#%E8%BF%9E%E6%8E%A5%E5%9C%B0%E5%9D%80 “连接地址”
```
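例如,可以在项目根目录放置一个 `.env` 文件(示意;键名取自上面的配置列表,具体大小写及取值请以实际 SDK 的读取逻辑和账号信息为准):

```
las_ak=<your_access_key>
las_sk=<your_secret_key>
las_region=cn-beijing
tos_endpoint=tos-cn-beijing.volces.com
tos_s3_endpoint=tos-s3-cn-beijing.volces.com
catalog_uri=<参考上文“连接地址”文档>
```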
下面的示例展示了如何使用 LAS-SDK 完成一个简单的数据处理步骤:
```python
from typing import Any

import las.data as ld
from las.data import function, set_engine_spark
from las.data.types import Category, MapFunction
from pyspark.sql import SparkSession

# 设定使用 Spark 作为计算引擎
spark = SparkSession.builder.appName("las-sdk-test").master("local").getOrCreate()
set_engine_spark()

dataset = ld.reader("csv").load("tos://path/to/csv_folder")


@function(identifier="add_one", name="年龄加一", category=Category.DEFAULT, version="v1")
class AddOne(MapFunction):
    """Map which adds 'age' by 1."""

    def map(self, row: dict[str, Any], **kwargs) -> dict[str, Any]:
        row["age"] = row["age"] + 1
        return row


dataset = dataset.map(AddOne())
dataset.writer("csv").save("tos://path/to/new_folder")

spark.stop()
```
如代码所示,这里定义了一个 `function`(其类似于一个 UDF),并使用 `map()` 操作将此 `function` 应用到整个 dataset,完成数据的 transform。
上边的代码中,`@function` 的作用是将此函数注册到一个“注册中心”。这是一个可选项,它有两个作用:
LAS 支持使用三种引擎来运行算子 pipeline:pandas、ray 和 spark。对于同为分布式引擎的 ray 和 spark,两者各有优劣,如何选择是一个比较复杂的问题,下面给出一个简单的对比:
| 比较维度 | ray.data | spark |
|---|---|---|
| GPU 调度 | 支持 | 不支持 |
| 异构支持 | 支持 | 不支持 |
| 数据源 | 丰富 | 丰富 |
| 数据类型 | 除基础类型、非结构化数据外,还支持 image 等类型 | 基础类型、非结构化 |
| shuffle、groupby 支持 | 较差 | 完善 |
| 引擎工作方式 | 流水线式 | 批式 |
| 执行效率 | 高 | 较高 |
| 大规模作业执行稳定性 | 一般 | 较好 |
| 学习资料 | 较少 | 丰富 |
在 las.data 中切换引擎非常简单:
```python
import las.data as ld
from las.data import set_engine_pandas, set_engine_ray, set_engine_spark
from pyspark.sql import SparkSession

# 使用 pandas 引擎(默认)
set_engine_pandas()
df = ld.reader("csv").load("tos://path/to/csv/")

# 使用 ray 引擎
set_engine_ray(ray_address="ray://localhost:10001")
df = ld.reader("csv").load("tos://path/to/csv/")

# 使用 spark 引擎
spark = SparkSession.builder.appName("las-sdk-test").master("local").getOrCreate()
set_engine_spark()
df = ld.reader("csv").load("tos://path/to/csv/")
spark.stop()
```
LAS-SDK 是基于 Spark/Ray 开发的。它的 dataset 可以无缝转换为 Spark 的 dataframe 或者 ray.data 的 dataset。比如
```python
import las.data as ld
from las.data import set_engine_spark
from pyspark.sql import SparkSession

# 使用 spark 引擎
spark = SparkSession.builder.appName("las-sdk-test").master("local").getOrCreate()
set_engine_spark()
df = ld.reader("csv").load("tos://path/to/csv/")

# 将 las dataset 转为 spark dataframe
spark_df = df.to_spark_df()

spark.stop()
```
转换之后,即可使用 spark dataframe 的能力来处理数据。
注意
在使用 spark 引擎的情况下,las dataset 只能转为 spark dataframe,无法转为 ray.data dataset。
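类似地,在使用 ray 引擎的情况下,可以把 las dataset 转为 ray.data 的 dataset(示意代码,使用上文介绍的 `set_engine_ray` 与 `to_ray_ds`):

```python
import las.data as ld
from las.data import set_engine_ray

# 使用 ray 引擎(地址仅为示例)
set_engine_ray(ray_address="ray://localhost:10001")
df = ld.reader("csv").load("tos://path/to/csv/")

# 将 las dataset 转为 ray.data dataset,之后即可使用 ray.data 的原生能力
ray_ds = df.to_ray_ds()
print(ray_ds.count())
```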
LAS-SDK 支持丰富的数据源。
读
```python
las.data.load_csv(paths="tos://path/to/csv")
```
参数
- paths: csv 文件所在地址,可以是本地目录或者 tos 目录
写
```python
dataset.save_csv(path="tos://path/to/target")
```
参数
- path: 目标地址,可以是本地目录或者 tos 目录
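一个简单的 csv 读写串联示意(路径均为占位):

```python
import las.data as ld

# 读取 csv 目录为 las dataset
dataset = ld.load_csv(paths="tos://path/to/csv")

# 中间可以接 map / filter 等算子

# 写回为 csv
dataset.save_csv(path="tos://path/to/target")
```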
读
```python
las.data.load_json(paths="tos://path/to/jsonl")
```
参数
- paths: jsonl 文件所在地址,可以是本地目录或者 tos 目录
写
```python
dataset.save_json(path="tos://path/to/target")
```
参数
- path: 目标地址,可以是本地目录或者 tos 目录
读
```python
las.data.load_parquet(paths="tos://path/to/parquet")
```
参数
- paths: parquet 文件所在地址,可以是本地目录或者 tos 目录
写
```python
dataset.save_parquet(path="tos://path/to/target")
```
参数
- path: 目标地址,可以是本地目录或者 tos 目录
读
```python
las.data.load_iceberg(paths="tos://path/to/iceberg")
```
参数
- paths: iceberg 文件所在地址,可以是本地目录或者 tos 目录
写
```python
dataset.save_iceberg(path="tos://path/to/target")
```
参数
- path: 目标地址,可以是本地目录或者 tos 目录
读
```python
las.data.load_lance(paths="tos://path/to/lance")
```
参数
- paths: lance 文件所在地址,可以是本地目录或者 tos 目录
写
```python
dataset.save_lance(path="tos://path/to/target")
```
参数
- path: 目标地址,可以是本地目录或者 tos 目录
读
las.data.reader("image").load(paths="tos://path/to/image", read_as: str | list[str] | None = None) 参数 - paths: image 文件所在地址,可以是本地目录或者 tos 目录 - read_as: 可选 "path", "ndarray", "binary", 默认为 ["path", "ndarray"] 返回 schema 如果是 ["path", "ndarray"] | path | image | | string | np.ndarray | 如果是 ["path", "binary"] | path | image | | string | bytes |
读
las.data.reader("text").load(paths="tos://path/to/text", read_as: str | list[str] | None = None, whole_text: bool = True) 参数 - paths: text 文件所在地址,可以是本地目录或者 tos 目录 - read_as: 可选 "path", "text", 默认为 ["path", "text"] - whole_text: 是否将整个文件作为 dataset 的一行,默认为 True。否则,文件中的一行将作为 dataset 的一行 返回 schema 如果是 ["path", "text"] | path | text | | string | string |
读
las.data.reader("csv").load(paths="tos://path/to/binary", read_as: str | list[str] | None = None) 参数 - paths: 二进制文件所在地址,可以是本地目录或者 tos 目录 - read_as: 可选 "path", "binary", 默认为 ["path"] 返回 schema 如果是 ["path", "binary"] | path | bytes | | string | bytes |
读
```python
las.data.load_las_dataset(dataset_id: str, api_client: OpenAPIClient | None = None)
```
参数
- dataset_id: 数据集 id(注意不是数据集名称)
- api_client: OpenAPI 客户端。如果没有会自动创建
写
```python
dataset.save_las_dataset(
    mode: str,
    dataset_id: str,
    dataset_name: str | None = None,
    tos_path: str | None = None,
    category: str | None = None,
    api_client: OpenAPIClient = None,
    **kwargs,
)
```
参数
- mode: 模式,支持 create、append
- dataset_id: 数据集 id。如果是 append,这个值为已知数据集;如果是 create,则为定义的 id
- dataset_name: 数据集 name,create 时必须
- tos_path: 用于存储新建数据集的 tos 路径
- category: 数据集类型,支持 COMMON/CUSTOM/OUTCOME_RECORD,分别对应 LAS 的普通数据集、用户自定义数据集,以及数据回流集(推理结果回流)
- api_client: OpenAPI 客户端。如果没有会自动创建
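把处理结果写回 LAS 数据集的示意(dataset_id、tos_path 等均为占位值):

```python
import las.data as ld

# 按 dataset_id 读取已有 LAS 数据集
dataset = ld.load_las_dataset(dataset_id="<dataset_id>")

# 经过若干算子处理后,以 create 模式另存为一个新的普通数据集
dataset.save_las_dataset(
    mode="create",
    dataset_id="<new_dataset_id>",
    dataset_name="processed_dataset",
    tos_path="tos://path/to/new_dataset",
    category="COMMON",
)
```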
写
las.data.writer("fangzhou").save(name="dataset_name")
说明
Huggingface 数据集暂不支持。
各数据源在 Ray / Spark 引擎下的支持情况如下:

| 数据源 | 方式 | Ray | Spark |
|---|---|---|---|
| csv | | ✅ | ✅ |
| json | | ✅ | ✅ |
| parquet | | ✅ | ✅ |
| lance | path 方式 | ✅ | |
| lance | las catalog 方式 | | |
| iceberg | path 方式 | ❌ | ❌ |
| iceberg | las catalog 方式 | ✅ | ✅ |
| las-dataset | dataset_id 方式 | ✅ | ✅ |
| text | | ✅ | ✅ |
| binary | | ✅ | ✅ |
| image | | ✅ | ✅ |
| video | | ❌ | ❌ |
| audio | | ❌ | ❌ |
to_pandas_df
```python
def to_pandas_df(self) -> pd.DataFrame:
    """
    Convert the current dataset to a Pandas DataFrame.

    This method checks if the current dataset is a Pandas DataFrame and returns it.
    If the dataset is not a Pandas DataFrame, it raises a TypeError.

    Returns
    -------
    pd.DataFrame:
        The current dataset as a Pandas DataFrame.

    Raises
    ------
    TypeError:
        If the current dataset is not a Pandas DataFrame.
    """
```
to_spark_df
```python
def to_spark_df(self) -> ps.DataFrame:
    """
    Convert the current dataset to a Spark DataFrame.

    This method checks if the current dataset is a Spark DataFrame and returns it.
    If the dataset is not a Spark DataFrame, it raises a TypeError.

    Returns
    -------
    ps.DataFrame:
        The current dataset as a Spark DataFrame.

    Raises
    ------
    TypeError:
        If the current dataset is not a Spark DataFrame.
    """
```
to_ray_ds
```python
def to_ray_ds(self) -> rd.Dataset:
    """
    Convert the current dataset to a Ray Dataset.

    This method checks if the current dataset is a Ray Dataset and returns it.
    If the dataset is not a Ray Dataset, it raises a TypeError.

    Returns
    -------
    rd.Dataset:
        The current dataset as a Ray Dataset.

    Raises
    ------
    TypeError:
        If the current dataset is not a Ray Dataset.
    """
```
select
```python
def select(self, *cols, **kwargs) -> "Dataset":
    """
    Select one or more columns from the dataset, and return a new dataset.

    Parameters
    ----------
    cols : str | list[str]
        The selected columns.

    Returns
    -------
    Dataset:
        A new Dataset with the selected columns.
    """
```
add_column
```python
import pandas as pd

def add_column(
    self,
    col: str,
    fn: Callable[[pd.DataFrame], pd.Series],
    col_type: str | None = None,
    **kwargs,
) -> "Dataset":
    """
    Add a column in the dataset with the given expression.

    Parameters
    ----------
    col : str
        The column name.
    fn : Callable[[pd.DataFrame], pd.Series]
        The function that gives the new column value.
    col_type : str | None
        The type of the column.

    Returns
    -------
    Dataset:
        A new Dataset.
    """
```
drop_columns
```python
def drop_columns(self, *cols, **kwargs) -> "Dataset":
    """
    Drop columns from the dataset and return a new dataset.

    Parameters
    ----------
    cols : str | list[str]
        The columns to be removed.

    Returns
    -------
    Dataset:
        A new dataset without the dropped columns.
    """
```
rename_columns
```python
def rename_columns(self, cols: dict[str, str], **kwargs) -> "Dataset":
    """
    Rename the column name(s) with new column name(s).

    Parameters
    ----------
    cols : dict[str, str]
        A dict mapping the old column name(s) to the new name(s).

    Returns
    -------
    Dataset:
        A new dataset with the renamed columns.
    """
```
sort
```python
def sort(self, cols: str | list[str], ascending: bool | list[bool] = False) -> "Dataset":
    """
    Sort the dataset by the given keys.

    Parameters
    ----------
    cols : str | list[str]
        The sort keys.
    ascending : bool | list[bool]
        The sort order.

    Returns
    -------
    Dataset:
        The sorted dataset.
    """
```
shuffle
```python
def shuffle(self, **kwargs) -> "Dataset":
    """
    Shuffle the dataset.

    Returns
    -------
    Dataset:
        The shuffled dataset.
    """
```
map
```python
def map(self, fn: Callable[[dict[str, Any]], dict[str, Any]], **kwargs: Any) -> "Dataset":
    """
    Apply a function to each element of the dataset.

    This method applies a provided function to each element of the dataset.
    The function should take an item from the dataset and return a transformed item.

    Parameters
    ----------
    fn : Callable[[dict[str, Any]], dict[str, Any]]
        The function to apply to each element of the dataset.
    **kwargs : Any
        Additional keyword arguments to pass to the function.

    Returns
    -------
    Dataset:
        A new Dataset with the function applied to each element.
    """
```
map_batch
```python
import pandas as pd

def map_batch(self, fn: Callable[[pd.DataFrame], pd.DataFrame], **kwargs: Any) -> "Dataset":
    """
    Apply a function to each batch of the dataset, where each batch is a
    pandas DataFrame and the function returns a transformed DataFrame.

    Parameters
    ----------
    fn : Callable[[pd.DataFrame], pd.DataFrame]
        The function to apply to each batch of the dataset.
    **kwargs : Any
        Additional keyword arguments to pass to the function.

    Returns
    -------
    Dataset:
        A new Dataset with the function applied to each batch.
    """
```
filter
```python
def filter(self, fn: Callable[[dict[str, Any]], bool], **kwargs: Any) -> "Dataset":
    """
    Filter the dataset using a provided function.

    Parameters
    ----------
    fn : Callable[[dict[str, Any]], bool]
        A function that takes an item from the dataset and returns True if the item
        should be included in the filtered dataset, and False otherwise.
    **kwargs : Any
        Additional keyword arguments to pass to the function.

    Returns
    -------
    Dataset:
        A new Dataset containing only the items for which the function returned True.
    """
```
flat_map
```python
def flat_map(self, fn: Callable[[dict[str, Any]], Iterable[dict[str, Any]]], **kwargs: Any) -> "Dataset":
    """
    Apply a function that returns multiple rows to each element of the dataset.

    Parameters
    ----------
    fn : Callable[[dict[str, Any]], Iterable[dict[str, Any]]]
        The function to apply to each element of the dataset.
    **kwargs : Any
        Additional keyword arguments to pass to the function.

    Returns
    -------
    Dataset:
        A new Dataset with the function applied to each element.
    """
```
schema
```python
def schema(self) -> Any:
    """
    Get the schema of the current dataset.

    This method returns the schema of the current dataset, depending on its type.

    Returns
    -------
    Any:
        The schema of the current dataset.
    """
```
split
```python
def split(self, weights: list[float], seed: int | None = None) -> list["Dataset"]:
    """
    Split the dataset into multiple parts.

    Parameters
    ----------
    weights : list[float]
        The proportion of each part of the dataset.
    seed : int | None
        The seed used to generate random numbers for splitting the dataset randomly.

    Returns
    -------
    list["Dataset"]:
        A list of datasets, one for each weight. For example, with weights=[0.8, 0.2]
        the first dataset can serve as the train set and the second as the test set.
    """
```
show
```python
def show(self, limit: int = 20) -> None:
    """
    Show the dataset.

    Parameters
    ----------
    limit : int
        The number of rows to be shown, with a default value of 20.
    """
```
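下面是把上述 Dataset 接口串联起来的一个示意(列名与路径均为占位):

```python
import las.data as ld
from las.data import set_engine_pandas

set_engine_pandas()

ds = ld.reader("csv").load("tos://path/to/csv/")

# 选列、过滤、排序
ds = ds.select("age", "content")
ds = ds.filter(lambda row: row["age"] >= 18)
ds = ds.sort("age", ascending=True)

# 打乱后按 8:2 切分
train, test = ds.shuffle().split(weights=[0.8, 0.2], seed=42)
train.show(5)
```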
LAS-SDK 目前支持四类 function 类算子:
map
```python
from typing import Any

import ftfy

from las.data.function.version import v1
from las.data.registry import function
from las.data.types import Category, Field, MapFunction

OP_ID = "fix_unicode"


# @function 注册:可选
@function(name="Unicode错误修复", identifier=OP_ID, category=Category.TEXT, version=v1)
class FixUnicode(MapFunction):
    """修复文本中的 Unicode 错误。"""

    # 可选参数定义方式:这里使用了类似于 Pydantic 的方式来维护变量的 meta 信息。
    input_col_name: str = Field(
        title="输入列名",
        description="输入内容所在的列名",
    )
    normalization: str = Field(
        default="NFC",
        title="unicode规范化形式",
        description="unicode编码的规范化形式,支持的形式有NFC、NFKC、NFD、NFKD",
        enum_values=["NFC", "NFKC", "NFD", "NFKD"],
    )

    # 初始化函数
    def __init__(self, **kwargs) -> None:
        """Initialization."""
        super().__init__(**kwargs)
        if self.normalization and len(self.normalization) > 0:
            self.normalization = self.normalization.upper()
        else:
            self.normalization = "NFC"

        if self.normalization.upper() not in ["NFC", "NFKC", "NFD", "NFKD"]:
            msg = (
                f"Normalization mode [{self.normalization}] is not "
                "supported. Can only be one of "
                '["NFC", "NFKC", "NFD", "NFKD"]'
            )
            raise ValueError(msg)

    # map 函数:输入一行,输出一行
    def map(self, row: dict[str, Any], **kwargs) -> dict[str, Any]:
        row[self.input_col_name] = ftfy.fix_text(
            row[self.input_col_name], normalization=self.normalization
        )
        return row
```
map_batch
`map_batch` 的作用同 `map`,只是使用批的方式来完成任务,从而更加高效。
```python
import ftfy
import pandas as pd

from las.data.function.version import v1
from las.data.registry import function
from las.data.types import BatchMapFunction, Category, Field

OP_ID = "fix_unicode_batch"


# @function 注册:可选
@function(name="Unicode错误修复", identifier=OP_ID, category=Category.TEXT, version=v1)
class FixUnicodeBatch(BatchMapFunction):
    """修复文本中的 Unicode 错误。"""

    # 可选参数定义方式:这里使用了类似于 Pydantic 的方式来维护变量的 meta 信息。
    input_col_name: str = Field(
        title="输入列名",
        description="输入内容所在的列名",
    )
    normalization: str = Field(
        default="NFC",
        title="unicode规范化形式",
        description="unicode编码的规范化形式,支持的形式有NFC、NFKC、NFD、NFKD",
        enum_values=["NFC", "NFKC", "NFD", "NFKD"],
    )

    # 初始化函数
    def __init__(self, **kwargs) -> None:
        """Initialization."""
        super().__init__(**kwargs)
        if self.normalization and len(self.normalization) > 0:
            self.normalization = self.normalization.upper()
        else:
            self.normalization = "NFC"

        if self.normalization.upper() not in ["NFC", "NFKC", "NFD", "NFKD"]:
            msg = (
                f"Normalization mode [{self.normalization}] is not "
                "supported. Can only be one of "
                '["NFC", "NFKC", "NFD", "NFKD"]'
            )
            raise ValueError(msg)

    # map_batch 函数:输入一个 batch,输出一个 batch。每个 batch 的类型为 pandas DataFrame。
    def map_batch(self, rows: pd.DataFrame, **kwargs) -> pd.DataFrame:
        rows[self.input_col_name] = rows[self.input_col_name].apply(
            lambda text: ftfy.fix_text(text, normalization=self.normalization)
        )
        return rows
```
flat_map
@function(identifier="text_split", name="文本切分", category=Category.DEFAULT, version="v1") class TestFlatMap(FlatMapFunction): """Flat map that doubles the row.""" # 可选参数定义方式:这里使用了类似于 PyDantic 的方式来维护变量的 meta 信息。 input_col_name: str = Field( title="输入列名", description="输入内容所在的列名", ) chunk_size: int = Field( default=100, title="文本片段长度", description="被切分的文本按照文本片段长度进行切分", ) keep_original: bool = Field( title="保留原字段", description="如果要保留原字段,则切分后的字段为 input_col_name + '_chunk'", default=True ) def flat_map(self, row: dict[str, Any], **kwargs) -> list[dict[str, Any]]: if self.input_col_name not in row: return [row] text_to_split = str(row[self.input_col_name]) text_length = len(text_to_split) num_chunks = (text_length + self.chunk_size - 1) // self.chunk_size result = [] for i in range(num_chunks): start = i * self.chunk_size end = start + self.chunk_size chunk = text_to_split[start:end] new_row = row.copy() if self.keep_original: new_row[f"{self.split_field}_chunk"] = chunk else: new_row[self.input_col_name] = chunk result.append(new_row) return result
filter
```python
import sys
from typing import Any

from las.data.function.version import v1
from las.data.registry import function
from las.data.types import Category, Field, FilterFunction

OP_ID = "test_length_filter"


@function(name="文本长度过滤", identifier=OP_ID, category=Category.TEXT, version=v1)
class TextLengthFilter(FilterFunction):
    """过滤出文本长度在指定范围内的文本。"""

    # 可选参数定义方式:这里使用了类似于 Pydantic 的方式来维护变量的 meta 信息。
    input_col_name: str = Field(
        title="输入列名",
        description="输入内容所在的列名",
    )
    min_len: int = Field(
        default=10,
        title="文本长度下限",
        description="过滤掉文本长度低于该值的文本",
    )
    max_len: int = Field(
        default=sys.maxsize,
        title="文本长度上限",
        description="过滤掉文本长度高于该值的文本",
    )

    # filter 函数:输入一行,输出一个 bool 值
    def filter(self, row: dict[str, Any], **kwargs) -> bool:
        """
        Parameters
        ----------
        row : dict[str, Any]
            Row to filter.

        Returns
        -------
        bool
            True if the row should be kept.
        """
        text_len = len(row[self.input_col_name])
        return self.min_len <= text_len <= self.max_len
```
说明
`deduplicate` 暂不支持。
算子使用有两种方式:
```python
import las.data as ld
from las.data import set_engine_ray

# 设定使用 ray 引擎
set_engine_ray()

df = ld.reader("csv").load("tos://path/to/csv/")
df = df.map(FixUnicode(input_col_name="content", normalization="NFC"))
df.writer("csv").save("tos://path/to/csv/processed/")
```
Yaml 配置协议定义:
api_version: "v1" name: "test_job" metadata: engine: name: "spark" # 执行引擎 options: # 设定执行引擎的初始化参数 op_1: "1" op_2: "2" environment: en_1: "1" en_2: "2" pipeline: - datasource: # 数据源 path: "tos://path/to/csv/" ds_name: "csv" # 数据源名称(格式) - functions: # 算子列表 - func_1: # 算子名称 param_1: "p1" # 算子参数,用于作为构造函数的初始化参数 param_2: "3.0" runtime_cfgs: # 运行时配置,比如使用多少 gpu 运行该算子 num_cpus: "1" num_gpus: "2" - func_2: param_1: "p1" param_2: "3.0" runtime_cfgs: cfg_1: "1" cfg_2: "2" - datasink: # 数据目标 path: "tos://path/to/sink" ds_name: "parquet" # 数据目标名称(格式) ds_config: # 数据源选项,一般为数据库连接等。大多数情况下留空 cf_1: "1" cf_2: "2" options: # 写数据选项 op_1: "/data/output"
对应上边代码方式的 Yaml 协议为:
api_version: "v1" name: "test_job" metadata: engine: name: "ray" pipeline: - datasource: path: "tos://path/to/csv/" ds_name: "csv" - functions: - fix_unicode: input_col_name: "content" nomalization: "NFC" - datasink: path: "tos://path/to/csv/processed/" ds_name: "csv"
执行:
```bash
python -m pipe_runner -f /path/to/yaml_file.yaml
```
已创建访问密钥并开通 LAS 服务
根据需求选择 lf-client-2 或者 lf-client-3 进行接入。
说明
兼容开源 hive2协议。
添加依赖
```xml
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>lf-client-2</artifactId>
    <version>1.0.1-RELEASE</version>
</dependency>
```
请求示例
```java
String ak = {ak};
String sk = {sk};

HiveConf conf = new HiveConf();
conf.set(HiveConf.ConfVars.METASTOREURIS.varname, {endPoint});

HiveMetaStoreClient hmsClient = new HiveMetaStoreClient(conf);
hmsClient.setRegion("cn-beijing");
hmsClient.setAccessKeyIdAndSecretAccessKey(ak, sk);

List<String> allDatabases = hmsClient.getAllDatabases();
System.out.println(allDatabases);

Table table = hmsClient.getTable("db_test", "tbl_test");
System.out.println(table);
```
说明
兼容开源 hive3协议。
添加依赖
```xml
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>lf-client-3</artifactId>
    <version>1.0.1-RELEASE</version>
</dependency>
```
请求示例
```java
public static void main(String[] args) throws Exception {
    String ak = {ak};
    String sk = {sk};

    Configuration conf = new Configuration();
    conf.set(MetastoreConf.ConfVars.THRIFT_URIS.getVarname(), {endPoint});

    HiveMetaStoreClient hmsClient = new HiveMetaStoreClient(conf);
    hmsClient.setRegion("cn-beijing");
    hmsClient.setAccessKeyIdAndSecretAccessKey(ak, sk);

    Database database = hmsClient.getDatabase("xyu_test");
    Table table = hmsClient.getTable("db_test", "tbl_test");
    System.out.println(database);
    System.out.println(table);
}
```
hive2.x sdk
| lf-client-2 | hive-exec | 发布日期 | 发布内容 |
|---|---|---|---|
| 1.0.1-RELEASE | 1.0.0-LF2-RELEASE | 20240521 | |
| 1.0.0-RELEASE | 1.0.0-LF2-RELEASE | 20240311 | |
hive3.x sdk
| lf-client-3 | hive-exec | 发布日期 | 发布内容 |
|---|---|---|---|
| 1.0.1-RELEASE | 1.0.0-LF3-RELEASE | 20240521 | |
| 1.0.0-RELEASE | 1.0.0-LF3-RELEASE | 20240311 | |
| region | endpoint | 备注 |
|---|---|---|
| cn-beijing | thrift://lakeformation.las.cn-beijing.ivolces.com:48869 | 仅支持火山内部访问,不支持公网访问 |
| cn-shanghai | thrift://lakeformation.las.cn-shanghai.ivolces.com:48869 | |
| cn-guangzhou | thrift://lakeformation.las.cn-guangzhou.ivolces.com:48869 | |
已创建访问密钥并开通LAS服务。
添加依赖
```xml
<dependency>
    <groupId>bytedance.olap</groupId>
    <artifactId>gemini-client-shaded</artifactId>
    <version>1.0.0.3-RELEASE</version>
</dependency>
```
请求示例
```java
public static void main(String[] args) throws Exception {
    String ak = {ak};
    String sk = {sk};
    String endPoint = {endPoint};

    GeminiClientIface signingClient =
            GeminiClientFactory.createSigningClient(endPoint, ak, sk, null, null, null);

    List<GeminiPrivilege> privileges = new ArrayList<>();
    GeminiPrivilege privilege = new GeminiPrivilege();

    GeminiResource geminiResource = new GeminiResource();
    geminiResource.setResourceScope(GeminiResourceScope.TABLE);
    geminiResource.setRegion("cn-beijing");
    geminiResource.setTenant("2100000001");
    geminiResource.setSchemaName("@hive#db_test");
    geminiResource.setTableName("tb_test");

    privilege.setResource(geminiResource);
    privilege.setAction(GeminiPrivilegeAction.DESCRIBE);
    privileges.add(privilege);

    GeminiPrincipal principal = new GeminiPrincipal();
    principal.setTenant("2100000001");
    principal.setPrincipalType(GeminiPrincipalType.ACCOUNT);
    principal.setPrincipalName("1008632");

    signingClient.checkPrivilegesForPolicy(principal, privileges, null, null);
}
```
| 发布版本 | 发布日期 | 发布内容 |
|---|---|---|
| 1.0.0.18-SNAPSHOT | 20240417 | 增加 revoke_privileges_for_dropped_resources 方法 |
| 1.0.0.3-RELEASE | 20240514 | 去除 log 相关依赖,对部分第三方依赖做 relocate;CatalogName 默认设置成 hive,修复 grant 和 revoke 没有带 catalogName 的问题,以及优化 gemini-client 的打包方式 |
| region | endpoint | 备注 |
|---|---|---|
| cn-beijing | thrift://100.96.5.173:48869 | 仅支持火山内部访问,不支持公网访问,需要开白 |
| cn-shanghai | thrift://100.96.4.175:48869 | 仅支持火山内部访问,不支持公网访问,需要开白 |
| cn-guangzhou | thrift://100.96.4.70:48869 | 仅支持火山内部访问,不支持公网访问,需要开白 |