You need to enable JavaScript to run this app.
优惠活动
大模型
产品
解决方案
定价
更多
文档控制台
免费开始使用

在Django中下载Zip文件、解析CSV文件并将数据导入本地SQLite数据库

实现从ZIP URL下载CSV并导入Django SQLite数据库

我来帮你一步步搞定这个需求,全程用Django生态和Python标准库实现,兼顾效率和容错性,操作起来也很顺手。

一、前期准备:定义对应数据库模型

首先得在你的Django app里定义和CSV列匹配的模型,假设你的CSV包含nameemailage这三列,模型可以这么写:

# your_app/models.py
from django.db import models

class Person(models.Model):
    name = models.CharField(max_length=100)
    email = models.EmailField(unique=True)
    age = models.IntegerField()

    def __str__(self):
        return self.name

写完后别忘了执行数据库迁移:

python manage.py makemigrations
python manage.py migrate

二、创建自定义Django管理命令

在Django里跑这种批量导入脚本,最方便的方式是写自定义管理命令——既能直接用python manage.py调用,还能自动复用Django的数据库连接和环境配置。

1. 新建命令文件

在你的app下创建management/commands目录(没有就新建),然后创建import_csv_from_zip_url.py文件:

# your_app/management/commands/import_csv_from_zip_url.py
import csv
import tempfile
import urllib.request
import os
from zipfile import ZipFile, BadZipFile
from django.core.management.base import BaseCommand, CommandError
from your_app.models import Person

class Command(BaseCommand):
    help = '下载ZIP文件、提取CSV并将数据导入SQLite数据库'

    def add_arguments(self, parser):
        # 接收用户传入的ZIP URL参数
        parser.add_argument('zip_url', type=str, help='包含CSV的ZIP文件地址')
        # 可选参数:指定ZIP里的CSV文件名(如果有多个文件的话)
        parser.add_argument('--csv-filename', type=str, default='data.csv', help='ZIP内的CSV文件名')

    def handle(self, *args, **options):
        zip_url = options['zip_url']
        csv_filename = options['csv_filename']

        # 1. 下载ZIP到临时文件
        self.stdout.write(self.style.SUCCESS(f"开始从 {zip_url} 下载文件"))
        try:
            with tempfile.NamedTemporaryFile(delete=False) as temp_zip:
                urllib.request.urlretrieve(zip_url, temp_zip.name)
                temp_zip_path = temp_zip.name
        except Exception as e:
            raise CommandError(f"ZIP文件下载失败:{str(e)}")

        # 2. 解压ZIP并读取CSV
        persons_to_create = []
        try:
            with ZipFile(temp_zip_path, 'r') as zip_ref:
                # 检查ZIP里是否存在指定的CSV
                if csv_filename not in zip_ref.namelist():
                    raise CommandError(f"ZIP包中未找到 {csv_filename} 文件")
                
                # 读取并解析CSV内容
                with zip_ref.open(csv_filename) as csv_file:
                    # 处理编码(如果你的CSV是GBK编码,就换成'gbk')
                    csv_content = csv_file.read().decode('utf-8-sig').splitlines()
                    csv_reader = csv.DictReader(csv_content)
                    
                    # 3. 验证数据并准备批量导入对象
                    for row_num, row in enumerate(csv_reader, start=2):  # 行号从2开始,跳过表头
                        try:
                            person = Person(
                                name=row['name'].strip(),
                                email=row['email'].strip(),
                                age=int(row['age'].strip())
                            )
                            persons_to_create.append(person)
                        except KeyError as e:
                            self.stdout.write(self.style.WARNING(f"第 {row_num} 行:缺失列 {str(e)} - 跳过该行"))
                        except ValueError as e:
                            self.stdout.write(self.style.WARNING(f"第 {row_num} 行:数据格式错误 - {str(e)} - 跳过该行"))

        except BadZipFile:
            raise CommandError("下载的文件不是有效的ZIP压缩包")
        except Exception as e:
            raise CommandError(f"处理ZIP/CSV文件失败:{str(e)}")
        finally:
            # 清理临时ZIP文件
            os.unlink(temp_zip_path)

        # 4. 批量导入数据库(比逐条save()效率高很多)
        if persons_to_create:
            Person.objects.bulk_create(persons_to_create, batch_size=1000)
            self.stdout.write(self.style.SUCCESS(f"成功导入 {len(persons_to_create)} 条数据"))
        else:
            self.stdout.write(self.style.WARNING("未找到有效数据,无需导入"))

2. 运行命令

在终端执行以下命令即可完成导入:

python manage.py import_csv_from_zip_url "https://example.com/your_data.zip" --csv-filename="target_csv.csv"

如果ZIP里只有一个CSV文件,也可以省略--csv-filename参数,默认使用data.csv

三、关键细节说明

  • 临时文件处理:用tempfile创建临时ZIP文件,用完自动删除,不会占用永久磁盘空间。
  • 批量导入优化:用bulk_create代替逐条save(),数据量越大,效率提升越明显,batch_size可以根据你的内存情况调整。
  • 容错机制:捕获了下载失败、ZIP损坏、CSV列缺失、数据类型错误等多种异常,还会打印警告跳过无效行,不会因为一条坏数据导致整个导入中断。
  • 编码兼容:用utf-8-sig解码,能处理带BOM的UTF-8文件(比如Windows生成的CSV),如果你的CSV是其他编码,直接替换即可。

四、扩展优化建议

  • 若需要定时自动导入,可以结合django-crontab或Celery实现定时任务。
  • 可以加入数据去重逻辑,比如先检查email是否已存在,避免重复导入。
  • 对于超大CSV文件,可以分块读取,避免一次性加载到内存导致溢出。

内容的提问来源于stack exchange,提问作者Aditya Mahajan

火山引擎 最新活动