在AI数据处理场景中,多模态数据(如文本、图像、点云、音频等)的分析与操作已成为核心需求,而 Daft 作为轻量且高效的分布式多模态数据处理框架,用户自定义函数(UDF)正其是支撑这类复杂操作的关键能力。在 Daft 的生态中,借助UDF实现多模态数据的定制化处理(例如图像特征提取、文本格式清洗、跨模态数据关联等),不仅是行业内的普遍实践,更因框架的低门槛设计而具备极高的易用性 —— 开发者无需深入理解其底层分布式底层逻辑,仅需通过简洁的代码封装业务逻辑,即可快速将自定义数据处理操作集成到 Daft 的数据流水线中,大幅降低了多模态数据处理的技术门槛。
Daft为满足不同场景下的开发习惯与需求,提供了多样化的使用方式,包括Batch UDF,Row-wise UDF, Async UDF等,同时支持单机调试与分布式部署的无缝切换。
需要注意的是,Daft UDF 的高效运行并非 “开箱即用”,其性能表现高度依赖 业务适配的参数配置—— 尤其是batch_size(批处理大小)与concurrency(并发数)两个核心参数。这两个参数的设置需建立在对 Daft 运行机制的理解之上:batch_size决定了单次 UDF 调用处理的最大数据量,过大可能导致内存溢出,过小则会增加函数调用开销;concurrency则控制并行执行的 UDF 任务数,过高可能引发资源争抢(如 CPU、GPU 或网络带宽瓶颈),过低则无法充分利用分布式资源。若参数设置与业务数据量(如单条数据大小、总数据量)、硬件资源不匹配,轻则导致任务并发效率下降,重则引发任务阻塞或崩溃。因此,开发者在使用 Daft UDF 时,需结合具体业务场景(如多模态数据的类型、处理逻辑的复杂度)与资源配置,合理调试参数,才能最大化发挥 Daft 的分布式优势。
Daft支持以单机模式或者分布式模式运行UDF,在调度机制上,有些细微的差别,以分布式方式运行UDF需要依赖Ray的分布式调度能力,接下来,我们通过一个例子介绍我们通过Ray分布式运行一个Daft Batch UDF时,应该从哪些角度来设置UDF的参数。
说明
当前Daft版本为v0.6.4。
import logging import time import os def configure_logging(): logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", datefmt='%Y-%m-%d %H:%M:%S.%s'.format()) logging.getLogger("tracing.span").setLevel(logging.WARNING) logging.getLogger("daft_io.stats").setLevel(logging.WARNING) logging.getLogger("DaftStatisticsManager").setLevel(logging.WARNING) logging.getLogger("DaftFlotillaScheduler").setLevel(logging.WARNING) logging.getLogger("DaftFlotillaDispatcher").setLevel(logging.WARNING) logging.getLogger("MyBatchUdf").setLevel(logging.WARNING) configure_logging() import daft from daft import col, Series @daft.udf(return_dtype=daft.DataType.string()) class MyBatchUdf: def __init__(self, sleep: int = 0): self._id = id(self) self.counter = 0 self.sleep = sleep self.logger = logging.getLogger(f"MyBatchUdf-{self._id}") def __call__(self, a: Series) -> Series: self.counter += len(a) self.logger.info(f"Actor got {len(a)} records, total: {self.counter}") if self.sleep > 0: time.sleep(self.sleep) return a if __name__ == "__main__": if os.getenv("DAFT_RUNNER", "native") == "ray": import ray ray.init(runtime_env={"worker_process_setup_hook": configure_logging}) partition = 2 udf_concurrency = 4 udf_batch_size = 200 sleep_seconds = 2 paths = [f"file-{i}" for i in range(1000)] df = daft.from_pydict({"path": paths}) # Repartition not supported on the NativeRunner df = df.repartition(partition, "path") udf = (MyBatchUdf .override_options(batch_size=udf_batch_size) .with_concurrency(udf_concurrency) .with_init_args(sleep=sleep_seconds)) df = df.with_column("batch", udf(col("path"))) df.collect()
在上面提供的例子中,我们实现了一个MyBatchUdf, 其包含了一个计数器,记录当前UDF处理的数据条数,另外该UDF接受一个String类型的列,并返回列本身。数据处理的主要流程为:
path。from_pydict得到的DataFrame只包含一个分区,所以我们根据path列对dataframe进行重分区,重分区后的dataframe包含2个partition。我们先看看参数配置不合理时,会带来哪些问题?
当我们在ray单机模式下运行这个任务时,我们会得到以下日志。从日志中可以看到总共启动了4个UDFActor实例,但是他们处理的数据条数分别为[110, 200, 290,400],并不是我们期望中各自处理250条数据。另外我们可以看到有两个UDF实例被调用了两次,前后调用时间与配置的sleep时间一致,说明同一个UDF实例会串行的执行收到的多个batch数据。假如每条数据的执行时间一致,那么执行400条数据的UDF实例就成为了长尾,任务后期就只会有一个并发在处理数据。
(UDFActor pid=80511) 2025-09-28 16:40:48.1759048848 - MyBatchUdf-5155349264 - INFO - Actor got 200 records, total: 200 (UDFActor pid=80510) 2025-09-28 16:40:48.1759048848 - MyBatchUdf-4467594448 - INFO - Actor got 200 records, total: 200 (UDFActor pid=80509) 2025-09-28 16:40:48.1759048848 - MyBatchUdf-5098474896 - INFO - Actor got 110 records, total: 110 (UDFActor pid=80512) 2025-09-28 16:40:48.1759048848 - MyBatchUdf-4548224592 - INFO - Actor got 200 records, total: 200 (UDFActor pid=80510) 2025-09-28 16:40:50.1759048850 - MyBatchUdf-4467594448 - INFO - Actor got 90 records, total: 290 (UDFActor pid=80512) 2025-09-28 16:40:50.1759048850 - MyBatchUdf-4548224592 - INFO - Actor got 200 records, total: 400
这里出现的数据不均匀和执行repartition有关(后面介绍具体原因),那我们看看,如果不执行分区操作的结果如何?从以下日志我们可以看到,4个并发分别处理的数据为[200, 200, 200, 400],虽然有5个batch,所有某一个并发会处理两个batch,但看起来比上面的情况好一些。
(UDFActor pid=96995) 2025-09-28 16:55:06.1759049706 - MyBatchUdf-5049017488 - INFO - Actor got 200 records, total: 200 (UDFActor pid=96996) 2025-09-28 16:55:06.1759049706 - MyBatchUdf-5237938000 - INFO - Actor got 200 records, total: 200 (UDFActor pid=96993) 2025-09-28 16:55:06.1759049706 - MyBatchUdf-5112045840 - INFO - Actor got 200 records, total: 200 (UDFActor pid=96998) 2025-09-28 16:55:06.1759049706 - MyBatchUdf-5887815120 - INFO - Actor got 200 records, total: 200 (UDFActor pid=96998) 2025-09-28 16:55:08.1759049708 - MyBatchUdf-5887815120 - INFO - Actor got 200 records, total: 400
但是如果任务是运行在拥有多个worker的ray cluster上呢?当将这个任务运行在拥有两个worker的ray cluster时,通过下面的日志可以看到,实际只有2个并发处理了数据,处理的条数为[400, 600]。处理数据的并发并没有达到预期。
(UDFActor pid=13825) 2025-09-28 17:08:52.1759050532 - MyBatchUdf-6049781264 - INFO - Actor got 200 records, total: 200 (UDFActor pid=13827) 2025-09-28 17:08:52.1759050532 - MyBatchUdf-5088798160 - INFO - Actor got 200 records, total: 200 (UDFActor pid=13825) 2025-09-28 17:08:54.1759050534 - MyBatchUdf-6049781264 - INFO - Actor got 200 records, total: 400 (UDFActor pid=13827) 2025-09-28 17:08:54.1759050534 - MyBatchUdf-5088798160 - INFO - Actor got 200 records, total: 400 (UDFActor pid=13825) 2025-09-28 17:08:56.1759050536 - MyBatchUdf-6049781264 - INFO - Actor got 200 records, total: 600
除了分区数和worker数量不匹配会导致并发低于预期以外,batch_size设置不合理依然会面临并发打不上去的问题,假设我们将batch_size设置为500,从下面的日志,我们可以看到,即使运行在单个节点时,也只有两个并发处理了数据,处理的条数为[500, 500],处理数据的并发并没有达到预期。
(UDFActor pid=37087) 2025-09-28 17:29:33.1759051773 - MyBatchUdf-5215119504 - INFO - Actor got 500 records, total: 500 (UDFActor pid=37086) 2025-09-28 17:29:33.1759051773 - MyBatchUdf-4822819216 - INFO - Actor got 500 records, total: 500
通过上面的例子,我们可以了解到,DataFrame的分区数、UDF的batch size,以及UDF的concurrency设置的不合理会导致出现数据倾斜,并发打不上去的问题。下面我们从原理层面阐述一下Daft的运行机制以及具体的原因。
当分布式运行一个Daft作业时,整个作业代码首先会被构建为逻辑执行计划(LogicalPlan),在执行df.collect()时,会进行物化(materialize)操作,整个逻辑计划会被内置的daft优化器进行优化,之后会被转换为分布式物理执行计划(DistributedPhysicalPlan), 最终交给DistributedPhysicalPlanRunner执行整个Plan。
DistributedPhysicalPlanRunner会将physical plan转换可执行的pipeline,plan中的每个节点对应一个PipelineNode, 最终这个pipeline会被拆分为一个或者多个SwordfishTask。一般情况下DataFrame有多少个分区就会生成多少个SwordfishTask,这些Task首先会根据对应的调度策略会被分发到RaySwordfishActor上(每个Wroker会启动一个RawSwordfishActor),当Task的调度策略为Spread时,SwordfishTask会根据Worker空闲状态被均匀的分配到多个RaySwordfishActor上。
RaySwordfishActor负责执行SwordfishTask,通过NativeExecutor执行task中的LocalPhysicalPlan。(到这里,Native Runner和Ray Runner执行的逻辑基本上一致,区别是分布式场景下,每个SwordfishTask只会包含一个partiiton的数据(psets的size为1),而Native Runner会包含所有partiiton的数据)。每个Swordfish Task包含了整个stage plan的内容,在执行UDF算子时,会根据batch_size将当前SwordfishTask中的数据进行切分,切分为一个一个的morsel/MicroPartition,然后再调度到当前worker上的UDFActors上(执行UDFActor.eval_input)
所以这里有四个关键参数的设置:
RaySwordfishActor,一个RaySwordfishActor可以“同时”运行多个SwordfishTask(异步执行LocalPhysicalPlan)。Plan runner将task调度到各个RaySwordfishActor时,会根据当前RaySwordfishActor的负载情况决定是否要分发task,未分发的task处于等待状态,需要等前面的task运行结束后才能分发,所以Worker的数量,以及worker的总CPU数决定的task执行时的最大并发数。SwordfishTask,所以SwordfishTask的个数,决定整个pipeline的最大并行度,如果SwordfishTask的个数小于RaySwordfishActor的数量,那么某些RaySwordfishActor会因为收不到task,导致这些RaySwordfishActor空转。当DataFrame的分区个数比较少时,可以通过repartition, into_partition拓展Dataframe的分区数,从而增加task的个数,也可以通过into_batches将一个分区切分为多个task,每个task包含原始分区中的部分数据,从而增加执行task的最大并发数。UDFActor,当ray cluster的worker是同构机型时,一般情况下UDFActor会均匀的分布在每个worker上,当某个Worker上存在该UDF的UDFActor时(比如当UDF的concurrency小于Worker的数量时,有的Worker上不会启动该UDF的UDFActor),对应的RaySwordfishActor只会将morsel调度到当前worker上的UDFActor。所以当RaySwordfishActor切分的morsel个数小于当前worker上的UDFActor个数时,某些UDFActor会由于接收不到数据,导致UDFActor空转。RaySwordfishActor切分的morsel个数较少,没法充分利用Worker上的各个UDFActor,当batch_size设置的比较少,会增加函数调用开销。所以需要结合实际业务情况和优化目标进行调整。在场景1中,我们遇到了数据处理不均匀的场景,我们通过repartition将一个包含1000条数据分区重分区为2个分区时,分区后的数据并非完全均匀,这是因为我们使用path的的hash值作为repartition key,所以倾斜程度依赖hash值的分布情况。上述例子中,重分区之后,得到了两个分区,包含的数据条数分别为510和490,然后每个分区再被daft引擎按batch size=200进行切分为[200, 200, 110, 200, 200, 90]6个batch,然后再调度到4个并发上。可以看到各个分区的切分是独立的,不同分区的数据没法被拼接起来,所以每个分区的最后一个batch的数据会远小于其它batch的数据。
在场景2中,我们遇到了实际并发低于预期的场景,这是因为原始DataFrame只有一个分区,最终只会产生一个task,被调度到其中的一个worker上,而UDF的并发实例是在plan阶段就已经“均匀”的分散在各个Worker上,RaySwordfishActor在调度batch/morsel的时候,优先调度到本地的worker上,导致只有一个Worker上的UDFActor实例在处理数据。所以只有当task的个数大于等于worker的数量时,我们才能将并发打到极致。那有什么办法既可以增加task/partition的个数,又能平衡不同task/partition的数据量呢。当有类似需求时,可以考虑使用into_batches这个API,这个API会将partition的数据,按照执行大小切分为不同的task,然后分发到不同的RaySwordfishActor上,比如在单个Worker的Ray cluster执行df.into_batches(200).with_colum("batch", udf(col("path")))时,会得到如下结果:
(UDFActor pid=29906) 2025-09-28 19:15:04.1759058104 - MyBatchUdf-4816300688 - INFO - Actor got 200 records, total: 200 (UDFActor pid=29907) 2025-09-28 19:15:04.1759058104 - MyBatchUdf-4952482832 - INFO - Actor got 200 records, total: 200 (UDFActor pid=29909) 2025-09-28 19:15:04.1759058104 - MyBatchUdf-4548368464 - INFO - Actor got 200 records, total: 200 (UDFActor pid=29906) 2025-09-28 19:15:06.1759058106 - MyBatchUdf-4816300688 - INFO - Actor got 200 records, total: 400 (UDFActor pid=29906) 2025-09-28 19:15:08.1759058108 - MyBatchUdf-4816300688 - INFO - Actor got 200 records, total: 600
通过上述日志,我们可以看到数据被均匀的切分为5个batch了,但是与期望中均匀的调度到4个并发上不一致,只有3个并发收到了数据,且某个并发实例处理了3个batch的数据(上述结果存在随机性,可能是某2个并发分别处理了2个batch),导致并发实例间处理数据的不均衡。造成这个现象的原因和场景3一致,虽然5个task的数据是均衡的,但是每个task经过RaySwordfishActor的切分后,就只生成了一个morse,然后被随机分配到某个UDFActor。因为task间的数据处理是独立的,极端情况下,这5个task生成的morse都可能会被调度到同一个UDFActor上。所以当我们期望最大并发时,因为保证某一个task的数据在切分后,能均匀的分散到当前worker上的每个UDFActor上,比如当我们将udf_batch_size设置为50时,保证每个task的数据会被切分为4个morsel,然后以round-roubin的策略均分到各个UDFActor上。
(UDFActor pid=43813) 2025-09-28 19:27:04.1759058824 - MyBatchUdf-5061156240 - INFO - Actor got 50 records, total: 50 (UDFActor pid=43815) 2025-09-28 19:27:04.1759058824 - MyBatchUdf-5090240720 - INFO - Actor got 50 records, total: 50 (UDFActor pid=43814) 2025-09-28 19:27:04.1759058824 - MyBatchUdf-5906907600 - INFO - Actor got 50 records, total: 50 (UDFActor pid=43816) 2025-09-28 19:27:04.1759058824 - MyBatchUdf-4546699600 - INFO - Actor got 50 records, total: 50 (UDFActor pid=43813) 2025-09-28 19:27:06.1759058826 - MyBatchUdf-5061156240 - INFO - Actor got 50 records, total: 100 (UDFActor pid=43815) 2025-09-28 19:27:06.1759058826 - MyBatchUdf-5090240720 - INFO - Actor got 50 records, total: 100 (UDFActor pid=43814) 2025-09-28 19:27:06.1759058826 - MyBatchUdf-5906907600 - INFO - Actor got 50 records, total: 100 (UDFActor pid=43816) 2025-09-28 19:27:06.1759058826 - MyBatchUdf-4546699600 - INFO - Actor got 50 records, total: 100 (UDFActor pid=43813) 2025-09-28 19:27:08.1759058828 - MyBatchUdf-5061156240 - INFO - Actor got 50 records, total: 150 (UDFActor pid=43815) 2025-09-28 19:27:08.1759058828 - MyBatchUdf-5090240720 - INFO - Actor got 50 records, total: 150 (UDFActor pid=43814) 2025-09-28 19:27:08.1759058828 - MyBatchUdf-5906907600 - INFO - Actor got 50 records, total: 150 (UDFActor pid=43816) 2025-09-28 19:27:08.1759058828 - MyBatchUdf-4546699600 - INFO - Actor got 50 records, total: 150 (UDFActor pid=43813) 2025-09-28 19:27:10.1759058830 - MyBatchUdf-5061156240 - INFO - Actor got 50 records, total: 200 (UDFActor pid=43815) 2025-09-28 19:27:10.1759058830 - MyBatchUdf-5090240720 - INFO - Actor got 50 records, total: 200 (UDFActor pid=43814) 2025-09-28 19:27:10.1759058830 - MyBatchUdf-5906907600 - INFO - Actor got 50 records, total: 200 (UDFActor pid=43816) 2025-09-28 19:27:10.1759058830 - MyBatchUdf-4546699600 - INFO - Actor got 50 records, total: 200 (UDFActor pid=43813) 2025-09-28 19:27:12.1759058832 - MyBatchUdf-5061156240 - INFO - Actor got 50 records, total: 250 (UDFActor pid=43815) 2025-09-28 19:27:12.1759058832 - MyBatchUdf-5090240720 - INFO - Actor got 50 records, total: 250 (UDFActor pid=43814) 2025-09-28 19:27:12.1759058832 - MyBatchUdf-5906907600 - INFO - Actor got 50 records, total: 250 (UDFActor pid=43816) 2025-09-28 19:27:12.1759058832 - MyBatchUdf-4546699600 - INFO - Actor got 50 records, total: 250
看起来into_batches和UDFbatch_size的配合既能保证各个task数据的均衡性,又能充分的发挥各个UDFActor的并发度。但into_batches不是万能的,比如单机单分区DataFrame的场景不适合使用into_batches,由于只有一个分区,而且只有一个Worker,所以既不存在数据不均匀,也不存在要分散到多个Worker的场景,就没必要通过into_batches增加task的个数,反而会增加函数调用的开销。
另外执行into_batches时,即使DataFrame或者分区的数据充足,生成的task包含的数据不一定都满足对应的batch size。比如当DataFrame存在两个分区,第一个分区包含180条数据,第二个分区包含220条数据,当按照batch_size=200进行切分时,最终会得到3个task,包含的数据量分别为180,200, 20。into_batches除了将一个大的分区拆分为多个task之外,还具备将多个小的分区合并为大的task的能力,比如假设DataFrame存在16个分区,每个分区仅包含20条数据,当执行df.into_batches(200)时,最终会生成两个各自包含160条数据的task,而不是一个包含200条的task和一个包含120条数据的task。背后的原因是distributed plan runner会将into_batches前后的逻辑拆分为两个stage,先物化第一个stage的结果,然后对其结果进行“攒批”,当累计条数大于batch size的80%时,则创建一个新的task,所以上述例子中会生成两个分别包含160条记录的task。
针对在不同的应用场景,我们可以通过调整UDF的concurrency, batch size参数来实现对应的目的。
比如当我们期望最大化UDF并发时,假设一共有M个Worker,
repartition或者into_partition增加DataFrame的分区个数,或者通过into_batches增加生成的task个数。repartition/into_partition、into_batches均衡各个task的数据,前者适用于分区数据比较多的场景(分区数太小会导致分区间数据差异比较大,repartition的大致逻辑为:上游的每个区分将自己的数据拆成N份(Chunk),下游每个分区再将上游的1/N的数据(Chunk)合并起来。当N比较小时,每个Chunk的数据量就会比较大, 即使各分区间只相差一个Chunk,也会存在Chunk级别的数据量差距)。后者适用于小Batch的场景。上面我们只讨论了如何通过调整DataFrame的分区数,UDF的并发数和batch size来实现最大化并发计算的目的。实际应用过程中,想要高效的运行UDF,还需要额外关注UDF运行过程中的负载情况,比如CPU、内存使用率等,如果UDF处理数据的过程中占用的内存比较大,我们应该调小UDF batch size,避免OOM问题。调小batch size可能会morsel的个数增多,UDFActor处理这些morsel数据时会形成背压,所以当需要最大化并发时,应该增加udf的并发度。当UDF处理数据涉及大量的IO操作,我们应该考虑在UDF内部使用异步操作+协程的方式提高CPU利用率,同时增加batch size,减少UDF的调用次数。增加batch size可能会导致Morsel的个数减少,如果低于某个Worker上UDF的实例个数,就会导致某些实例没法收到数据处于空闲状态。
针对合并小文件场景,或者上游零碎小分区场景,可以使用into_batches实现合并分区的目的,从而减少UDF实例调用次数。