You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

如何利用多线程/多进程提升Python爬虫的爬取速度?

如何用多线程/多进程优化Python爬虫速度?

首先得指出你写的多线程代码有几个核心问题,这也是为什么它没达到预期效果的原因:

  • 在循环里重复创建ThreadPoolExecutor,这不仅没利用多线程的复用优势,反而会因为频繁创建销毁线程池带来额外性能开销
  • executor.map返回的生成器直接appenddata里,最后你得到的不是实际爬取数据,而是一堆未执行的生成器对象
  • 循环逻辑混乱:既在循环里单独调用crawl(id),又用executor.map处理整个ID列表,相当于重复发起了爬取请求

下面我给你提供几种经过验证的优化方案,从多线程(最适合爬虫场景)、多进程到分块处理的实现方式,附带详细说明:


方案一:多线程优化(推荐,适合IO密集型爬虫)

爬虫的核心耗时是网络请求(IO等待),多线程能充分利用CPU空闲时间发起更多请求,是性价比最高的优化方式。

完整修正代码

import requests
import pandas as pd
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor, as_completed

def crawl(id):
    try:
        # 模拟浏览器请求头,避免被网站识别为爬虫
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36"
        }
        url = 'https://www.china0001.com.cn/project/{0:06d}.html'.format(id)
        print(f"正在爬取: {url}")
        # 添加超时限制,避免请求无限卡住
        content = requests.get(url, headers=headers, timeout=10).text
        soup = BeautifulSoup(content, 'lxml')
        tbody = soup.find("table", attrs={"id":"mse_new"}).find("tbody", attrs={"class":"jg"})
        tr_list = tbody.find_all("tr")
        
        rows = []
        for tr in tr_list[1:]:
            rows.append([td.text.strip() for td in tr.findAll("td")])
        
        # 整理成字典格式
        result_dict = dict([map(str.strip, item.split(':')) for row in rows for item in row])
        return result_dict
    except AttributeError:
        print(f"ID {id} 页面结构异常,跳过")
        return False
    except requests.exceptions.RequestException as e:
        print(f"ID {id} 请求失败: {str(e)}")
        return False

if __name__ == "__main__":
    # 并发数建议从10-20开始测试,过大会触发网站反爬
    MAX_WORKERS = 15
    start_id = 699998
    end_id = 700050
    ids_to_crawl = list(range(start_id, end_id))
    
    collected_data = []
    
    # 一次性创建线程池,复用线程
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        # 提交所有爬取任务,获取Future对象(用于追踪任务状态)
        future_to_id = {executor.submit(crawl, id): id for id in ids_to_crawl}
        
        # 按任务完成顺序处理结果
        for future in as_completed(future_to_id):
            target_id = future_to_id[future]
            try:
                result = future.result()
                if result:
                    collected_data.append(result)
            except Exception as e:
                print(f"ID {target_id} 处理出错: {str(e)}")
    
    # 保存结果到Excel
    if collected_data:
        df = pd.DataFrame(collected_data)
        df.to_excel('multi_thread_crawl_result.xlsx', index=False)
        print(f"爬取完成!共获取{len(collected_data)}条有效数据,已保存到文件")
    else:
        print("未获取到任何有效数据")

关键优化点说明

  1. 线程池复用:只创建一次ThreadPoolExecutor,避免重复创建线程的开销
  2. 异常增强处理:新增了网络请求异常捕获(超时、连接失败等),提升爬虫稳定性
  3. 请求头模拟:添加User-Agent伪装成浏览器,降低被反爬的概率
  4. 结果有序处理:用as_completed按任务完成顺序处理结果,不用等待所有任务结束

方案二:多进程优化(适合CPU+IO混合场景)

如果你的爬虫在爬取后需要做大量数据清洗/计算(CPU密集型操作),可以用多进程,但纯爬虫场景下多线程更高效(多进程开销更大)。

核心修改点

只需要把ThreadPoolExecutor替换为ProcessPoolExecutor,其余逻辑和多线程版本一致:

from concurrent.futures import ProcessPoolExecutor, as_completed

# 其余代码和多线程版本完全相同,仅修改线程池部分
if __name__ == "__main__":
    # 多进程建议设置为CPU核心数(比如8核就设为8)
    MAX_WORKERS = 8
    # ... 其余代码不变

方案三:分块处理(超大规模ID范围)

如果需要爬取的ID数量特别多(比如几十万条),可以把ID列表分成若干块,每块用一个线程/进程处理,避免一次性提交过多任务导致内存占用过高:

分块处理示例代码

def crawl_batch(id_batch):
    """批量处理一个ID块的爬取任务"""
    batch_result = []
    for id in id_batch:
        res = crawl(id)
        if res:
            batch_result.append(res)
    return batch_result

if __name__ == "__main__":
    MAX_WORKERS = 15
    start_id = 699998
    end_id = 700100
    ids_to_crawl = list(range(start_id, end_id))
    
    # 每10个ID分为一个块
    batch_size = 10
    id_batches = [ids_to_crawl[i:i+batch_size] for i in range(0, len(ids_to_crawl), batch_size)]
    
    collected_data = []
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        futures = [executor.submit(crawl_batch, batch) for batch in id_batches]
        
        for future in as_completed(futures):
            batch_data = future.result()
            collected_data.extend(batch_data)
    
    # 保存结果...

额外反爬建议

  1. 不要盲目增大并发数,建议从10开始测试,观察网站响应,避免IP被封禁
  2. 可以添加随机延迟(比如在crawl函数里加time.sleep(random.uniform(0.1, 0.5))),但多线程下不要让所有线程同时延迟
  3. 记录已爬取成功的ID,避免重复爬取(比如用文本文件或数据库存储)

内容的提问来源于stack exchange,提问作者ah bon

火山引擎 最新活动