You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

如何用Python多线程/多进程批量处理文本文件并生成对象列表?

多线程/多进程提速文件处理方案

当然可以通过多线程或者多进程解决你现在串行处理速度慢的问题!你的场景里,文件读写属于IO密集型操作,多线程能很好利用等待IO的时间处理其他文件;如果后续解析任务是CPU密集型(比如复杂字符串处理、正则匹配、数据计算等),那多进程会更合适——因为Python的GIL(全局解释器锁)会限制多线程在CPU密集任务下的效率,而多进程可以绕开这个限制。

下面我给你两个实用的实现版本,你可以根据实际需求选择:

一、多线程版本(适合IO密集型为主的任务)

这个版本用concurrent.futures.ThreadPoolExecutor并行创建fileObject实例,充分利用IO等待时间:

import os
import sys
from concurrent.futures import ThreadPoolExecutor

root = "C:/Users/jmartini/Desktop/Temp/text_files"

class fileObject(object):
    def __init__(self, filepath):
        self._content = ''
        self._filepath = ''
        self.filepath = filepath

    @property
    def filepath(self):
        return self._filepath

    @filepath.setter
    def filepath(self, value):
        self._filepath = value
        if os.path.isfile(value):
            self._content = ''
            with open(value, 'r') as f:
                self._content = f.read()
            # 在这里添加你的解析逻辑,比如字符串清洗、提取关键信息等
            # 示例:self._content = self._content.strip().upper()

    @property
    def content(self):
        return self._content

def get_filepaths(directory):
    # 先批量收集所有符合条件的文件路径,避免多线程中重复遍历目录
    filepaths = []
    for root_dir, subdirs, files in os.walk(directory):
        for filename in files:
            if os.path.splitext(filename)[-1] == '.sbs':
                filepaths.append(os.path.join(root_dir, filename))
    return filepaths

def create_fileobj(filepath):
    # 单独封装创建对象的逻辑,方便提交给线程池
    try:
        return fileObject(filepath)
    except Exception as e:
        print(f"处理文件 {filepath} 时出错: {str(e)}")
        return None

def collect_files(directory):
    filepaths = get_filepaths(directory)
    # max_workers可根据机器情况调整,IO密集型任务可设为20-50
    with ThreadPoolExecutor(max_workers=30) as executor:
        # 批量提交任务并收集结果,过滤处理失败的对象
        objs = [obj for obj in executor.map(create_fileobj, filepaths) if obj is not None]
    return objs

if __name__ == "__main__":
    objs = collect_files(root)
    print(f"成功处理 {len(objs)} 个文件")
    # 验证第一个文件内容(可选)
    # if objs:
    #     print(objs[0].content[:100])

多线程版本说明:

  • 先收集所有目标文件路径,避免多线程中重复遍历目录,减少额外开销。
  • 添加异常处理,单个文件处理失败不会导致整个任务崩溃。
  • max_workers值可灵活调整,IO密集型任务设大一些(比如30、50)也不会过度占用CPU。

二、多进程版本(适合CPU密集型为主的任务)

如果解析任务需要大量CPU计算,多进程能充分利用多核CPU优势,绕开GIL限制:

import os
import sys
from concurrent.futures import ProcessPoolExecutor

root = "C:/Users/jmartini/Desktop/Temp/text_files"

class fileObject(object):
    def __init__(self, filepath):
        self._content = ''
        self._filepath = ''
        self.filepath = filepath

    @property
    def filepath(self):
        return self._filepath

    @filepath.setter
    def filepath(self, value):
        self._filepath = value
        if os.path.isfile(value):
            self._content = ''
            with open(value, 'r') as f:
                self._content = f.read()
            # 在这里添加CPU密集型解析逻辑,比如复杂正则、数据转换等
            # 示例:self._content = ''.join(reversed(self._content))

    @property
    def content(self):
        return self._content

def get_filepaths(directory):
    filepaths = []
    for root_dir, subdirs, files in os.walk(directory):
        for filename in files:
            if os.path.splitext(filename)[-1] == '.sbs':
                filepaths.append(os.path.join(root_dir, filename))
    return filepaths

def create_fileobj(filepath):
    try:
        return fileObject(filepath)
    except Exception as e:
        print(f"处理文件 {filepath} 时出错: {str(e)}")
        return None

def collect_files(directory):
    filepaths = get_filepaths(directory)
    # max_workers默认是CPU核心数,也可手动指定
    with ProcessPoolExecutor() as executor:
        objs = [obj for obj in executor.map(create_fileobj, filepaths) if obj is not None]
    return objs

if __name__ == "__main__":
    objs = collect_files(root)
    print(f"成功处理 {len(objs)} 个文件")
    # 验证内容(可选)
    # if objs:
    #     print(objs[0].content[:100])

多进程版本说明:

  • 每个进程拥有独立的Python解释器和内存空间,不受GIL限制,适合CPU密集任务。
  • 多进程启动开销比多线程大,所以如果任务以IO为主,多线程更高效。
  • 确保fileObject类可序列化(pickle兼容),如果解析后内容是字符串、数字等基本类型,完全没问题;如果有复杂自定义对象,需要调整结构。

额外建议

  • 如果文件数量极其庞大(十万级以上),可以分批次提交任务,避免一次性占用过多内存。
  • 可根据实际测试结果调整max_workers值,找到最优的速度和资源占用平衡点。

内容的提问来源于stack exchange,提问作者JokerMartini

火山引擎 最新活动