SDK Reference
Last updated: 2025.04.25 11:39:58 · First published: 2024.06.11 19:06:01

What is LAS-SDK

LAS-SDK is the SDK client for the Volcano Engine LAS service (Lake and AI Service). It provides the following capabilities (as of April 12, 2025):

  1. Dataset/data source I/O: read and write LAS datasets, as well as data sources in formats such as CSV, JSON, Parquet, text, image, and binary.
  2. Custom data sources: define your own data sources and read/write them through the SDK.
  3. Data processing APIs: LAS-SDK provides a Dataset interface similar to PySpark DataFrames, supporting common table operations such as map, filter, and sort, as well as AI-oriented operations such as shuffle and split in the style of Hugging Face Datasets.
  4. Operator framework: LAS-SDK ships an operator framework; you can write your own operators on top of it and run them with LAS-SDK.
  5. Multi-engine support: LAS-SDK supports Pandas, Spark, and Ray as compute backends; choose the engine that fits your scenario. The default is Pandas.

Installation

LAS-SDK depends on several other packages that are not yet hosted on the public PyPI, so they must be installed manually.

conda create -n sdk-test python=3.11
conda activate sdk-test

# Download the Python wheels
wget https://las-core.tos-cn-beijing.volces.com/packages/pylance-0.20.0-cp39-abi3-manylinux_2_28_x86_64.whl
wget https://las-core.tos-cn-beijing.volces.com/packages/pyiceberg-0.9.0.0.2-cp311-cp311-manylinux_2_28_x86_64.whl
wget https://las-core.tos-cn-beijing.volces.com/packages/ve_lascatalog_sdk-0.0.6.dev1-py3-none-any.whl
wget https://las-core.tos-cn-beijing.volces.com/packages/las_sdk-0.2.0.rc1-py3-none-any.whl
wget https://las-core.tos-cn-beijing.volces.com/packages/ray-2.44.0-cp311-cp311-manylinux2014_x86_64.whl

pip install *.whl -i "https://bytedpypi.byted.org/simple/"

# Download the Spark jars (only needed if you use the Spark engine)
wget https://las-core.tos-cn-beijing.volces.com/packages/arrow-c-data-12.0.1.jar
wget https://las-core.tos-cn-beijing.volces.com/packages/arrow-dataset-12.0.1.jar
wget https://las-core.tos-cn-beijing.volces.com/packages/jar-jni-1.1.1.jar
wget https://las-core.tos-cn-beijing.volces.com/packages/lance-core-0.20.0.jar
wget https://las-core.tos-cn-beijing.volces.com/packages/lance-spark-0.20.0.jar
wget https://las-core.tos-cn-beijing.volces.com/packages/lance-catalog-0.20.0.jar
wget https://las-core.tos-cn-beijing.volces.com/packages/proton-hadoop3-bundle-2.2.3.jar
wget https://las-core.tos-cn-beijing.volces.com/packages/proton-spark-3.5.1-1.0.jar
wget https://las-core.tos-cn-beijing.volces.com/packages/iceberg-spark-runtime-3.5_2.12-1.8.1.jar
wget https://las-core.tos-cn-beijing.volces.com/packages/hive-exec-2.3.9-1.5.0.1-LF2-RELEASE-core.jar
wget https://las-core.tos-cn-beijing.volces.com/packages/lf-client-2-1.5.0-RELEASE.jar

site_package_dir=$(python -c "import sysconfig; print(sysconfig.get_paths()['purelib'])")
mv *.jar $site_package_dir/pyspark/jars
rm $site_package_dir/pyspark/hive-metastore*

# Set up the crypto libraries (some OS versions do not provide them)
wget https://las-core.tos-cn-beijing.volces.com/libs/libcrypto.so.1.1
wget https://las-core.tos-cn-beijing.volces.com/libs/libssl.so.1.1
mkdir /opt/libs
mv lib* /opt/libs
export LD_LIBRARY_PATH=/opt/libs:$LD_LIBRARY_PATH
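
To verify the installation, a minimal import check can be run (a sketch only, assuming the package names used in the examples below):

# A quick check that the SDK and the optional engines are importable
import las.data as ld
import pyspark
import ray

print(ld.__name__, pyspark.__version__, ray.__version__)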

Configuration

LAS-SDK reads its configuration from environment variables or from a .env file, for example the AK/SK used to access TOS. The commonly used settings are listed below:

las_ak: str = ""
las_sk: str = ""
las_region: str = "cn-beijing"          # defaults to 'cn-beijing'

top_service_name: str = "las"           # fixed to "las"
top_service_ak: str = <las_ak>          # same as las_ak
top_service_sk: str = <las_sk>          # same as las_sk
top_service_region: str = <las_region>  # defaults to 'cn-beijing'

tos_ak: str = <las_ak>                  # same as las_ak
tos_sk: str = <las_sk>                  # same as las_sk
tos_region: str = "cn-beijing"          # defaults to 'cn-beijing'
tos_endpoint: str = "tos-cn-beijing.volces.com"         # on the private network, replace volces.com with ivolces.com
tos_s3_endpoint: str = "tos-s3-cn-beijing.volces.com"   # on the private network, replace volces.com with ivolces.com

catalog_uri: str = ""        # see the "连接地址" (connection address) section at https://www.volcengine.com/docs/6492/1264542#%E8%BF%9E%E6%8E%A5%E5%9C%B0%E5%9D%80
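
The same settings can also be supplied programmatically; a minimal sketch, assuming the SDK picks these keys up verbatim as environment variable names (the values are placeholders; alternatively put the same key=value pairs in a .env file):

# Provide the configuration through environment variables before importing the SDK
import os

os.environ["las_ak"] = "<your-access-key>"
os.environ["las_sk"] = "<your-secret-key>"
os.environ["las_region"] = "cn-beijing"
os.environ["tos_endpoint"] = "tos-cn-beijing.volces.com"

import las.data as ld  # import after the configuration is in place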

Usage example

The example below shows how to use LAS-SDK to complete a simple data processing step:

from typing import Any

import las.data as ld
from las.data import function, set_engine_spark
from las.data.types import Category, MapFunction
from pyspark.sql import SparkSession

# Use Spark as the compute engine
spark = SparkSession.builder.appName("las-sdk-test").master("local").getOrCreate()
set_engine_spark()

dataset = ld.reader("csv").load("tos://path/to/csv_folder")

@function(identifier="add_one", name="年龄加一", category=Category.DEFAULT, version="v1")
class AddOne(MapFunction):
    """Map which adds 'age' by 1."""

    def map(self, row: dict[str, Any], **kwargs) -> dict[str, Any]:
        row["age"] = row["age"] + 1
        return row

dataset = dataset.map(AddOne())
dataset.writer("csv").save("tos://path/to/new_folder")

spark.stop()

As the code shows, we define a function, which behaves like a UDF, and apply it to the whole dataset with map() to perform the transform.
In the code above, the @function decorator registers the function with a registry. This is optional (see the sketch after this list) and serves two purposes:

  1. Functions can be chained together in YAML without writing a dataset job by hand, which is a low-code way of building pipelines.
  2. It makes it easy to register the function on the LAS platform: LAS scans the function's metadata from the decorator; without it, you would have to fill in that information manually.
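
Since registration is optional, the same operator also works as a plain MapFunction subclass; a minimal sketch (AddOneUnregistered is a hypothetical name used only for illustration):

from typing import Any

from las.data.types import MapFunction


# Without @function the operator is not visible to the registry or the LAS platform,
# but it can still be applied with dataset.map().
class AddOneUnregistered(MapFunction):
    def map(self, row: dict[str, Any], **kwargs) -> dict[str, Any]:
        row["age"] = row["age"] + 1
        return row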

Engine support

LAS supports three engines for running operator pipelines:

  1. pandas, for single-machine workloads
  2. ray, for distributed workloads
  3. spark, for distributed workloads

Since ray and spark are both distributed, choosing between them involves weighing their strengths and weaknesses. The question is fairly involved; the table below gives a simplified comparison:

| Dimension | ray.data | spark |
| GPU scheduling | Supported | Not supported |
| Heterogeneous resources | Supported | Not supported |
| Data sources | Rich | Rich |
| Data types | Basic types and unstructured data, plus image and similar types | Basic types, unstructured data |
| shuffle / groupby support | Weak | Mature |
| Execution model | Pipelined | Batch |
| Execution efficiency | Higher | |
| Stability of large-scale jobs | Fair | Better |
| Learning resources | Limited | Abundant |

How to switch engines

Switching engines in las.data is straightforward:

import las.data as ld
from las.data import set_engine_pandas, set_engine_ray, set_engine_spark
from pyspark.sql import SparkSession

# Use the pandas engine (the default)
set_engine_pandas()
df = ld.reader("csv").load("tos://path/to/csv/")

# Use the ray engine
set_engine_ray(ray_address="ray://localhost:10001")
df = ld.reader("csv").load("tos://path/to/csv/")

# Use the spark engine
spark = SparkSession.builder.appName("las-sdk-test").master("local").getOrCreate()
set_engine_spark()
df = ld.reader("csv").load("tos://path/to/csv/")
spark.stop()

Engine compatibility

LAS-SDK is built on top of Spark/Ray, and its dataset can be converted seamlessly into a Spark DataFrame or a ray.data Dataset. For example:

import las.data as ld
from las.data import set_engine_pandas, set_engine_ray, set_engine_spark
from pyspark.sql import SparkSession

# Use the spark engine
spark = SparkSession.builder.appName("las-sdk-test").master("local").getOrCreate()
set_engine_spark()
df = ld.reader("csv").load("tos://path/to/csv/")

# Convert the las dataset to a Spark DataFrame
spark_df = df.to_spark_df()

spark.stop()

After the conversion, the full Spark DataFrame API is available for further processing.

Note

When the spark engine is used, a las dataset can only be converted to a Spark DataFrame, not to a ray.data Dataset.
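
Under the ray engine, the symmetric conversion is to_ray_ds() (documented in the API list below); a minimal sketch with placeholder paths and address:

import las.data as ld
from las.data import set_engine_ray

# Use the ray engine; the address points at your Ray cluster
set_engine_ray(ray_address="ray://localhost:10001")

df = ld.reader("csv").load("tos://path/to/csv/")

# Convert the las dataset to a ray.data Dataset and continue with the ray.data API
ray_ds = df.to_ray_ds()
print(ray_ds.count())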

Data sources

LAS-SDK supports a rich set of data sources.

CSV

las.data.load_csv(paths="tos://path/to/csv")

Parameters
- paths: location of the CSV files; can be a local directory or a TOS directory

dataset.save_csv(path="tos://path/to/target")

Parameters
- path: target location; can be a local directory or a TOS directory
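
A minimal round-trip sketch (the paths are placeholders):

import las.data as ld

# Load all CSV files under a TOS directory into a dataset
dataset = ld.load_csv(paths="tos://path/to/csv")

# ...process the dataset...

# Write the result back out as CSV
dataset.save_csv(path="tos://path/to/target")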

JSONL

las.data.load_json(paths="tos://path/to/jsonl")

Parameters
- paths: location of the JSONL files; can be a local directory or a TOS directory

dataset.save_json(path="tos://path/to/target")

Parameters
- path: target location; can be a local directory or a TOS directory

Parquet

las.data.load_parquet(paths="tos://path/to/parquet")

Parameters
- paths: location of the Parquet files; can be a local directory or a TOS directory

dataset.save_parquet(path="tos://path/to/target")

Parameters
- path: target location; can be a local directory or a TOS directory

Iceberg

las.data.load_iceberg(paths="tos://path/to/iceberg")

Parameters
- paths: location of the Iceberg files; can be a local directory or a TOS directory

dataset.save_iceberg(path="tos://path/to/target")

Parameters
- path: target location; can be a local directory or a TOS directory

Lance

las.data.load_lance(paths="tos://path/to/lance")

Parameters
- paths: location of the Lance files; can be a local directory or a TOS directory

dataset.save_lance(path="tos://path/to/target")

Parameters
- path: target location; can be a local directory or a TOS directory

Image

las.data.reader("image").load(paths="tos://path/to/image", read_as: str | list[str] | None = None)

Parameters
- paths: location of the image files; can be a local directory or a TOS directory
- read_as: one of "path", "ndarray", "binary"; defaults to ["path", "ndarray"]

Returned schema

If read_as is ["path", "ndarray"]:
| path | image |
| string | np.ndarray |

If read_as is ["path", "binary"]:
| path | image |
| string | bytes |
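
A minimal reading sketch (the path is a placeholder):

import las.data as ld

# Read images as (path, ndarray) pairs -- the default read_as
dataset = ld.reader("image").load(paths="tos://path/to/image", read_as=["path", "ndarray"])
dataset.show(5)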

Text

las.data.reader("text").load(paths="tos://path/to/text", read_as: str | list[str] | None = None, whole_text: bool = True)

Parameters
- paths: location of the text files; can be a local directory or a TOS directory
- read_as: one of "path", "text"; defaults to ["path", "text"]
- whole_text: whether to read each whole file as one row of the dataset; defaults to True. If False, each line of a file becomes one row of the dataset

Returned schema

If read_as is ["path", "text"]:
| path | text |
| string | string |
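
A minimal sketch that reads one dataset row per line instead of per file (the path is a placeholder):

import las.data as ld

# whole_text=False: each line of each text file becomes one row
dataset = ld.reader("text").load(paths="tos://path/to/text", whole_text=False)
dataset.show(5)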

Binary

las.data.reader("csv").load(paths="tos://path/to/binary", read_as: str | list[str] | None = None)

参数
- paths: 二进制文件所在地址,可以是本地目录或者 tos 目录
- read_as: 可选 "path", "binary", 默认为 ["path"]

返回 schema

如果是 ["path", "binary"]
| path | bytes |
| string | bytes |

LAS datasets

las.data.load_las_dataset(dataset_id: str, api_client: OpenAPIClient | None = None)

Parameters
- dataset_id: the dataset ID (note: this is not the dataset name)
- api_client: an OpenAPI client; created automatically if not provided

dataset.save_las_dataset(mode: str, dataset_id: str, dataset_name: str | None = None, tos_path: str | None = None, category: str | None = None, api_client: OpenAPIClient = None, **kwargs)

Parameters
- mode: write mode; supports create and append
- dataset_id: dataset ID. For append this must refer to an existing dataset; for create it is the ID of the dataset to be created
- dataset_name: dataset name; required for create
- tos_path: TOS path used to store the newly created dataset
- category: dataset type; one of COMMON/CUSTOM/OUTCOME_RECORD, corresponding to a regular LAS dataset, a user-defined dataset, and an outcome-record dataset (inference results written back), respectively
- api_client: an OpenAPI client; created automatically if not provided
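
For illustration, a hedged sketch of reading a LAS dataset by ID and writing a new one (the IDs, name, and path are placeholders):

import las.data as ld

# Read an existing LAS dataset by its ID (not its name)
dataset = ld.load_las_dataset(dataset_id="<existing-dataset-id>")

# ...process the dataset...

# Create a new LAS dataset from the result
dataset.save_las_dataset(
    mode="create",
    dataset_id="<new-dataset-id>",
    dataset_name="my_new_dataset",
    tos_path="tos://path/to/dataset_storage",
    category="COMMON",
)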

Ark (方舟) datasets

las.data.writer("fangzhou").save(name="dataset_name")

Note

Hugging Face datasets are not yet supported.

Summary

| Data source | Access method | Ray | Spark |
| csv | | | |
| json | | | |
| parquet | | | |
| lance | by path | | |
| lance | by LAS catalog | | |
| iceberg | by path | | |
| iceberg | by LAS catalog | | |
| las-dataset | by dataset_id | | |
| text | | | |
| binary | | | |
| image | | | |
| video | | | |
| audio | | | |

API list

  1. I/O APIs: see the Data sources section above
  2. to_pandas_df
def to_pandas_df(self) -> pd.DataFrame:
    """
    Convert the current dataset to a Pandas DataFrame.
    This method checks if the current dataset is a Pandas DataFrame and returns it.
    If the dataset is not a Pandas DataFrame, it raises a TypeError.

    Returns
    -------
    pd.DataFrame: The current dataset as a Pandas DataFrame.

    Raises
    ------
    TypeError: If the current dataset is not a Pandas DataFrame.
    """
  3. to_spark_df
def to_spark_df(self) -> ps.DataFrame:
    """
    Convert the current dataset to a Spark DataFrame.
    This method checks if the current dataset is a Spark DataFrame and returns it.
    If the dataset is not a Spark DataFrame, it raises a TypeError.
    
    Returns
    -------
    ps.DataFrame: The current dataset as a Spark DataFrame.

    Raises
    ------
    TypeError: If the current dataset is not a Spark DataFrame.
    """
  4. to_ray_ds
def to_ray_ds(self) -> rd.Dataset:
    """
    Convert the current dataset to a Ray Dataset.
    This method checks if the current dataset is a Ray Dataset and returns it.
    If the dataset is not a Ray Dataset, it raises a TypeError.

    Returns
    -------
    rd.Dataset: The current dataset as a Ray Dataset.

    Raises
    ------
    TypeError: If the current dataset is not a Ray Dataset.
    """
  5. select
def select(self, *cols, **kwargs) -> "Dataset":
    """
    Select one or more columns from the dataset, and return a new dataset.

    Parameters
    ----------
    cols : str | list[str]
        The selected columns.

    Returns
    -------
    Dataset: A new Dataset with the selected columns.
    """
  6. add_column
import pandas as pd

def add_column(
    self, col: str, fn: Callable[[pd.DataFrame], pd.Series], col_type: str | None = None, **kwargs
) -> "Dataset":
    """
    Add a column in the dataset with the given expression.

    Parameters
    ----------
    col : str
        The column name.
    col_type : str
        The type of the column.
    fn : Callable[[pd.DataFrame], pd.Series]
        The function that gives the new column value.

    Returns
    -------
    Dataset: A new Dataset.
    """
  7. drop_columns
def drop_columns(self, *cols, **kwargs) -> "Dataset":
    """
    Drop columns from the dataset and return a new dataset.

    Parameters
    ----------
    cols : str | list[str]
        The columns to be removed.

    Returns
    -------
    Dataset: A new dataset without the dropped columns.
    """
  8. rename_columns
def rename_columns(self, cols: dict[str, str], **kwargs) -> "Dataset":
    """
    Rename the column name(s) with new columns name(s).

    Parameters
    ----------
    cols : dict[str, str]
        A dict mapping the old column name(s) to the new name(s).

    Returns
    -------
    Dataset: A new dataset with the renamed columns.
    """
  9. sort
def sort(self, cols: str | list[str], ascending: bool | list[bool] = False) -> "Dataset":
    """
    Sort the dataset by the given keys.

    Parameters
    ----------
    cols : str | list[str]
        The sort keys.
    ascending : bool | list[bool]
        The sort order.

    Returns
    -------
    Dataset: The sorted dataset.
    """
  10. shuffle
def shuffle(self, **kwargs) -> "Dataset":
    """
    Shuffle the dataset.

    Returns
    -------
    Dataset: The shuffled dataset.
    """
  11. map
def map(self, fn: Callable[[dict[str, Any]], dict[str, Any]], **kwargs: Any) -> "Dataset":
    """
    Apply a function to each element of the dataset.
    This method applies a provided function to each element of the dataset.
    The function should take an item from the dataset and return a transformed item.

    Parameters
    ----------
    fn : Callable[[dict[str, Any]], dict[str, Any]]
        The function to apply to each element of the dataset.
    **kwargs : Any
        Additional keyword arguments to pass to the function.

    Returns
    -------
    Dataset: A new Dataset with the function applied to each element.
    """
  12. map_batch
import pandas as pd

def map_batch(self, fn: Callable[[pd.DataFrame], pd.DataFrame], **kwargs: Any) -> "Dataset":
    """
    Apply a function to each batch of the dataset, where the function takes a batch
    as a pandas DataFrame and returns a pandas DataFrame.

    Parameters
    ----------
    fn : Callable[[pd.DataFrame], pd.DataFrame]
        The function to apply to each batch of the dataset.
    **kwargs : Any
        Additional keyword arguments to pass to the function.

    Returns
    -------
    Dataset: A new Dataset with the function applied to each batch.
    """
  13. filter
def filter(self, fn: Callable[[dict[str, Any]], bool], **kwargs: Any) -> "Dataset":
    """
    Filter the dataset using a provided function.
    
    Parameters
    ----------
    fn : Callable[[dict[str, Any]], bool]
        A function that takes an item from the dataset and returns True if the item
        should be included in the filtered dataset, and False otherwise.
    **kwargs : Any
        Additional keyword arguments to pass to the function.
    
    Returns
    -------
    Dataset: A new Dataset containing only the items for which the function returned True.
    """
  14. flat_map
def flat_map(self, fn: Callable[[dict[str, Any]], Iterable[dict[str, Any]]], **kwargs: Any) -> "Dataset":
    """
    Apply a function that returns a list to each element of the dataset.

    Parameters
    ----------
    fn : Callable[[dict[str, Any]], Iterable[dict[str, Any]]]
        The function to apply to each element of the dataset.
    **kwargs : Any
        Additional keyword arguments to pass to the function.

    Returns
    -------
    Dataset: A new Dataset with the function applied to each element.
    """
  15. schema
def schema(self) -> Any:
    """
    Get the schema of the current dataset.
    This method returns the schema of the current dataset, depending on its type.

    Returns
    -------
    Any:
        The schema of the current dataset.
    """
  16. split
def split(self, weights: list[float], seed: int | None = None) -> list["Dataset"]:
    """
    Split the dataset into multiple parts.

    Parameters
    ----------
    weights : list[float]
        The proportion of each part of the dataset.
    seed : int | None
        The seed used to generate random numbers for splitting the dataset randomly.

    Returns
    -------
    list["Dataset"]: a list which contains two datasets:
        The first represents the train set while the second represents the test set.
    """
  17. show
def show(self, limit: int = 20) -> None:
    """
    Show the dataset.

    Parameters
    ----------
    limit : int
        The number of rows to be shown, with a default value of 20.
    """

Operator development

Supported operator types

LAS-SDK currently supports four types of function operators:

map

from typing import Any

import ftfy

from las.data.function.version import v1
from las.data.registry import function
from las.data.types import Category, Field, MapFunction

OP_ID = "fix_unicode"


# Registration with @function: optional
@function(name="Unicode错误修复", identifier=OP_ID, category=Category.TEXT, version=v1)
class FixUnicode(MapFunction):
    """Fix Unicode errors in text."""

    # Optional way to define parameters: a PyDantic-like Field carries each parameter's metadata.
    input_col_name: str = Field(
        title="输入列名",
        description="输入内容所在的列名",
    )
    normalization: str = Field(
        default="NFC",
        title="unicode规范化形式",
        description="unicode编码的规范化形式,支持的形式有NFC、NFKC、NFD、NFKD",
        enum_values=["NFC", "NFKC", "NFD", "NFKD"],
    )

    # Initializer
    def __init__(self, **kwargs) -> None:
        """Initialization."""
        super().__init__(**kwargs)

        if self.normalization and len(self.normalization) > 0:
            self.normalization = self.normalization.upper()
        else:
            self.normalization = "NFC"

        if self.normalization.upper() not in ["NFC", "NFKC", "NFD", "NFKD"]:
            msg = (
                f"Normalization mode [{self.normalization}] is not "
                "supported. Can only be one of "
                '["NFC", "NFKC", "NFD", "NFKD"]'
            )
            raise ValueError(msg)

    # map: takes one row and returns one row
    def map(self, row: dict[str, Any], **kwargs) -> dict[str, Any]:
        row[self.input_col_name] = ftfy.fix_text(
            row[self.input_col_name], normalization=self.normalization
        )
        return row

map_batch

map_batch does the same thing as map, but processes the data in batches, which is more efficient.

import ftfy
import pandas as pd

from las.data.function.version import v1
from las.data.registry import function
from las.data.types import Category, Field, BatchMapFunction

OP_ID = "fix_unicode_batch"


# Registration with @function: optional
@function(name="Unicode错误修复", identifier=OP_ID, category=Category.TEXT, version=v1)
class FixUnicodeBatch(BatchMapFunction):
    """Fix Unicode errors in text."""

    # Optional way to define parameters: a PyDantic-like Field carries each parameter's metadata.
    input_col_name: str = Field(
        title="输入列名",
        description="输入内容所在的列名",
    )
    normalization: str = Field(
        default="NFC",
        title="unicode规范化形式",
        description="unicode编码的规范化形式,支持的形式有NFC、NFKC、NFD、NFKD",
        enum_values=["NFC", "NFKC", "NFD", "NFKD"],
    )

    # Initializer
    def __init__(self, **kwargs) -> None:
        """Initialization."""
        super().__init__(**kwargs)

        if self.normalization and len(self.normalization) > 0:
            self.normalization = self.normalization.upper()
        else:
            self.normalization = "NFC"

        if self.normalization.upper() not in ["NFC", "NFKC", "NFD", "NFKD"]:
            msg = (
                f"Normalization mode [{self.normalization}] is not "
                "supported. Can only be one of "
                '["NFC", "NFKC", "NFD", "NFKD"]'
            )
            raise ValueError(msg)

    # map_batch: takes a batch and returns a batch. Each batch is a pandas DataFrame.
    def map_batch(self, rows: pd.DataFrame, **kwargs) -> pd.DataFrame:
        rows[self.input_col_name] = rows.apply(
            lambda row: ftfy.fix_text(row[self.input_col_name], normalization=self.normalization),
            axis=1,
        )
        return rows

flat_map

from typing import Any

from las.data.registry import function
from las.data.types import Category, Field, FlatMapFunction


@function(identifier="text_split", name="文本切分", category=Category.DEFAULT, version="v1")
class TestFlatMap(FlatMapFunction):
    """Flat map that splits a text column into chunks."""

    # Optional way to define parameters: a PyDantic-like Field carries each parameter's metadata.
    input_col_name: str = Field(
        title="输入列名",
        description="输入内容所在的列名",
    )
    chunk_size: int = Field(
        default=100,
        title="文本片段长度",
        description="被切分的文本按照文本片段长度进行切分",
    )
    keep_original: bool = Field(
        title="保留原字段",
        description="如果要保留原字段,则切分后的字段为 input_col_name + '_chunk'",
        default=True
    )
    
    def flat_map(self, row: dict[str, Any], **kwargs) -> list[dict[str, Any]]:
        if self.input_col_name not in row:
            return [row]
        
        text_to_split = str(row[self.input_col_name])
        text_length = len(text_to_split)
        num_chunks = (text_length + self.chunk_size - 1) // self.chunk_size
        
        result = []
        for i in range(num_chunks):
            start = i * self.chunk_size
            end = start + self.chunk_size
            chunk = text_to_split[start:end]
            
            new_row = row.copy()
            if self.keep_original:
                new_row[f"{self.split_field}_chunk"] = chunk
            else:
                new_row[self.input_col_name] = chunk
            result.append(new_row)
        
        return result

filter

import sys
from typing import Any

from las.data.function.version import v1
from las.data.registry import function
from las.data.types import Category, Field, FilterFunction

OP_ID = "test_length_filter"


@function(name="文本长度过滤", identifier=OP_ID, category=Category.TEXT, version=v1)
class TextLengthFilter(FilterFunction):
    """过滤出文本长度在指定范围内的文本。"""

    # 可选参数定义方式:这里使用了类似于 PyDantic 的方式来维护变量的 meta 信息。
    input_col_name: str = Field(
        title="输入列名",
        description="输入内容所在的列名",
    )
    min_len: int = Field(
        default=10,
        title="文本长度下限",
        description="过滤掉文本长度低于该值的文本",
    )
    max_len: int = Field(
        default=sys.maxsize,
        title="文本长度上限",
        description="过滤掉文本长度高于该值的文本",
    )

    # Takes one row and returns a bool
    def filter(self, row: dict[str, Any], **kwargs) -> bool:
        """
        Parameters
        ----------
        row: dict[str, Any]
            Row to filter.

        Returns
        -------
        bool
            True if the row should be kept.
        """
        text_len = len(row[self.input_col_name])
        return self.min_len <= text_len <= self.max_len

Note

deduplicate is not yet supported.

Operator usage

There are two ways to use operators:

  1. In code
  2. Via a YAML configuration

In code

import las.data as ld
from las.data import set_engine_ray

# Use the ray engine
set_engine_ray()

df = ld.reader("csv").load("tos://path/to/csv/")
df = df.map(FixUnicode(input_col_name="content", normalization="NFC"))
df.writer("csv").save("tos://path/to/csv/processed/")

Via YAML configuration

The YAML configuration schema is defined as follows:

api_version: "v1"
name: "test_job"
metadata:
    engine:
        name: "spark"        # 执行引擎
        options:             # 设定执行引擎的初始化参数
            op_1: "1"  
            op_2: "2"
    environment:
        en_1: "1"
        en_2: "2"
pipeline:
    - datasource:            # 数据源
        path: "tos://path/to/csv/"
        ds_name: "csv"       # 数据源名称(格式)
    - functions:             # 算子列表
        - func_1:            # 算子名称
            param_1: "p1"    # 算子参数,用于作为构造函数的初始化参数
            param_2: "3.0"
            runtime_cfgs:    # 运行时配置,比如使用多少 gpu 运行该算子
                num_cpus: "1"
                num_gpus: "2"
        - func_2:
            param_1: "p1"
            param_2: "3.0"
            runtime_cfgs:
                cfg_1: "1"
                cfg_2: "2"
    - datasink:               # 数据目标
        path: "tos://path/to/sink"
        ds_name: "parquet"    # 数据目标名称(格式)
        ds_config:            # 数据源选项,一般为数据库连接等。大多数情况下留空
            cf_1: "1"
            cf_2: "2"
        options:              # 写数据选项
            op_1: "/data/output"

The YAML equivalent of the code above is:

api_version: "v1"
name: "test_job"
metadata:
    engine:
        name: "ray"
pipeline:
    - datasource:
        path: "tos://path/to/csv/"
        ds_name: "csv"
    - functions:
        - fix_unicode:
            input_col_name: "content"
            normalization: "NFC"
    - datasink:
        path: "tos://path/to/csv/processed/"
        ds_name: "csv"

Run it with:

python -m pipe_runner -f /path/to/yaml_file.yaml

Metadata SDK for Java usage reference

Prerequisites

You have created an access key and activated the LAS service.

Installation

Choose lf-client-2 or lf-client-3 depending on your needs.

lf-client-2

Note

Compatible with the open-source Hive 2 protocol.

Add the dependency

<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>lf-client-2</artifactId>
    <version>1.0.1-RELEASE</version>
</dependency>

Request example

String ak = {ak};
String sk = {sk};
HiveConf conf = new HiveConf();
conf.set(HiveConf.ConfVars.METASTOREURIS.varname, {endPoint});
HiveMetaStoreClient hmsClient = new HiveMetaStoreClient(conf);
hmsClient.setRegion("cn-beijing");
hmsClient.setAccessKeyIdAndSecretAccessKey(ak, sk);
List<String> allDatabases = hmsClient.getAllDatabases();
System.out.println(allDatabases);
Table table = hmsClient.getTable("db_test","tbl_test");
System.out.println(table);

lf-client-3

Note

Compatible with the open-source Hive 3 protocol.

Add the dependency

<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>lf-client-3</artifactId>
    <version>1.0.1-RELEASE</version>
</dependency>

Request example

public static void main(String[] args) throws Exception {
    String ak = {ak};
    String sk = {sk};
    Configuration conf = new Configuration();
    conf.set(MetastoreConf.ConfVars.THRIFT_URIS.getVarname(), {endPoint});
    HiveMetaStoreClient hmsClient = new HiveMetaStoreClient(conf);
    hmsClient.setRegion("cn-beijing");
    hmsClient.setAccessKeyIdAndSecretAccessKey(ak, sk);
    Database database = hmsClient.getDatabase("xyu_test");
    Table table = hmsClient.getTable("db_test","tbl_test");
    System.out.println(database);
    System.out.println(table);
}

Version releases

hive2.x SDK

| lf-client-2 | hive-exec | Release date | Release notes |
| 1.0.1-RELEASE | 1.0.0-LF2-RELEASE | 20240521 | |
| 1.0.0-RELEASE | 1.0.0-LF2-RELEASE | 20240311 | |

hive3.x SDK

| lf-client-3 | hive-exec | Release date | Release notes |
| 1.0.1-RELEASE | 1.0.0-LF3-RELEASE | 20240521 | |
| 1.0.0-RELEASE | 1.0.0-LF3-RELEASE | 20240311 | |

Connection addresses

| Region | Endpoint | Notes |
| cn-beijing | thrift://lakeformation.las.cn-beijing.ivolces.com:48869 | Accessible only from within Volcano Engine; public network access is not supported |
| cn-shanghai | thrift://lakeformation.las.cn-shanghai.ivolces.com:48869 | |
| cn-guangzhou | thrift://lakeformation.las.cn-guangzhou.ivolces.com:48869 | |

Permission SDK for Java usage reference

Prerequisites

You have created an access key and activated the LAS service.

Installation

Add the dependency

<dependency>
  <groupId>bytedance.olap</groupId>
  <artifactId>gemini-client-shaded</artifactId>
  <version>1.0.0.3-RELEASE</version>
</dependency>

Request example

public static void main(String[] args) throws Exception {
    String ak = {ak};
    String sk = {sk};
    String endPoint = {endPoint};
    GeminiClientIface signingClient = GeminiClientFactory.createSigningClient(endPoint, ak, sk, null, null, null);
    List<GeminiPrivilege> privileges = new ArrayList<>();
    GeminiPrivilege privilege = new GeminiPrivilege();
    GeminiResource geminiResource = new GeminiResource();
    geminiResource.setResourceScope(GeminiResourceScope.TABLE);
    geminiResource.setRegion("cn-beijing");
    geminiResource.setTenant("2100000001");
    geminiResource.setSchemaName("@hive#db_test");
    geminiResource.setTableName("tb_test");
    privilege.setResource(geminiResource);
    privilege.setAction(GeminiPrivilegeAction.DESCRIBE);
    privileges.add(privilege);
    GeminiPrincipal principal = new GeminiPrincipal();
    principal.setTenant("2100000001");
    principal.setPrincipalType(GeminiPrincipalType.ACCOUNT);
    principal.setPrincipalName("1008632");
    signingClient.checkPrivilegesForPolicy(principal,privileges,null,null);
}

Version releases

| Version | Release date | Release notes |
| 1.0.0.18-SNAPSHOT | 20240417 | Added the revoke_privileges_for_dropped_resources method |
| 1.0.0.3-RELEASE | 20240514 | Removed log-related dependencies and relocated some third-party dependencies; CatalogName now defaults to hive; fixed grant/revoke not carrying catalogName; improved the gemini-client packaging |

Connection addresses

| Region | Endpoint | Notes |
| cn-beijing | thrift://100.96.5.173:48869 | Accessible only from within Volcano Engine; no public network access; requires allowlisting |
| cn-shanghai | thrift://100.96.4.175:48869 | Accessible only from within Volcano Engine; no public network access; requires allowlisting |
| cn-guangzhou | thrift://100.96.4.70:48869 | Accessible only from within Volcano Engine; no public network access; requires allowlisting |