在Django中下载Zip文件、解析CSV文件并将数据导入本地SQLite数据库
实现从ZIP URL下载CSV并导入Django SQLite数据库
我来帮你一步步搞定这个需求,全程用Django生态和Python标准库实现,兼顾效率和容错性,操作起来也很顺手。
一、前期准备:定义对应数据库模型
首先得在你的Django app里定义和CSV列匹配的模型,假设你的CSV包含name、email、age这三列,模型可以这么写:
# your_app/models.py from django.db import models class Person(models.Model): name = models.CharField(max_length=100) email = models.EmailField(unique=True) age = models.IntegerField() def __str__(self): return self.name
写完后别忘了执行数据库迁移:
python manage.py makemigrations python manage.py migrate
二、创建自定义Django管理命令
在Django里跑这种批量导入脚本,最方便的方式是写自定义管理命令——既能直接用python manage.py调用,还能自动复用Django的数据库连接和环境配置。
1. 新建命令文件
在你的app下创建management/commands目录(没有就新建),然后创建import_csv_from_zip_url.py文件:
# your_app/management/commands/import_csv_from_zip_url.py import csv import tempfile import urllib.request import os from zipfile import ZipFile, BadZipFile from django.core.management.base import BaseCommand, CommandError from your_app.models import Person class Command(BaseCommand): help = '下载ZIP文件、提取CSV并将数据导入SQLite数据库' def add_arguments(self, parser): # 接收用户传入的ZIP URL参数 parser.add_argument('zip_url', type=str, help='包含CSV的ZIP文件地址') # 可选参数:指定ZIP里的CSV文件名(如果有多个文件的话) parser.add_argument('--csv-filename', type=str, default='data.csv', help='ZIP内的CSV文件名') def handle(self, *args, **options): zip_url = options['zip_url'] csv_filename = options['csv_filename'] # 1. 下载ZIP到临时文件 self.stdout.write(self.style.SUCCESS(f"开始从 {zip_url} 下载文件")) try: with tempfile.NamedTemporaryFile(delete=False) as temp_zip: urllib.request.urlretrieve(zip_url, temp_zip.name) temp_zip_path = temp_zip.name except Exception as e: raise CommandError(f"ZIP文件下载失败:{str(e)}") # 2. 解压ZIP并读取CSV persons_to_create = [] try: with ZipFile(temp_zip_path, 'r') as zip_ref: # 检查ZIP里是否存在指定的CSV if csv_filename not in zip_ref.namelist(): raise CommandError(f"ZIP包中未找到 {csv_filename} 文件") # 读取并解析CSV内容 with zip_ref.open(csv_filename) as csv_file: # 处理编码(如果你的CSV是GBK编码,就换成'gbk') csv_content = csv_file.read().decode('utf-8-sig').splitlines() csv_reader = csv.DictReader(csv_content) # 3. 验证数据并准备批量导入对象 for row_num, row in enumerate(csv_reader, start=2): # 行号从2开始,跳过表头 try: person = Person( name=row['name'].strip(), email=row['email'].strip(), age=int(row['age'].strip()) ) persons_to_create.append(person) except KeyError as e: self.stdout.write(self.style.WARNING(f"第 {row_num} 行:缺失列 {str(e)} - 跳过该行")) except ValueError as e: self.stdout.write(self.style.WARNING(f"第 {row_num} 行:数据格式错误 - {str(e)} - 跳过该行")) except BadZipFile: raise CommandError("下载的文件不是有效的ZIP压缩包") except Exception as e: raise CommandError(f"处理ZIP/CSV文件失败:{str(e)}") finally: # 清理临时ZIP文件 os.unlink(temp_zip_path) # 4. 批量导入数据库(比逐条save()效率高很多) if persons_to_create: Person.objects.bulk_create(persons_to_create, batch_size=1000) self.stdout.write(self.style.SUCCESS(f"成功导入 {len(persons_to_create)} 条数据")) else: self.stdout.write(self.style.WARNING("未找到有效数据,无需导入"))
2. 运行命令
在终端执行以下命令即可完成导入:
python manage.py import_csv_from_zip_url "https://example.com/your_data.zip" --csv-filename="target_csv.csv"
如果ZIP里只有一个CSV文件,也可以省略--csv-filename参数,默认使用data.csv。
三、关键细节说明
- 临时文件处理:用
tempfile创建临时ZIP文件,用完自动删除,不会占用永久磁盘空间。 - 批量导入优化:用
bulk_create代替逐条save(),数据量越大,效率提升越明显,batch_size可以根据你的内存情况调整。 - 容错机制:捕获了下载失败、ZIP损坏、CSV列缺失、数据类型错误等多种异常,还会打印警告跳过无效行,不会因为一条坏数据导致整个导入中断。
- 编码兼容:用
utf-8-sig解码,能处理带BOM的UTF-8文件(比如Windows生成的CSV),如果你的CSV是其他编码,直接替换即可。
四、扩展优化建议
- 若需要定时自动导入,可以结合
django-crontab或Celery实现定时任务。 - 可以加入数据去重逻辑,比如先检查
email是否已存在,避免重复导入。 - 对于超大CSV文件,可以分块读取,避免一次性加载到内存导致溢出。
内容的提问来源于stack exchange,提问作者Aditya Mahajan




