You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

Scrapy爬虫导出PDF元数据至CSV时数据丢失问题排查求助

解决Scrapy爬虫中CSV仅写入部分PDF数据的问题

看起来你的核心问题是异步并发场景下的CSV文件写入冲突——Scrapy是多线程异步框架,多个save_pdf回调会同时触发add_to_csv操作,多个线程同时读写同一个CSV文件时会出现竞争条件,导致部分数据被覆盖或丢失。下面我会拆解问题原因,并给出两种可行的解决方案,优先推荐符合Scrapy最佳实践的方案。

问题根源分析

  1. 并发写入竞争:你的add_to_csv每次都打开、写入、关闭CSV文件,在高并发下,多个线程同时操作同一个文件时,追加写入的动作不是原子性的,后写入的线程可能会覆盖前一个线程的写入内容,或者导致数据错乱。
  2. CSV初始化竞态save_pdf中检查CSV是否存在的逻辑,可能会被多个线程同时触发,导致重复创建文件(甚至抛出文件已存在的异常),进而丢失部分写入请求。

方案一:使用Scrapy Item Pipeline(推荐)

Scrapy的Item Pipeline是专门用于处理爬取结果的组件,默认是单线程处理Item,天然避免并发写入问题,也更符合框架的设计模式。

步骤1:定义PDF数据Item

在项目的items.py中添加:

import scrapy

class PdfItem(scrapy.Item):
    location = scrapy.Field()       # PDF地址
    title = scrapy.Field()          # 标题
    author = scrapy.Field()         # 作者
    page_count = scrapy.Field()     # 页数
    is_tagged = scrapy.Field()      # 是否标记
    field_count = scrapy.Field()    # 字段数
    image_count = scrapy.Field()    # 图片数量

步骤2:修改爬虫代码,改用yield Item

把原来直接调用add_to_csv的逻辑,替换成生成PdfItem

# 替换save_pdf中self.add_to_csv(row)的部分
item = PdfItem()
item['location'] = url
item['title'] = metaData[0]
item['author'] = metaData[1]
item['page_count'] = metaData[2]
item['is_tagged'] = is_tagged
item['field_count'] = fieldCount
item['image_count'] = imageCount
yield item

同时删除爬虫中的create_csvadd_to_csv方法,以及save_pdf中关于CSV初始化的逻辑。

步骤3:编写CSV Pipeline

在项目的pipelines.py中添加:

import csv
import os

class PdfCsvPipeline:
    def __init__(self, base_path, domain):
        self.base_path = base_path
        self.domain = domain
        self.csv_file = None
        self.writer = None

    @classmethod
    def from_crawler(cls, crawler):
        # 从爬虫实例中获取配置参数
        return cls(
            base_path=crawler.spider.base_path,
            domain=crawler.spider.domain
        )

    def open_spider(self, spider):
        # 爬虫启动时初始化CSV文件和表头
        save_dir = os.path.join(self.base_path, self.domain)
        os.makedirs(save_dir, exist_ok=True)
        csv_path = os.path.join(save_dir, f"{self.domain}.csv")
        self.csv_file = open(csv_path, 'w', newline='', encoding='utf-8')
        self.writer = csv.writer(self.csv_file)
        self.writer.writerow([
            'location', 'title', 'author', '# of pages', 
            'tagged?', 'field count', 'image count'
        ])

    def process_item(self, item, spider):
        # 单线程写入数据,无并发冲突
        row = [
            item['location'],
            item['title'],
            item['author'],
            item['page_count'],
            item['is_tagged'],
            item['field_count'],
            item['image_count']
        ]
        self.writer.writerow(row)
        return item

    def close_spider(self, spider):
        # 爬虫结束时安全关闭文件
        if self.csv_file:
            self.csv_file.close()

步骤4:启用Pipeline

settings.py中配置启用这个Pipeline:

ITEM_PIPELINES = {
    'your_project_name.pipelines.PdfCsvPipeline': 300,  # 替换成你的项目名
}

方案二:给CSV写入加线程锁

如果不想改动架构,可以通过线程锁强制同一时间只有一个线程操作CSV文件,避免竞争。

修改爬虫代码

在爬虫类中添加锁,并修改相关方法:

import threading  # 导入线程锁模块

class PdfspiderSpider(CrawlSpider):
    name = 'pdfspider'
    allowed_domain = input('Enter the domain name...: ')
    allowed_domains = [allowed_domain]
    domain = allowed_domains[0]
    start = input('Enter the url...: ')
    start_urls = [start]
    base_path = input('Where to save...: ')
    csv_lock = threading.Lock()  # 添加类级别的线程锁

    rules = (
        Rule(LinkExtractor(), callback='parse_item', follow=True),
    )

    # 其他方法保持不变,修改create_csv、save_pdf、add_to_csv:
    def create_csv(self):
        header = ['location', 'title', 'author', '# of pages', 'tagged?', 'field count', 'image count']
        filename = os.path.join(self.base_path, self.domain, f"{self.domain}.csv")
        with self.csv_lock:
            # 二次检查文件是否存在,避免锁等待期间被其他线程创建
            if not os.path.exists(filename):
                with open(filename, 'w', newline='', encoding='utf-8') as f:
                    writer = csv.writer(f)
                    writer.writerow(header)

    def save_pdf(self, response):
        url=response.url
        if response.status == 200:
            save_dir = os.path.join(self.base_path, self.domain)
            os.makedirs(save_dir, exist_ok=True)
            csvPath = os.path.join(save_dir, f"{self.domain}.csv")
            
            with self.csv_lock:
                if not os.path.exists(csvPath):
                    self.create_csv()
            
            # 下载PDF的代码保持不变...
            
            row = [url, metaData[0], metaData[1], metaData[2], is_tagged, fieldCount, imageCount]
            self.add_to_csv(row)
        else:
            print(f"Failed to load pdf: {url}")

    def add_to_csv(self,row):
        filename = os.path.join(self.base_path, self.domain, f"{self.domain}.csv")
        with self.csv_lock:  # 写入前加锁
            with open(filename, 'a', newline='', encoding='utf-8') as f:
                writer = csv.writer(f)
                writer.writerow(row)

两种方案都能解决你的问题,方案一更符合Scrapy的设计理念,扩展性更强;方案二改动更小,适合快速修复。

内容的提问来源于stack exchange,提问作者Mat

火山引擎 最新活动