当任务选择使用预约计划资源,建议在代码中填写读取MLP_CURRENT_CAPACITY_BLOCK_EXPIRATION_TIMESTAMP
环境变量的内容,感知资源过期信息,并自动保存 最近一轮的 checkpoint,避免训练产出因为资源释放而丢失。
# 选择了预约计划的资源时,建议您通过以下的步骤在资源释放时及时的保存 CheckPoint 。 # 步骤1,使用字节开源的异步存储 CheckPoint 能力,提高 CheckPoint 保存频率,以减少中断损失,您需要修改和补充以下代码: import torch # 通过下面命令安装 pip install bytecheckpoint import bytecheckpoint as bcp # 保存 CheckPoint def save_checkpoint(ckpt_path, model, optimizer): checkpoint_state = { "model": model, "optimizer": optimizer, "extra_state": {"torch_rng_state": torch.get_rng_state()}, } bcp.save(ckpt_path, checkpoint_state, framework="fsdp") # 该姿势保存的 ckpt 使用对应的方法加载 def load_checkpoint(ckpt_path, model, optimizer): checkpoint_state = {"model": model, "optimizer": optimizer, "extra_state": {}} bcp.load(ckpt_path, checkpoint_state, framework="fsdp") # 步骤2,通过平台注入的环境变量,预估资源到期剩余时间,及时在资源到期前保存 CheckPoint import os import time def should_save_ckpt(max_steps_duration: float, save_ckpt_duration: float, redundant_time: float = 0) -> bool: """ 判断是否应保存检查点(考虑剩余时间、step耗时、保存耗时和冗余时间) Args: max_steps_duration: 历史step最长耗时(秒) save_ckpt_duration: 保存ckpt耗时(秒) redundant_time: 防抖动冗余时间(秒,默认0) Returns: bool: 是否保存 """ # 获取并校验环境变量时间戳 exp_ts = os.getenv("MLP_CURRENT_CAPACITY_BLOCK_EXPIRATION_TIMESTAMP") if not exp_ts: return False # 无环境变量 try: remaining = float(exp_ts) - time.time() except ValueError: return False # 时间戳格式错误 # 关键判断:剩余时间需满足 保存+后续step+冗余 return remaining > 0 and max_steps_duration > 0 and remaining >= save_ckpt_duration + max_steps_duration + redundant_time
更多内容实践见:https://github.com/ByteDance-Seed/ByteCheckpoint?tab=readme-ov-file#-getting-started