You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

如何将Scrapy爬虫与数据清洗结合直接输出CSV文件?

当然可以!直接在Scrapy里衔接爬取+数据清洗+CSV导出

完全能跳过“存JSON→Jupyter清洗”的中间步骤,直接让Scrapy爬虫输出已经清洗好的CSV文件。下面给你两种实用的实现方案,你可以根据自己的清洗逻辑复杂度来选:


方案1:用Item Pipeline实现(推荐复杂清洗场景)

Scrapy的Item Pipeline本来就是用来处理爬取到的数据的,你可以把Jupyter里的清洗逻辑直接搬到Pipeline里,同时在Pipeline里完成CSV的写入。

步骤1:编写自定义Pipeline

在你的Scrapy项目的pipelines.py里添加以下代码(替换成你实际的字段和清洗逻辑):

import csv
from scrapy.exceptions import DropItem

class CleanedCsvExportPipeline:
    def open_spider(self, spider):
        # 初始化CSV文件,指定字段名(和你的Item字段对应)
        self.output_file = open('cleaned_result.csv', 'w', newline='', encoding='utf-8')
        self.csv_writer = csv.DictWriter(
            self.output_file,
            fieldnames=['title', 'price', 'category', 'publish_date']  # 替换成你的实际字段
        )
        self.csv_writer.writeheader()

    def process_item(self, item, spider):
        # 这里放你的数据清洗逻辑,对应Jupyter里的代码
        # 示例清洗操作:
        # 1. 丢弃缺失核心字段的Item
        if not item.get('title'):
            raise DropItem(f"跳过缺失标题的Item: {item}")
        
        # 2. 清理字符串格式(去除多余空格、特殊字符)
        item['title'] = item['title'].strip().replace('\n', '')
        item['category'] = item['category'].strip()
        
        # 3. 转换数据类型(比如价格字符串转浮点数)
        if item.get('price'):
            item['price'] = float(item['price'].replace('¥', '').replace(',', ''))
        
        # 4. 格式化日期(如果需要)
        # item['publish_date'] = datetime.datetime.strptime(item['publish_date'], '%Y-%m-%d').date()

        # 清洗完成后写入CSV
        self.csv_writer.writerow(dict(item))
        return item

    def close_spider(self, spider):
        self.output_file.close()

步骤2:启用Pipeline

在项目的settings.py里找到ITEM_PIPELINES配置,启用刚才写的Pipeline:

ITEM_PIPELINES = {
    # 替换成你的项目名,比如myproject.pipelines...
    'your_project_name.pipelines.CleanedCsvExportPipeline': 300,
}

方案2:自定义Feed Exporter(适合简单清洗场景)

如果你的清洗逻辑只是简单的字段格式化,也可以通过自定义Scrapy的Feed导出器来实现,直接在导出CSV前完成数据清洗。

步骤1:编写自定义导出器

在项目里新建一个exporters.py文件,添加以下代码:

from scrapy.exporters import CsvItemExporter

class CleanedCsvExporter(CsvItemExporter):
    def export_item(self, item):
        # 先对Item做清洗
        if item.get('title'):
            item['title'] = item['title'].strip()
        if item.get('price'):
            item['price'] = float(item['price'].replace('$', ''))
        # 更多清洗逻辑按需添加
        
        # 调用父类方法导出清洗后的Item
        super().export_item(item)

步骤2:配置Feed导出

settings.py里配置使用自定义的导出器,并指定输出路径:

# 注册自定义CSV导出器
FEED_EXPORTERS = {
    'csv': 'your_project_name.exporters.CleanedCsvExporter',
}

# 配置导出参数
FEED_URI = 'cleaned_data.csv'
FEED_FORMAT = 'csv'
FEED_EXPORT_ENCODING = 'utf-8'
FEED_EXPORT_FIELDS = ['title', 'price', 'category']  # 指定要导出的字段

注意事项

  • 把你Jupyter里的清洗代码转换成处理Scrapy Item的逻辑就行,Item本质是字典,操作起来和DataFrame的列处理逻辑类似。
  • 如果需要处理大量数据,Pipeline方案的内存占用会更友好,因为它是逐条处理写入;Feed Exporter是批量处理,适合数据量不大的场景。

内容的提问来源于stack exchange,提问作者Yukino Kondo

火山引擎 最新活动