Ray Dashboard provides basic cluster monitoring, but it still falls short in complex distributed AI computing scenarios.
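The Flow Insight view visualizes the execution flow of Ray applications and helps users understand how actors, tasks, and data transfers relate to one another in a distributed application. It offers five visualization modes: a logical view, a physical view, a distributed stack, a flame graph, and a Gantt chart. Together they make it easier to analyze performance, identify bottlenecks, and optimize Ray workloads.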
This article walks through a concrete example that shows how to enable Flow Insight for a Serverless Ray job and use it for visual analysis.
Save the following code as test_flowinsight.py:
```python
import random
import time

import ray


@ray.remote
class Worker:
    """
    Create a tree of workers in which each worker spawns multiple children.
    """

    def __init__(self, depth, is_bottleneck=False):
        # Whether this worker should simulate a delay
        self.is_bottleneck = is_bottleneck
        # Current depth in the worker tree (root = highest number)
        self.depth = depth
        # List to store child worker actor references
        self.children = []
        # Create child workers if we're not at a leaf node (depth > 0)
        if self.depth > 0:
            # Create 1-2 children randomly for each worker
            for _ in range(random.randint(1, 2)):
                # Recursively create child workers with decreased depth
                # Randomly decide if the child will be a bottleneck
                self.children.append(
                    Worker.remote(
                        depth=self.depth - 1,
                        is_bottleneck=random.randint(0, 1) == 1,
                    )
                )

    def process(self):
        """
        Process work and delegate to child workers.
        Bottleneck workers one level above the leaves introduce significant delays.
        """
        # List to collect all async task references from children
        futures = []
        # For each child worker
        for child in self.children:
            # Submit 10 process tasks to each child
            for _ in range(10):
                # If this is a bottleneck worker whose children are leaf nodes,
                # introduce a significant delay (30 seconds)
                if self.is_bottleneck and self.depth - 1 <= 0:
                    time.sleep(30.0)
                # Standard processing time for all workers (5 seconds)
                time.sleep(5)
                # Submit the process task to the child worker
                # This creates a nested tree of task calls
                futures.append(child.process.remote())
        # Wait for all child tasks to complete before returning
        # This ensures the full tree of tasks must complete
        ray.get(futures)


# Connect to the Ray cluster; inside a Ray job the cluster address
# is taken from the environment
ray.init()

# Create the root worker with depth 3
# This will create a tree of workers with maximum depth of 3
worker = Worker.remote(depth=3, is_bottleneck=False)
# Start processing and wait for the entire tree to complete
ray.get(worker.process.remote())
```
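The sleep logic above determines how long the demo runs and how much a single bottleneck worker stands out. The following is a rough, Ray-free model of that logic, offered as a sketch: the helper names are hypothetical, the constants are copied from the script, time spent blocked in `ray.get` is ignored, and it assumes each actor runs its `process()` calls one at a time (the default for synchronous Ray actors).

```python
# Rough timing model of Worker.process in test_flowinsight.py.
# Pure Python, no Ray; helper names are hypothetical.

CALLS_PER_CHILD = 10        # process() calls submitted to each child per call
BASE_SLEEP_S = 5.0          # time.sleep(5) before every submission
BOTTLENECK_SLEEP_S = 30.0   # extra time.sleep(30.0) for bottleneck workers
ROOT_DEPTH = 3              # Worker.remote(depth=3, ...)


def sleep_per_call(depth: int, n_children: int, is_bottleneck: bool) -> float:
    """Seconds one process() call spends in time.sleep (ray.get wait excluded)."""
    per_submission = BASE_SLEEP_S
    # Same condition as the script; it only matters when the worker has
    # children, i.e. at depth 1 (leaf workers at depth 0 never enter the loop).
    if is_bottleneck and depth - 1 <= 0:
        per_submission += BOTTLENECK_SLEEP_S
    return n_children * CALLS_PER_CHILD * per_submission


def calls_received(depth: int, root_depth: int = ROOT_DEPTH) -> int:
    """process() calls a worker at `depth` receives during one run."""
    # Every call on the parent submits CALLS_PER_CHILD calls to each child.
    return CALLS_PER_CHILD ** (root_depth - depth)


def min_busy_time(depth: int, n_children: int, is_bottleneck: bool) -> float:
    """Lower bound on how long the actor is busy, assuming its process()
    calls run one at a time (default for synchronous Ray actors)."""
    return calls_received(depth) * sleep_per_call(depth, n_children, is_bottleneck)


for bottleneck in (False, True):
    hours = min_busy_time(depth=1, n_children=2, is_bottleneck=bottleneck) / 3600
    print(f"depth-1 worker, 2 children, bottleneck={bottleneck}: >= {hours:.1f} h")
```

For example, a depth-1 worker with two children receives 100 calls. Without the bottleneck flag it sleeps 100 s per call; with it, 700 s per call, so that one actor accounts for roughly 19 hours of sleep instead of about 3.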
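Package the script into a zip archive and upload it to TOS:

```bash
# Package the code
zip test_flowinsight.zip test_flowinsight.py
# Upload to a TOS path, for example: tos://tos-snzhao/serverless/test_flowinsight.zip
```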
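If the zip command is not available (for example on Windows), the same archive can be built with Python's standard zipfile module. This is a minimal sketch: package_script is a hypothetical helper, and it assumes the script sits in the current directory and that the job expects the script at the archive root, which is what the zip command above produces. Uploading to TOS is not covered.

```python
import zipfile
from pathlib import Path


def package_script(script: str = "test_flowinsight.py",
                   archive: str = "test_flowinsight.zip") -> Path:
    """Hypothetical helper: write `script` into `archive` at the archive root,
    similar to `zip test_flowinsight.zip test_flowinsight.py`."""
    script_path = Path(script)
    archive_path = Path(archive)
    # Mode "w" rebuilds the archive from scratch on every call.
    with zipfile.ZipFile(archive_path, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        # arcname drops any directory prefix so the entry is just the file name.
        zf.write(script_path, arcname=script_path.name)
    return archive_path
```

Calling package_script() writes test_flowinsight.zip to the current directory. Unlike zip, which updates an existing archive in place, this version always overwrites it.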
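Then create the Serverless Ray job with the following parameters:

| Parameter | Description |
|---|---|
| Image address | Any base Ray image of version 2.46.0.1 or later supports Flow Insight. For the full list of images, see Ray image list. |
| Resource file | The TOS path of the resource file uploaded during preparation (for example, tos://tos-snzhao/serverless/test_flowinsight.zip) |
| Entry point | The command that runs the code, for example: python test_flowinsight.py |
| Resource parameters | Head CPU, Head Memory, Worker CPU, Worker Memory, Worker Replicas, and so on; set them according to your actual needs |
| Advanced configuration | Parameters applied when the Ray job runs. To enable Flow Insight, both serverless.ray.head.pod.env and serverless.ray.worker.pod.env must be set to |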
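The two Flow Insight requirements in the table (a base image of 2.46.0.1 or later, and both env keys set in the advanced configuration) can be checked before submitting. The following is a hypothetical pre-flight helper, not part of the Serverless Ray console or any SDK: it assumes the image version is a purely numeric dotted string and models the advanced configuration as a plain dict. It only checks that both keys are present and non-empty, not what they are set to.

```python
from __future__ import annotations

from typing import Mapping

# Hypothetical pre-flight check; not part of any Serverless Ray SDK.

# Minimum base image version that supports Flow Insight (from the table).
MIN_FLOW_INSIGHT_VERSION = (2, 46, 0, 1)

# Advanced-configuration keys that must both be set to enable Flow Insight.
REQUIRED_ENV_KEYS = (
    "serverless.ray.head.pod.env",
    "serverless.ray.worker.pod.env",
)


def parse_version(version: str) -> tuple[int, ...]:
    """'2.46.0.1' -> (2, 46, 0, 1). Assumes a purely numeric dotted version."""
    return tuple(int(part) for part in version.split("."))


def check_flow_insight_config(image_version: str,
                              advanced_config: Mapping[str, str]) -> list[str]:
    """Return a list of problems; an empty list means both checks passed."""
    problems = []
    # Tuple comparison: (2, 46, 0) < (2, 46, 0, 1), so "2.46.0" is rejected.
    if parse_version(image_version) < MIN_FLOW_INSIGHT_VERSION:
        problems.append(f"image version {image_version} is older than 2.46.0.1")
    for key in REQUIRED_ENV_KEYS:
        # Only presence is checked; the required value is not encoded here.
        if not advanced_config.get(key):
            problems.append(f"advanced config {key!r} is not set")
    return problems
```

With an image version of 2.45.1.2 and an empty advanced configuration, the helper reports three problems: the version and each of the two missing keys.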
Ray versions after 2.46.0.1 will gradually add support for further features such as distributed debugging.