Python实现视频帧提取:基于CSV校验跳过已处理视频
实现新增视频帧提取的自动校验逻辑(跳过已处理视频)
我来帮你搞定这个新增视频的校验和帧提取逻辑,咱们先理清楚需求和现有代码的问题,再一步步修改实现:
需求概述
你需要实现的核心逻辑是:监控视频文件夹,当有新增视频时,先检查该视频是否已经在记录已处理视频的CSV文件中——如果存在则跳过处理,不存在则执行帧提取,处理完成后把视频名称写入CSV标记为已处理。现有代码已经能完成帧提取和写入CSV,但缺少前置校验已处理视频的关键步骤。
现有代码
import glob
import os
import cv2
import csv


def extractFrames(m):
    """Extract frames from every video matching the glob pattern ``m``.

    For each matched file the video is opened with OpenCV; when the file is
    larger than 1000 bytes and lasts longer than one second, every 10th
    frame is handed to ``target_non_target`` (not defined in this snippet).
    Afterwards the video name is appended to ``video_info.csv`` with the
    status ``done``. The CSV is never read back, so already-processed videos
    are processed again on every run.
    """
    global vid_name
    # NOTE(review): with ``import glob`` this calls the module object itself,
    # which raises TypeError; ``glob.glob(m)`` (or ``from glob import glob``)
    # is presumably what was intended.
    vid_files = glob(m)
    for video_path in vid_files:
        print("path of video========>>>>.", video_path)
        v1 = os.path.basename(video_path)
        try:
            vid_name = os.path.splitext(v1)[0]
            vidcap = cv2.VideoCapture(video_path)
        except cv2.error as e:
            print(e)
        except:
            print('error')

        # condition
        fsize = os.stat(video_path)
        print('=============size of video ===================:' , fsize.st_size)
        try:
            if fsize.st_size > 1000:
                fps = vidcap.get(cv2.CAP_PROP_FPS)
                frameCount = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT))
                duration = frameCount/fps
                minutes = int(duration/60)

                print('fps = ' + str(fps))
                print('number of frames = ' + str(frameCount))
                print('duration (S) = ' + str(duration))

                if duration > 1:
                    # The result of this first read is discarded: the loop
                    # below reads again before handing a frame over.
                    vidcap.read()
                    count = 0
                    keep_reading = True
                    while keep_reading:
                        img_name = vid_name + '_f' + str(count) + ".jpg"
                        keep_reading, image = vidcap.read()
                        # Every 10th frame, starting with count 0.
                        if count % 10 == 0:
                            target_non_target(img_name, image)
                        count += 1
            vidcap.release()
            cv2.destroyAllWindows()
        except:
            print("error")

        print('finished processing video ', video_path)

        # Record the video as processed; the header is written only while
        # the CSV file is still empty.
        csv_path = "C:\multi_cat_3\models\research\object_detection\my_imgs"+'/video_info.csv'
        with open(csv_path, 'a') as csv_file:
            fieldnames = ['Video_Name','Process']
            file_is_empty = os.stat(csv_path).st_size == 0
            writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
            if file_is_empty:
                writer.writeheader()
            writer.writerow({'Video_Name':vid_name,'Process':'done'})


if __name__ == "__main__":
    x="C:\Python36\videos\*.mp4"
    extractFrames(x)
现有代码的不足
- 没有读取CSV文件中的已处理记录,每次运行都会重复处理所有视频
- 使用 `global vid_name` 增加了代码耦合性,不够灵活
- 文件路径使用单反斜杠,会被Python解析为转义字符,容易出现路径错误
解决方案实现
步骤1:编写辅助函数读取已处理视频列表
先写一个函数读取CSV,返回已处理视频的集合(集合的查找效率比列表更高):
def get_processed_videos(csv_path):
    """Return the set of video names already recorded in the CSV file.

    Args:
        csv_path: Path of the CSV file whose ``Video_Name`` column lists the
            videos that have already been processed.

    Returns:
        A set of stripped, non-empty video names. The set is empty when the
        file does not exist, is empty, or has no ``Video_Name`` column.
    """
    processed_videos = set()
    try:
        # ``utf-8-sig`` reads plain UTF-8 as well, but also drops a leading
        # BOM that would otherwise become part of the first header name.
        csv_file = open(csv_path, 'r', newline='', encoding='utf-8-sig')
    except FileNotFoundError:
        # No CSV yet means nothing has been processed.
        return processed_videos

    with csv_file:
        for row in csv.DictReader(csv_file):
            # DictReader yields None for columns missing from a short row,
            # and ``get`` covers a file without the column altogether.
            name = (row.get('Video_Name') or '').strip()
            if name:
                processed_videos.add(name)
    return processed_videos
步骤2:修改extractFrames函数,加入校验逻辑
整合校验逻辑,跳过已处理视频,同时优化路径和错误处理:
import glob
import os
import cv2
import csv


def get_processed_videos(csv_path):
    """Return the set of video names already recorded in the CSV file.

    Args:
        csv_path: Path of the CSV file whose ``Video_Name`` column lists the
            videos that have already been processed.

    Returns:
        A set of stripped, non-empty video names. The set is empty when the
        file does not exist, is empty, or has no ``Video_Name`` column.
    """
    processed_videos = set()
    try:
        # ``utf-8-sig`` reads plain UTF-8 as well, but also drops a leading
        # BOM that would otherwise become part of the first header name.
        csv_file = open(csv_path, 'r', newline='', encoding='utf-8-sig')
    except FileNotFoundError:
        return processed_videos

    with csv_file:
        for row in csv.DictReader(csv_file):
            # DictReader yields None for columns missing from a short row.
            name = (row.get('Video_Name') or '').strip()
            if name:
                processed_videos.add(name)
    return processed_videos


def _extract_sampled_frames(vidcap, vid_path, vid_name):
    """Pass every 10th frame of an opened video to ``target_non_target``.

    Nothing is extracted when the file is 1000 bytes or smaller, or when the
    duration computed from frame count and fps is one second or less.
    """
    fsize = os.stat(vid_path)
    print(f"=============Size of video {vid_name}: {fsize.st_size} bytes")
    if fsize.st_size <= 1000:
        return

    fps = vidcap.get(cv2.CAP_PROP_FPS)
    frameCount = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT))
    # Guard against a zero fps report so the division cannot fail.
    duration = frameCount / fps if fps > 0 else 0
    print(f"fps = {fps}")
    print(f"Number of frames = {frameCount}")
    print(f"Duration (S) = {duration}")
    if duration <= 1:
        return

    count = 0
    success, image = vidcap.read()
    while success:
        # Every 10th frame, including frame 0.
        if count % 10 == 0:
            # NOTE: target_non_target is not defined in this snippet; it must
            # be provided by the surrounding project.
            target_non_target(f"{vid_name}_f{count}.jpg", image)
        success, image = vidcap.read()
        count += 1


def _mark_processed(csv_path, vid_name):
    """Append ``vid_name`` to the CSV, writing the header for a new/empty file."""
    needs_header = not os.path.exists(csv_path) or os.stat(csv_path).st_size == 0
    with open(csv_path, 'a', newline='', encoding='utf-8') as csv_file:
        writer = csv.DictWriter(csv_file, fieldnames=['Video_Name', 'Process'])
        if needs_header:
            writer.writeheader()
        writer.writerow({'Video_Name': vid_name, 'Process': 'done'})


def extractFrames(video_pattern, csv_path):
    """Extract frames from videos that are not yet listed in the CSV file.

    Args:
        video_pattern: Glob pattern selecting the video files to inspect.
        csv_path: CSV file recording processed videos; it is read once up
            front and appended to after each successfully handled video.

    A video whose name (basename without extension) is already recorded is
    skipped. Errors while opening or processing one video are printed and
    the loop moves on to the next file; such a video is not recorded, so it
    is retried on the next run.
    """
    processed = get_processed_videos(csv_path)

    for vid_path in glob.glob(video_pattern):
        print(f"Checking video: {vid_path}")
        vid_name = os.path.splitext(os.path.basename(vid_path))[0]

        # Core check: skip anything already recorded as processed.
        if vid_name in processed:
            print(f"Video {vid_name} already processed, skipping...")
            continue

        try:
            vidcap = cv2.VideoCapture(vid_path)
        except cv2.error as e:
            print(f"OpenCV error for {vid_path}: {e}")
            continue
        except Exception as e:
            print(f"Unexpected error opening {vid_path}: {e}")
            continue

        try:
            if not vidcap.isOpened():
                print(f"Failed to open video {vid_path}")
                continue

            _extract_sampled_frames(vidcap, vid_path, vid_name)
            print(f"Finished processing video {vid_path}")
            _mark_processed(csv_path, vid_name)
            # Keep the in-memory set in sync so the same name is not
            # processed twice within one run.
            processed.add(vid_name)
        except Exception as e:
            print(f"Error processing {vid_path}: {e}")
        finally:
            # Single cleanup point for the success, skip and error paths.
            vidcap.release()
            cv2.destroyAllWindows()


if __name__ == "__main__":
    # Raw strings keep the backslashes from being read as escape sequences.
    VIDEO_PATTERN = r"C:\Python36\videos\*.mp4"
    CSV_PATH = r"C:\multi_cat_3\models\research\object_detection\my_imgs\video_info.csv"
    extractFrames(VIDEO_PATTERN, CSV_PATH)
关键改进说明
- 路径安全:用原始字符串 `r"..."` 避免转义字符解析错误
- 高效校验:用集合存储已处理视频,O(1)时间复杂度判断是否跳过
- 代码解耦:移除全局变量,改用局部变量传递视频名称
- 错误处理优化:增加视频打开状态检查,确保资源正常释放
- 可读性提升:用f-string替代字符串拼接,代码更直观
场景验证(新增V3视频)
当文件夹中已有V1、V2且已写入CSV时,新增V3后:
- 运行脚本,`get_processed_videos` 读取CSV得到 `{'V1', 'V2'}`
- 遍历到V3时,检查发现 `V3 not in processed`,执行帧提取流程
- 处理完成后,将V3写入CSV,下次运行时会自动跳过V3
内容的提问来源于stack exchange,提问作者Ankit




