You need to enable JavaScript to run this app.
优惠活动
大模型
产品
解决方案
定价
更多
文档控制台
免费开始使用

Python实现视频帧提取:基于CSV校验跳过已处理视频

实现新增视频帧提取的自动校验逻辑(跳过已处理视频)

我来帮你搞定这个新增视频的校验和帧提取逻辑,咱们先理清楚需求和现有代码的问题,再一步步修改实现:

需求概述

你需要实现的核心逻辑是:监控视频文件夹,当有新增视频时,先检查该视频是否已经在记录已处理视频的CSV文件中——如果存在则跳过处理,不存在则执行帧提取,处理完成后把视频名称写入CSV标记为已处理。现有代码已经能完成帧提取和写入CSV,但缺少前置校验已处理视频的关键步骤。

现有代码

import glob
import os
import cv2
import csv

def extractFrames(m):
    """Sample frames from every video matching glob pattern ``m`` and log each video to a CSV.

    For each matched file the function opens it with OpenCV, prints its size,
    fps, frame count and duration, hands sampled frames to
    ``target_non_target`` (not defined in this listing), and finally appends a
    ``{'Video_Name', 'Process': 'done'}`` row to ``video_info.csv``.

    Nothing here reads the CSV back, so every run reprocesses every video.
    The video name is kept in the module-level global ``vid_name``.
    """
    global vid_name
    # NOTE(review): the file does `import glob`, so `glob` is the module and
    # this call raises TypeError; `glob.glob(m)` is what is needed here.
    vid_files=glob(m)
    for v_f in range(len(vid_files)):
        print("path of video========>>>>.",vid_files[v_f])
        v1=os.path.basename(vid_files[v_f])
        try:
            # Video name = file name without its extension.
            vid_name = os.path.splitext(v1)[0]
            vidcap = cv2.VideoCapture(vid_files[v_f])
        except cv2.error as e:
            print(e)
        except:
            # NOTE(review): execution continues after an error, so `vidcap`
            # below may be unbound or left over from the previous video.
            print('error')
        # Only files larger than 1000 bytes are analysed.
        fsize=os.stat(vid_files[v_f])
        print('=============size of video ===================:' , fsize.st_size)
        try:
            if (fsize.st_size > 1000):
                fps = vidcap.get(cv2.CAP_PROP_FPS)
                frameCount = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT))
                # A zero fps raises ZeroDivisionError, swallowed by the bare except below.
                duration = frameCount/fps
                # `minutes` is computed but never used.
                minutes = int(duration/60)
                print('fps = ' + str(fps))
                print('number of frames = ' + str(frameCount))
                print('duration (S) = ' + str(duration))
                # Frames are only sampled from videos longer than one second.
                if (duration > 1):
                    # This first frame is never used: the loop reads again
                    # before anything is passed to target_non_target.
                    success,image = vidcap.read()
                    count=0
                    # Forces loop entry regardless of the result of the read above.
                    success=True
                    while success:
                        img_name = vid_name + '_f' + str(count) + ".jpg"
                        success,image = vidcap.read()
                        # Every 10th iteration (`count == 0` is already covered
                        # by the modulo test). `success` is not checked first,
                        # so the image from a failed final read can be passed on.
                        if count % 10 == 0 or count ==0:
                            target_non_target(img_name, image)
                        count+=1
                # Release happens only on this branch; files of 1000 bytes or
                # less, and any exception above, skip it.
                vidcap.release()
                cv2.destroyAllWindows()
        except:
            print("error")
        print('finished processing video ', vid_files[v_f])
        # The row is appended for every video, including ones that hit the
        # error branch above.
        # NOTE(review): the path is a non-raw string, so `\r` in `\research`
        # is a carriage-return escape rather than a backslash followed by `r`.
        with open("C:\multi_cat_3\models\research\object_detection\my_imgs"+'/video_info.csv', 'a') as csv_file:
            fieldnames = ['Video_Name','Process']
            # Opening in append mode has already created the file, so this stat
            # tells whether the header row still needs to be written.
            file_is_empty = os.stat("C:\multi_cat_3\models\research\object_detection\my_imgs"+'/video_info.csv').st_size == 0
            writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
            if file_is_empty:
                writer.writeheader()
            writer.writerow({'Video_Name':vid_name,'Process':'done'})

# Script entry point: process every .mp4 file in the videos folder.
if __name__ == "__main__":
    # NOTE(review): non-raw string, so `\v` in `\videos` is the vertical-tab
    # escape and the pattern does not name the intended folder.
    x="C:\Python36\videos\*.mp4"
    extractFrames(x)

现有代码的不足

  • 没有读取CSV文件中的已处理记录,每次运行都会重复处理所有视频
  • 使用global vid_name增加了代码耦合性,不够灵活
  • 文件路径使用单反斜杠且未加r前缀,其中的\r、\v等会被Python解析为转义字符,导致路径错误

解决方案实现

步骤1:编写辅助函数读取已处理视频列表

先写一个函数读取CSV,返回已处理视频的集合(集合的查找效率比列表更高):

def get_processed_videos(csv_path):
    """Return the set of video names already recorded in the tracking CSV.

    Args:
        csv_path: Path to the CSV file whose ``Video_Name`` column lists the
            videos that have already been processed.

    Returns:
        A set of stripped, non-empty ``Video_Name`` values. The set is empty
        when the file does not exist or has no ``Video_Name`` column.
    """
    processed_videos = set()
    try:
        # utf-8-sig reads plain UTF-8 as well, but drops a leading byte-order
        # mark that would otherwise become part of the first header name.
        csv_file = open(csv_path, 'r', newline='', encoding='utf-8-sig')
    except FileNotFoundError:
        # No CSV yet means nothing has been processed.
        return processed_videos
    with csv_file:
        for row in csv.DictReader(csv_file):
            # DictReader fills columns missing from a short row with None,
            # and a CSV without the column yields no key at all.
            name = (row.get('Video_Name') or '').strip()
            if name:
                processed_videos.add(name)
    return processed_videos

步骤2:修改extractFrames函数,加入校验逻辑

整合校验逻辑,跳过已处理视频,同时优化路径和错误处理:

import glob
import os
import cv2
import csv

def get_processed_videos(csv_path):
    """Return the set of video names already recorded in the tracking CSV.

    Args:
        csv_path: Path to the CSV file whose ``Video_Name`` column lists the
            videos that have already been processed.

    Returns:
        A set of stripped, non-empty ``Video_Name`` values. The set is empty
        when the file does not exist or has no ``Video_Name`` column.
    """
    processed_videos = set()
    try:
        # utf-8-sig reads plain UTF-8 as well, but drops a leading byte-order
        # mark that would otherwise become part of the first header name.
        csv_file = open(csv_path, 'r', newline='', encoding='utf-8-sig')
    except FileNotFoundError:
        # No CSV yet means nothing has been processed.
        return processed_videos
    with csv_file:
        for row in csv.DictReader(csv_file):
            # DictReader fills columns missing from a short row with None,
            # and a CSV without the column yields no key at all.
            name = (row.get('Video_Name') or '').strip()
            if name:
                processed_videos.add(name)
    return processed_videos

def _sample_frames(vidcap, vid_path, vid_name):
    """Print video statistics and pass every 10th frame to target_non_target."""
    fsize = os.stat(vid_path)
    print(f"=============Size of video {vid_name}: {fsize.st_size} bytes")
    # Files of 1000 bytes or less are not analysed.
    if fsize.st_size <= 1000:
        return

    fps = vidcap.get(cv2.CAP_PROP_FPS)
    frameCount = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT))
    # Guard against a zero fps so the duration never divides by zero.
    duration = frameCount / fps if fps > 0 else 0

    print(f"fps = {fps}")
    print(f"Number of frames = {frameCount}")
    print(f"Duration (S) = {duration}")

    # Frames are only sampled from videos longer than one second.
    if duration <= 1:
        return

    count = 0
    success, image = vidcap.read()
    while success:
        # Every 10th frame, starting with frame 0.
        if count % 10 == 0:
            target_non_target(f"{vid_name}_f{count}.jpg", image)
        success, image = vidcap.read()
        count += 1


def _mark_processed(csv_path, vid_name):
    """Append a 'done' row for vid_name, writing the header into a new or empty CSV."""
    needs_header = not os.path.exists(csv_path) or os.path.getsize(csv_path) == 0
    with open(csv_path, 'a', newline='', encoding='utf-8') as csv_file:
        writer = csv.DictWriter(csv_file, fieldnames=['Video_Name', 'Process'])
        if needs_header:
            writer.writeheader()
        writer.writerow({'Video_Name': vid_name, 'Process': 'done'})


def extractFrames(video_pattern, csv_path):
    """Extract frames from each matching video that is not yet listed in the CSV.

    A video is identified by its file name without extension. Videos whose
    name is already in the CSV are skipped. Each remaining video is opened
    with OpenCV, every 10th frame is passed to ``target_non_target``
    (expected to be defined elsewhere in the module), and the name is then
    appended to the CSV with ``Process`` set to ``done``.

    A failure on one video is printed and that video is left unrecorded, so
    a later run retries it; the remaining videos are still processed.

    Args:
        video_pattern: Glob pattern selecting the video files.
        csv_path: Path of the CSV file that tracks processed videos.
    """
    processed = get_processed_videos(csv_path)

    for vid_path in glob.glob(video_pattern):
        print(f"Checking video: {vid_path}")
        vid_name = os.path.splitext(os.path.basename(vid_path))[0]

        # Core check: skip anything already recorded as processed.
        if vid_name in processed:
            print(f"Video {vid_name} already processed, skipping...")
            continue

        try:
            vidcap = cv2.VideoCapture(vid_path)
            if not vidcap.isOpened():
                print(f"Failed to open video {vid_path}")
                continue
        except cv2.error as e:
            print(f"OpenCV error for {vid_path}: {e}")
            continue
        except Exception as e:
            print(f"Unexpected error opening {vid_path}: {e}")
            continue

        try:
            try:
                _sample_frames(vidcap, vid_path, vid_name)
            finally:
                # Release the capture exactly once, whether or not
                # sampling succeeded.
                vidcap.release()
                cv2.destroyAllWindows()
            print(f"Finished processing video {vid_path}")

            _mark_processed(csv_path, vid_name)
            # Keep the in-memory set current so the same name is not
            # processed twice within one run.
            processed.add(vid_name)
        except Exception as e:
            print(f"Error processing {vid_path}: {e}")

if __name__ == "__main__":
    # Raw strings keep the Windows backslashes from being read as escapes.
    CSV_PATH = r"C:\multi_cat_3\models\research\object_detection\my_imgs\video_info.csv"
    VIDEO_PATTERN = r"C:\Python36\videos\*.mp4"
    extractFrames(video_pattern=VIDEO_PATTERN, csv_path=CSV_PATH)

关键改进说明

  • 路径安全:用原始字符串r"..."避免转义字符解析错误
  • 高效校验:用集合存储已处理视频,O(1)时间复杂度判断是否跳过
  • 代码解耦:移除全局变量,改用局部变量传递视频名称
  • 错误处理优化:增加视频打开状态检查,确保资源正常释放
  • 可读性提升:用f-string替代字符串拼接,代码更直观

场景验证(新增V3视频)

当文件夹中已有V1、V2且已写入CSV时,新增V3后:

  1. 运行脚本,get_processed_videos读取CSV得到{'V1', 'V2'}
  2. 遍历到V3时,检查发现V3 not in processed,执行帧提取流程
  3. 处理完成后,将V3写入CSV,下次运行时会自动跳过V3

本文中的问题来源于 Stack Exchange,提问者:Ankit

火山引擎 最新活动