如何利用多线程/多进程提升Python爬虫的爬取速度?
如何用多线程/多进程优化Python爬虫速度?
首先得指出你写的多线程代码有几个核心问题,这也是为什么它没达到预期效果的原因:
- 在循环里重复创建
ThreadPoolExecutor,这不仅没利用多线程的复用优势,反而会因为频繁创建销毁线程池带来额外性能开销 - 把
executor.map返回的生成器直接append到data里,最后你得到的不是实际爬取数据,而是一堆未执行的生成器对象 - 循环逻辑混乱:既在循环里单独调用
crawl(id),又用executor.map处理整个ID列表,相当于重复发起了爬取请求
下面我给你提供几种经过验证的优化方案,从多线程(最适合爬虫场景)、多进程到分块处理的实现方式,附带详细说明:
方案一:多线程优化(推荐,适合IO密集型爬虫)
爬虫的核心耗时是网络请求(IO等待),多线程能充分利用CPU空闲时间发起更多请求,是性价比最高的优化方式。
完整修正代码
import requests import pandas as pd from bs4 import BeautifulSoup from concurrent.futures import ThreadPoolExecutor, as_completed def crawl(id): try: # 模拟浏览器请求头,避免被网站识别为爬虫 headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36" } url = 'https://www.china0001.com.cn/project/{0:06d}.html'.format(id) print(f"正在爬取: {url}") # 添加超时限制,避免请求无限卡住 content = requests.get(url, headers=headers, timeout=10).text soup = BeautifulSoup(content, 'lxml') tbody = soup.find("table", attrs={"id":"mse_new"}).find("tbody", attrs={"class":"jg"}) tr_list = tbody.find_all("tr") rows = [] for tr in tr_list[1:]: rows.append([td.text.strip() for td in tr.findAll("td")]) # 整理成字典格式 result_dict = dict([map(str.strip, item.split(':')) for row in rows for item in row]) return result_dict except AttributeError: print(f"ID {id} 页面结构异常,跳过") return False except requests.exceptions.RequestException as e: print(f"ID {id} 请求失败: {str(e)}") return False if __name__ == "__main__": # 并发数建议从10-20开始测试,过大会触发网站反爬 MAX_WORKERS = 15 start_id = 699998 end_id = 700050 ids_to_crawl = list(range(start_id, end_id)) collected_data = [] # 一次性创建线程池,复用线程 with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: # 提交所有爬取任务,获取Future对象(用于追踪任务状态) future_to_id = {executor.submit(crawl, id): id for id in ids_to_crawl} # 按任务完成顺序处理结果 for future in as_completed(future_to_id): target_id = future_to_id[future] try: result = future.result() if result: collected_data.append(result) except Exception as e: print(f"ID {target_id} 处理出错: {str(e)}") # 保存结果到Excel if collected_data: df = pd.DataFrame(collected_data) df.to_excel('multi_thread_crawl_result.xlsx', index=False) print(f"爬取完成!共获取{len(collected_data)}条有效数据,已保存到文件") else: print("未获取到任何有效数据")
关键优化点说明
- 线程池复用:只创建一次
ThreadPoolExecutor,避免重复创建线程的开销 - 异常增强处理:新增了网络请求异常捕获(超时、连接失败等),提升爬虫稳定性
- 请求头模拟:添加
User-Agent伪装成浏览器,降低被反爬的概率 - 结果有序处理:用
as_completed按任务完成顺序处理结果,不用等待所有任务结束
方案二:多进程优化(适合CPU+IO混合场景)
如果你的爬虫在爬取后需要做大量数据清洗/计算(CPU密集型操作),可以用多进程,但纯爬虫场景下多线程更高效(多进程开销更大)。
核心修改点
只需要把ThreadPoolExecutor替换为ProcessPoolExecutor,其余逻辑和多线程版本一致:
from concurrent.futures import ProcessPoolExecutor, as_completed # 其余代码和多线程版本完全相同,仅修改线程池部分 if __name__ == "__main__": # 多进程建议设置为CPU核心数(比如8核就设为8) MAX_WORKERS = 8 # ... 其余代码不变
方案三:分块处理(超大规模ID范围)
如果需要爬取的ID数量特别多(比如几十万条),可以把ID列表分成若干块,每块用一个线程/进程处理,避免一次性提交过多任务导致内存占用过高:
分块处理示例代码
def crawl_batch(id_batch): """批量处理一个ID块的爬取任务""" batch_result = [] for id in id_batch: res = crawl(id) if res: batch_result.append(res) return batch_result if __name__ == "__main__": MAX_WORKERS = 15 start_id = 699998 end_id = 700100 ids_to_crawl = list(range(start_id, end_id)) # 每10个ID分为一个块 batch_size = 10 id_batches = [ids_to_crawl[i:i+batch_size] for i in range(0, len(ids_to_crawl), batch_size)] collected_data = [] with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: futures = [executor.submit(crawl_batch, batch) for batch in id_batches] for future in as_completed(futures): batch_data = future.result() collected_data.extend(batch_data) # 保存结果...
额外反爬建议
- 不要盲目增大并发数,建议从10开始测试,观察网站响应,避免IP被封禁
- 可以添加随机延迟(比如在
crawl函数里加time.sleep(random.uniform(0.1, 0.5))),但多线程下不要让所有线程同时延迟 - 记录已爬取成功的ID,避免重复爬取(比如用文本文件或数据库存储)
内容的提问来源于stack exchange,提问作者ah bon




