You need to enable JavaScript to run this app.
导航
Daft 分布式UDF参数配置
最近更新时间:2025.10.24 20:07:14首次发布时间:2025.10.24 20:07:14
复制全文
我的收藏
有用
有用
无用
无用

在AI数据处理场景中,多模态数据(如文本、图像、点云、音频等)的分析与操作已成为核心需求,而 Daft 作为轻量且高效的分布式多模态数据处理框架,用户自定义函数(UDF)正其是支撑这类复杂操作的关键能力。在 Daft 的生态中,借助UDF实现多模态数据的定制化处理(例如图像特征提取、文本格式清洗、跨模态数据关联等),不仅是行业内的普遍实践,更因框架的低门槛设计而具备极高的易用性 —— 开发者无需深入理解其底层分布式底层逻辑,仅需通过简洁的代码封装业务逻辑,即可快速将自定义数据处理操作集成到 Daft 的数据流水线中,大幅降低了多模态数据处理的技术门槛。
Daft为满足不同场景下的开发习惯与需求,提供了多样化的使用方式,包括Batch UDF,Row-wise UDF, Async UDF等,同时支持单机调试与分布式部署的无缝切换。
需要注意的是,Daft UDF 的高效运行并非 “开箱即用”,其性能表现高度依赖 业务适配的参数配置—— 尤其是batch_size(批处理大小)与concurrency(并发数)两个核心参数。这两个参数的设置需建立在对 Daft 运行机制的理解之上:batch_size决定了单次 UDF 调用处理的最大数据量,过大可能导致内存溢出,过小则会增加函数调用开销;concurrency则控制并行执行的 UDF 任务数,过高可能引发资源争抢(如 CPU、GPU 或网络带宽瓶颈),过低则无法充分利用分布式资源。若参数设置与业务数据量(如单条数据大小、总数据量)、硬件资源不匹配,轻则导致任务并发效率下降,重则引发任务阻塞或崩溃。因此,开发者在使用 Daft UDF 时,需结合具体业务场景(如多模态数据的类型、处理逻辑的复杂度)与资源配置,合理调试参数,才能最大化发挥 Daft 的分布式优势。

UDF 样例

Daft支持以单机模式或者分布式模式运行UDF,在调度机制上,有些细微的差别,以分布式方式运行UDF需要依赖Ray的分布式调度能力,接下来,我们通过一个例子介绍我们通过Ray分布式运行一个Daft Batch UDF时,应该从哪些角度来设置UDF的参数。

说明

当前Daft版本为v0.6.4。

import logging
import time
import os


def configure_logging():
    logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
                        datefmt='%Y-%m-%d %H:%M:%S.%s'.format())

    logging.getLogger("tracing.span").setLevel(logging.WARNING)
    logging.getLogger("daft_io.stats").setLevel(logging.WARNING)
    logging.getLogger("DaftStatisticsManager").setLevel(logging.WARNING)
    logging.getLogger("DaftFlotillaScheduler").setLevel(logging.WARNING)
    logging.getLogger("DaftFlotillaDispatcher").setLevel(logging.WARNING)
    logging.getLogger("MyBatchUdf").setLevel(logging.WARNING)


configure_logging()

import daft
from daft import col, Series


@daft.udf(return_dtype=daft.DataType.string())
class MyBatchUdf:
    def __init__(self, sleep: int = 0):
        self._id = id(self)
        self.counter = 0
        self.sleep = sleep
        self.logger = logging.getLogger(f"MyBatchUdf-{self._id}")

    def __call__(self, a: Series) -> Series:
        self.counter += len(a)

        self.logger.info(f"Actor got {len(a)} records, total: {self.counter}")

        if self.sleep > 0:
            time.sleep(self.sleep)

        return a


if __name__ == "__main__":
    if os.getenv("DAFT_RUNNER", "native") == "ray":
        import ray

        ray.init(runtime_env={"worker_process_setup_hook": configure_logging})

    partition = 2
    udf_concurrency = 4
    udf_batch_size = 200
    sleep_seconds = 2

    paths = [f"file-{i}" for i in range(1000)]
    df = daft.from_pydict({"path": paths})

    # Repartition not supported on the NativeRunner
    df = df.repartition(partition, "path")

    udf = (MyBatchUdf
           .override_options(batch_size=udf_batch_size)
           .with_concurrency(udf_concurrency)
           .with_init_args(sleep=sleep_seconds))
    df = df.with_column("batch", udf(col("path")))
    df.collect()

在上面提供的例子中,我们实现了一个MyBatchUdf, 其包含了一个计数器,记录当前UDF处理的数据条数,另外该UDF接受一个String类型的列,并返回列本身。数据处理的主要流程为:

  • 创建一个包含1000条数据的DataFrame,包含一列数据,列名为path
  • 通过from_pydict得到的DataFrame只包含一个分区,所以我们根据path列对dataframe进行重分区,重分区后的dataframe包含2个partition。
  • 之后我们定义了一个concurrency=4,batch_size=200的UDF算子来处理这1000条数据,理论上每个并发会处理250条数据。

运行结果分析

我们先看看参数配置不合理时,会带来哪些问题?

场景1 :数据倾斜

当我们在ray单机模式下运行这个任务时,我们会得到以下日志。从日志中可以看到总共启动了4个UDFActor实例,但是他们处理的数据条数分别为[110, 200, 290,400],并不是我们期望中各自处理250条数据。另外我们可以看到有两个UDF实例被调用了两次,前后调用时间与配置的sleep时间一致,说明同一个UDF实例会串行的执行收到的多个batch数据。假如每条数据的执行时间一致,那么执行400条数据的UDF实例就成为了长尾,任务后期就只会有一个并发在处理数据。

(UDFActor pid=80511) 2025-09-28 16:40:48.1759048848 - MyBatchUdf-5155349264 - INFO - Actor got 200 records, total: 200
(UDFActor pid=80510) 2025-09-28 16:40:48.1759048848 - MyBatchUdf-4467594448 - INFO - Actor got 200 records, total: 200
(UDFActor pid=80509) 2025-09-28 16:40:48.1759048848 - MyBatchUdf-5098474896 - INFO - Actor got 110 records, total: 110
(UDFActor pid=80512) 2025-09-28 16:40:48.1759048848 - MyBatchUdf-4548224592 - INFO - Actor got 200 records, total: 200
(UDFActor pid=80510) 2025-09-28 16:40:50.1759048850 - MyBatchUdf-4467594448 - INFO - Actor got 90 records, total: 290
(UDFActor pid=80512) 2025-09-28 16:40:50.1759048850 - MyBatchUdf-4548224592 - INFO - Actor got 200 records, total: 400

这里出现的数据不均匀和执行repartition有关(后面介绍具体原因),那我们看看,如果不执行分区操作的结果如何?从以下日志我们可以看到,4个并发分别处理的数据为[200, 200, 200, 400],虽然有5个batch,所有某一个并发会处理两个batch,但看起来比上面的情况好一些。

(UDFActor pid=96995) 2025-09-28 16:55:06.1759049706 - MyBatchUdf-5049017488 - INFO - Actor got 200 records, total: 200
(UDFActor pid=96996) 2025-09-28 16:55:06.1759049706 - MyBatchUdf-5237938000 - INFO - Actor got 200 records, total: 200
(UDFActor pid=96993) 2025-09-28 16:55:06.1759049706 - MyBatchUdf-5112045840 - INFO - Actor got 200 records, total: 200
(UDFActor pid=96998) 2025-09-28 16:55:06.1759049706 - MyBatchUdf-5887815120 - INFO - Actor got 200 records, total: 200
(UDFActor pid=96998) 2025-09-28 16:55:08.1759049708 - MyBatchUdf-5887815120 - INFO - Actor got 200 records, total: 400

场景2:实际并发低于预期

但是如果任务是运行在拥有多个worker的ray cluster上呢?当将这个任务运行在拥有两个worker的ray cluster时,通过下面的日志可以看到,实际只有2个并发处理了数据,处理的条数为[400, 600]。处理数据的并发并没有达到预期。

(UDFActor pid=13825) 2025-09-28 17:08:52.1759050532 - MyBatchUdf-6049781264 - INFO - Actor got 200 records, total: 200
(UDFActor pid=13827) 2025-09-28 17:08:52.1759050532 - MyBatchUdf-5088798160 - INFO - Actor got 200 records, total: 200
(UDFActor pid=13825) 2025-09-28 17:08:54.1759050534 - MyBatchUdf-6049781264 - INFO - Actor got 200 records, total: 400
(UDFActor pid=13827) 2025-09-28 17:08:54.1759050534 - MyBatchUdf-5088798160 - INFO - Actor got 200 records, total: 400
(UDFActor pid=13825) 2025-09-28 17:08:56.1759050536 - MyBatchUdf-6049781264 - INFO - Actor got 200 records, total: 600

场景3:实际并发低于预期

除了分区数和worker数量不匹配会导致并发低于预期以外,batch_size设置不合理依然会面临并发打不上去的问题,假设我们将batch_size设置为500,从下面的日志,我们可以看到,即使运行在单个节点时,也只有两个并发处理了数据,处理的条数为[500, 500],处理数据的并发并没有达到预期。

(UDFActor pid=37087) 2025-09-28 17:29:33.1759051773 - MyBatchUdf-5215119504 - INFO - Actor got 500 records, total: 500
(UDFActor pid=37086) 2025-09-28 17:29:33.1759051773 - MyBatchUdf-4822819216 - INFO - Actor got 500 records, total: 500

问题分析

通过上面的例子,我们可以了解到,DataFrame的分区数、UDF的batch size,以及UDF的concurrency设置的不合理会导致出现数据倾斜,并发打不上去的问题。下面我们从原理层面阐述一下Daft的运行机制以及具体的原因。
当分布式运行一个Daft作业时,整个作业代码首先会被构建为逻辑执行计划(LogicalPlan),在执行df.collect()时,会进行物化(materialize)操作,整个逻辑计划会被内置的daft优化器进行优化,之后会被转换为分布式物理执行计划(DistributedPhysicalPlan), 最终交给DistributedPhysicalPlanRunner执行整个Plan。

DistributedPhysicalPlanRunner会将physical plan转换可执行的pipeline,plan中的每个节点对应一个PipelineNode, 最终这个pipeline会被拆分为一个或者多个SwordfishTask。一般情况下DataFrame有多少个分区就会生成多少个SwordfishTask,这些Task首先会根据对应的调度策略会被分发到RaySwordfishActor上(每个Wroker会启动一个RawSwordfishActor),当Task的调度策略为Spread时,SwordfishTask会根据Worker空闲状态被均匀的分配到多个RaySwordfishActor上。

RaySwordfishActor负责执行SwordfishTask,通过NativeExecutor执行task中的LocalPhysicalPlan。(到这里,Native Runner和Ray Runner执行的逻辑基本上一致,区别是分布式场景下,每个SwordfishTask只会包含一个partiiton的数据(psets的size为1),而Native Runner会包含所有partiiton的数据)。每个Swordfish Task包含了整个stage plan的内容,在执行UDF算子时,会根据batch_size将当前SwordfishTask中的数据进行切分,切分为一个一个的morsel/MicroPartition,然后再调度到当前worker上的UDFActors上(执行UDFActor.eval_input)

所以这里有四个关键参数的设置:

  • Worker的数量:每个worker会启动一个RaySwordfishActor,一个RaySwordfishActor可以“同时”运行多个SwordfishTask(异步执行LocalPhysicalPlan)。Plan runner将task调度到各个RaySwordfishActor时,会根据当前RaySwordfishActor的负载情况决定是否要分发task,未分发的task处于等待状态,需要等前面的task运行结束后才能分发,所以Worker的数量,以及worker的总CPU数决定的task执行时的最大并发数。
  • DataFrame的partition:​一般情况下每个partition对应一个SwordfishTask,所以SwordfishTask的个数,决定整个pipeline的最大并行度,如果SwordfishTask的个数小于RaySwordfishActor的数量,那么某些RaySwordfishActor会因为收不到task,导致这些RaySwordfishActor空转。当DataFrame的分区个数比较少时,可以通过repartition, into_partition拓展Dataframe的分区数,从而增加task的个数,也可以通过into_batches将一个分区切分为多个task,每个task包含原始分区中的部分数据,从而增加执行task的最大并发数。
  • UDF的concurrency: 这个值决定了UDF运行时的最大并行度,On Ray模式下会决定启动多少个UDFActor,当ray cluster的worker是同构机型时,一般情况下UDFActor会均匀的分布在每个worker上,当某个Worker上存在该UDF的UDFActor时(比如当UDF的concurrency小于Worker的数量时,有的Worker上不会启动该UDF的UDFActor),对应的RaySwordfishActor只会将morsel调度到当前worker上的UDFActor。所以当RaySwordfishActor切分的morsel个数小于当前worker上的UDFActor个数时,某些UDFActor会由于接收不到数据,导致UDFActor空转。
  • UDF的batch_size: 这个值决定了UDF单次调用传入的最大数据条数。当batch_size设置的比较大时,一方面需要更大的执行内存,另一方面会导致RaySwordfishActor切分的morsel个数较少,没法充分利用Worker上的各个UDFActor,当batch_size设置的比较少,会增加函数调用开销。所以需要结合实际业务情况和优化目标进行调整。

在场景1中,我们遇到了数据处理不均匀的场景,我们通过repartition将一个包含1000条数据分区重分区为2个分区时,分区后的数据并非完全均匀,这是因为我们使用path的的hash值作为repartition key,所以倾斜程度依赖hash值的分布情况。上述例子中,重分区之后,得到了两个分区,包含的数据条数分别为510和490,然后每个分区再被daft引擎按batch size=200进行切分为[200, 200, 110, 200, 200, 90]6个batch,然后再调度到4个并发上。可以看到各个分区的切分是独立的,不同分区的数据没法被拼接起来,所以每个分区的最后一个batch的数据会远小于其它batch的数据。
在场景2中,我们遇到了实际并发低于预期的场景,这是因为原始DataFrame只有一个分区,最终只会产生一个task,被调度到其中的一个worker上,而UDF的并发实例是在plan阶段就已经“均匀”的分散在各个Worker上,RaySwordfishActor在调度batch/morsel的时候,优先调度到本地的worker上,导致只有一个Worker上的UDFActor实例在处理数据。所以只有当task的个数大于等于worker的数量时,我们才能将并发打到极致。那有什么办法既可以增加task/partition的个数,又能平衡不同task/partition的数据量呢。当有类似需求时,可以考虑使用into_batches这个API,这个API会将partition的数据,按照执行大小切分为不同的task,然后分发到不同的RaySwordfishActor上,比如在单个Worker的Ray cluster执行df.into_batches(200).with_colum("batch", udf(col("path")))时,会得到如下结果:

(UDFActor pid=29906) 2025-09-28 19:15:04.1759058104 - MyBatchUdf-4816300688 - INFO - Actor got 200 records, total: 200
(UDFActor pid=29907) 2025-09-28 19:15:04.1759058104 - MyBatchUdf-4952482832 - INFO - Actor got 200 records, total: 200
(UDFActor pid=29909) 2025-09-28 19:15:04.1759058104 - MyBatchUdf-4548368464 - INFO - Actor got 200 records, total: 200
(UDFActor pid=29906) 2025-09-28 19:15:06.1759058106 - MyBatchUdf-4816300688 - INFO - Actor got 200 records, total: 400
(UDFActor pid=29906) 2025-09-28 19:15:08.1759058108 - MyBatchUdf-4816300688 - INFO - Actor got 200 records, total: 600

通过上述日志,我们可以看到数据被均匀的切分为5个batch了,但是与期望中均匀的调度到4个并发上不一致,只有3个并发收到了数据,且某个并发实例处理了3个batch的数据(上述结果存在随机性,可能是某2个并发分别处理了2个batch),导致并发实例间处理数据的不均衡。造成这个现象的原因和场景3一致,虽然5个task的数据是均衡的,但是每个task经过RaySwordfishActor的切分后,就只生成了一个morse,然后被随机分配到某个UDFActor。因为task间的数据处理是独立的,极端情况下,这5个task生成的morse都可能会被调度到同一个UDFActor上。所以当我们期望最大并发时,因为保证某一个task的数据在切分后,能均匀的分散到当前worker上的每个UDFActor上,比如当我们将udf_batch_size设置为50时,保证每个task的数据会被切分为4个morsel,然后以round-roubin的策略均分到各个UDFActor上。

(UDFActor pid=43813) 2025-09-28 19:27:04.1759058824 - MyBatchUdf-5061156240 - INFO - Actor got 50 records, total: 50
(UDFActor pid=43815) 2025-09-28 19:27:04.1759058824 - MyBatchUdf-5090240720 - INFO - Actor got 50 records, total: 50
(UDFActor pid=43814) 2025-09-28 19:27:04.1759058824 - MyBatchUdf-5906907600 - INFO - Actor got 50 records, total: 50
(UDFActor pid=43816) 2025-09-28 19:27:04.1759058824 - MyBatchUdf-4546699600 - INFO - Actor got 50 records, total: 50
(UDFActor pid=43813) 2025-09-28 19:27:06.1759058826 - MyBatchUdf-5061156240 - INFO - Actor got 50 records, total: 100
(UDFActor pid=43815) 2025-09-28 19:27:06.1759058826 - MyBatchUdf-5090240720 - INFO - Actor got 50 records, total: 100
(UDFActor pid=43814) 2025-09-28 19:27:06.1759058826 - MyBatchUdf-5906907600 - INFO - Actor got 50 records, total: 100
(UDFActor pid=43816) 2025-09-28 19:27:06.1759058826 - MyBatchUdf-4546699600 - INFO - Actor got 50 records, total: 100
(UDFActor pid=43813) 2025-09-28 19:27:08.1759058828 - MyBatchUdf-5061156240 - INFO - Actor got 50 records, total: 150
(UDFActor pid=43815) 2025-09-28 19:27:08.1759058828 - MyBatchUdf-5090240720 - INFO - Actor got 50 records, total: 150
(UDFActor pid=43814) 2025-09-28 19:27:08.1759058828 - MyBatchUdf-5906907600 - INFO - Actor got 50 records, total: 150
(UDFActor pid=43816) 2025-09-28 19:27:08.1759058828 - MyBatchUdf-4546699600 - INFO - Actor got 50 records, total: 150
(UDFActor pid=43813) 2025-09-28 19:27:10.1759058830 - MyBatchUdf-5061156240 - INFO - Actor got 50 records, total: 200
(UDFActor pid=43815) 2025-09-28 19:27:10.1759058830 - MyBatchUdf-5090240720 - INFO - Actor got 50 records, total: 200
(UDFActor pid=43814) 2025-09-28 19:27:10.1759058830 - MyBatchUdf-5906907600 - INFO - Actor got 50 records, total: 200
(UDFActor pid=43816) 2025-09-28 19:27:10.1759058830 - MyBatchUdf-4546699600 - INFO - Actor got 50 records, total: 200
(UDFActor pid=43813) 2025-09-28 19:27:12.1759058832 - MyBatchUdf-5061156240 - INFO - Actor got 50 records, total: 250
(UDFActor pid=43815) 2025-09-28 19:27:12.1759058832 - MyBatchUdf-5090240720 - INFO - Actor got 50 records, total: 250
(UDFActor pid=43814) 2025-09-28 19:27:12.1759058832 - MyBatchUdf-5906907600 - INFO - Actor got 50 records, total: 250
(UDFActor pid=43816) 2025-09-28 19:27:12.1759058832 - MyBatchUdf-4546699600 - INFO - Actor got 50 records, total: 250

看起来into_batches和UDFbatch_size的配合既能保证各个task数据的均衡性,又能充分的发挥各个UDFActor的并发度。但into_batches不是万能的,比如单机单分区DataFrame的场景不适合使用into_batches,由于只有一个分区,而且只有一个Worker,所以既不存在数据不均匀,也不存在要分散到多个Worker的场景,就没必要通过into_batches增加task的个数,反而会增加函数调用的开销。
另外执行into_batches时,即使DataFrame或者分区的数据充足,生成的task包含的数据不一定都满足对应的batch size。比如当DataFrame存在两个分区,第一个分区包含180条数据,第二个分区包含220条数据,当按照batch_size=200进行切分时,最终会得到3个task,包含的数据量分别为180,200, 20。
into_batches除了将一个大的分区拆分为多个task之外,还具备将多个小的分区合并为大的task的能力,比如假设DataFrame存在16个分区,每个分区仅包含20条数据,当执行df.into_batches(200)时,最终会生成两个各自包含160条数据的task,而不是一个包含200条的task和一个包含120条数据的task。背后的原因是distributed plan runner会将into_batches前后的逻辑拆分为两个stage,先物化第一个stage的结果,然后对其结果进行“攒批”,当累计条数大于batch size的80%时,则创建一个新的task,所以上述例子中会生成两个分别包含160条记录的task。

实践总结

针对在不同的应用场景,我们可以通过调整UDF的concurrency, batch size参数来实现对应的目的。
比如当我们期望最大化UDF并发时,假设一共有M个Worker,

  • 首先要保证这M个Worker都会收到task,所以要求生成task的个数大于等于Worker的个数,默认情况下task的个数由DataFrame的分区个数决定,如果原始分区个数较小,可以通过repartition或者into_partition增加DataFrame的分区个数,或者通过into_batches增加生成的task个数。
  • 其次应该保证Worker收到的数据尽可能的均匀,包括task的个数,以及task内包含的数据量。当task内的数据比较均匀时,task的个数最好是Worker个数的整数倍,这样保证每个Worker收到的task数是一致的。当task内部的数据量不均衡时,可以通过repartition/into_partitioninto_batches均衡各个task的数据,前者适用于分区数据比较多的场景(分区数太小会导致分区间数据差异比较大,repartition的大致逻辑为:上游的每个区分将自己的数据拆成N份(Chunk),下游每个分区再将上游的1/N的数据(Chunk)合并起来。当N比较小时,每个Chunk的数据量就会比较大, 即使各分区间只相差一个Chunk,也会存在Chunk级别的数据量差距)。后者适用于小Batch的场景。
  • Task的数据量确定后,应该尽可能的保证每个Worker上UDF的实例数 * UDF的batch size恰好等于Task的数据量,既 udf_concurrency * udf_batch_size / M = task_num_rows,这样能保证task的数据被切分后,可以均匀的分发到多个并发上。

上面我们只讨论了如何通过调整DataFrame的分区数,UDF的并发数和batch size来实现最大化并发计算的目的。实际应用过程中,想要高效的运行UDF,还需要额外关注UDF运行过程中的负载情况,比如CPU、内存使用率等,如果UDF处理数据的过程中占用的内存比较大,我们应该调小UDF batch size,避免OOM问题。调小batch size可能会morsel的个数增多,UDFActor处理这些morsel数据时会形成背压,所以当需要最大化并发时,应该增加udf的并发度。当UDF处理数据涉及大量的IO操作,我们应该考虑在UDF内部使用异步操作+协程的方式提高CPU利用率,同时增加batch size,减少UDF的调用次数。增加batch size可能会导致Morsel的个数减少,如果低于某个Worker上UDF的实例个数,就会导致某些实例没法收到数据处于空闲状态。
针对合并小文件场景,或者上游零碎小分区场景,可以使用into_batches实现合并分区的目的,从而减少UDF实例调用次数。