How can I optimize Python multi-file deduplication code? Performance problems when processing 100GB of data
Great question. Deduplicating 100GB of text files with an in-memory set is a classic bottleneck scenario. Let’s start by breaking down why your current code slows down, then look at practical optimizations and alternative approaches that can handle this scale efficiently.
First, the core issues with your existing code:
- The `main_list` set grows indefinitely in memory, consuming gigabytes of RAM. This leads to frequent garbage collection, memory paging to disk, and overall slowdowns.
- Frequent synchronous IO operations (saving `main_list` and processed paths after every file) block the processing pipeline and add unnecessary overhead.
- Maintaining two in-memory sets (`main_list` and `emails`) amplifies the memory pressure.
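To get a feel for the first point, here is a rough sketch that measures what each entry costs once it lives in a Python set. The sample addresses and the helper name `estimate_set_bytes_per_entry` are made up for illustration, and the exact numbers depend on your Python version and platform.

```python
import sys


def estimate_set_bytes_per_entry(sample):
    """Rough per-entry memory cost of holding `sample` strings in a set."""
    entries = set(sample)
    # getsizeof(set) covers only the hash table, not the strings it refers to,
    # so the string objects are counted separately.
    table_bytes = sys.getsizeof(entries)
    string_bytes = sum(sys.getsizeof(s) for s in entries)
    return (table_bytes + string_bytes) / len(entries)


# Hypothetical sample data, for illustration only.
sample_emails = [f"user{i}@example.com" for i in range(100_000)]
raw_bytes = sum(len(s) for s in sample_emails) / len(sample_emails)
per_entry = estimate_set_bytes_per_entry(sample_emails)

print(f"average raw text size: {raw_bytes:.0f} bytes")
print(f"average in-set cost:   {per_entry:.0f} bytes")
```

The in-set cost is typically several times the raw text size, which is why a set built from a large share of 100GB of input will not fit in ordinary RAM.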
1. Use a Bloom Filter for Memory-Efficient Probabilistic Deduplication
If you can tolerate an extremely low false positive rate (e.g., 0.01%), a Bloom Filter is a game-changer. It’s a space-efficient probabilistic data structure that can check for existence without storing the actual elements.
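Before committing to a Bloom filter, it helps to estimate how much memory it will need. The sketch below uses the standard sizing formulas for a classic Bloom filter (bits = -n·ln(p) / (ln 2)², hash count = (bits / n)·ln 2). The function name `bloom_filter_size` is my own, and a real library may round or partition its bit array slightly differently.

```python
import math


def bloom_filter_size(capacity, error_rate):
    """Return (bits, hash_count) for a classic Bloom filter."""
    bits = math.ceil(-capacity * math.log(error_rate) / (math.log(2) ** 2))
    hash_count = round(bits / capacity * math.log(2))
    return bits, hash_count


# One billion entries at a 0.01% false positive rate.
bits, hash_count = bloom_filter_size(1_000_000_000, 0.0001)
print(f"{bits / 8 / 1024**3:.2f} GiB, {hash_count} hash functions")
```

For one billion entries at a 0.01% error rate this comes to a little over 2 GiB. That is far smaller than a set of the strings themselves, but it is not free, so size the filter from a realistic estimate of your unique entries.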
Example Implementation
```python
import os

from pybloom_live import BloomFilter

# get_all_crawled_paths, save_as_newfile and save_done_so_far are the
# helper functions from your original script.

BLOOM_PATH = "email_bloom.bloom"

# Configure based on your estimated total number of unique entries
estimated_entries = 1_000_000_000
false_positive_rate = 0.0001

# Load existing filter if it exists
if os.path.exists(BLOOM_PATH):
    with open(BLOOM_PATH, "rb") as fh:
        bloom_filter = BloomFilter.fromfile(fh)
else:
    bloom_filter = BloomFilter(capacity=estimated_entries, error_rate=false_positive_rate)

paths_checked = get_all_crawled_paths() or set()
batch_size = 100  # Reduce IO by saving in batches
processed_files = 0

for r, d, f in os.walk("C:/Hash/"):
    for file in f:
        file_path = os.path.join(r, file)
        # endswith avoids matching names that merely contain ".txt"
        if file.endswith('.txt') and file_path not in paths_checked:
            unique_emails = []
            with open(file_path, 'r') as fp:
                for line in fp:
                    em = line.strip()
                    if em not in bloom_filter:
                        unique_emails.append(em)
                        bloom_filter.add(em)
            save_as_newfile(unique_emails)
            paths_checked.add(file_path)
            processed_files += 1

            # Batch save to minimize IO
            if processed_files % batch_size == 0:
                save_done_so_far(paths_checked)
                with open(BLOOM_PATH, "wb") as fh:
                    bloom_filter.tofile(fh)

# Final save for remaining data
save_done_so_far(paths_checked)
with open(BLOOM_PATH, "wb") as fh:
    bloom_filter.tofile(fh)
```
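Note: A Bloom filter never reports a seen element as new, but it can occasionally report a new element as already seen, so a small fraction of unique entries may be dropped. For 100% accuracy, pair it with an exact on-disk store: when the Bloom filter says an element "might exist," verify against that store before discarding it.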
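Here is a minimal sketch of that two-tier check. To keep it self-contained it uses a toy Bloom filter built on `hashlib` (standing in for `pybloom_live`) and an in-memory SQLite table as the exact store. `TinyBloom`, `is_new` and the `seen` table are names I made up for the illustration.

```python
import hashlib
import sqlite3


class TinyBloom:
    """Toy Bloom filter (illustration only, not tuned for production)."""

    def __init__(self, num_bits=1 << 20, num_hashes=7):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray(num_bits // 8)

    def _positions(self, item):
        # Derive several bit positions from one digest (double hashing).
        digest = hashlib.sha256(item.encode("utf-8")).digest()
        h1 = int.from_bytes(digest[:8], "big")
        h2 = int.from_bytes(digest[8:16], "big") | 1
        return [(h1 + i * h2) % self.num_bits for i in range(self.num_hashes)]

    def add(self, item):
        for pos in self._positions(item):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def __contains__(self, item):
        return all(self.bits[pos // 8] & (1 << (pos % 8))
                   for pos in self._positions(item))


def is_new(email, bloom, conn):
    """Return True and record the email if it has never been seen before."""
    if email in bloom:
        # "Might exist": confirm against the exact store before discarding.
        row = conn.execute("SELECT 1 FROM seen WHERE email = ?", (email,)).fetchone()
        if row is not None:
            return False
    bloom.add(email)
    conn.execute("INSERT OR IGNORE INTO seen VALUES (?)", (email,))
    return True


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE seen (email TEXT PRIMARY KEY)")
bloom = TinyBloom()
results = [is_new(e, bloom, conn) for e in ["a@x.com", "b@x.com", "a@x.com"]]
print(results)
```

The trade-off is that the exact store still has to hold every entry you have seen. The Bloom filter only spares you the disk lookup for entries that are definitely new.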
2. Use an On-Disk Key-Value Store for Exact Deduplication
If you need zero false positives, shift the deduplication index from memory to a disk-based database like SQLite, LevelDB, or RocksDB. These tools handle large datasets efficiently without blowing up your RAM.
Example with SQLite (Simple & Built-In)
```python
import os
import sqlite3

# get_all_crawled_paths, save_as_newfile and save_done_so_far are the
# helper functions from your original script.

# Initialize database connection
conn = sqlite3.connect('email_dedupe.db')
cursor = conn.cursor()
cursor.execute('''CREATE TABLE IF NOT EXISTS emails (email TEXT PRIMARY KEY)''')
conn.commit()

paths_checked = get_all_crawled_paths() or set()
batch_size = 50
processed_files = 0

for r, d, f in os.walk("C:/Hash/"):
    for file in f:
        file_path = os.path.join(r, file)
        # endswith avoids matching names that merely contain ".txt"
        if file.endswith('.txt') and file_path not in paths_checked:
            unique_emails = []
            with open(file_path, 'r') as fp:
                for line in fp:
                    em = line.strip()
                    # Check if email exists in database
                    cursor.execute('SELECT 1 FROM emails WHERE email = ?', (em,))
                    if not cursor.fetchone():
                        unique_emails.append(em)
                        cursor.execute('INSERT INTO emails VALUES (?)', (em,))
            # Commit all inserts for this file at once to reduce IO
            conn.commit()
            save_as_newfile(unique_emails)
            paths_checked.add(file_path)
            processed_files += 1

            if processed_files % batch_size == 0:
                save_done_so_far(paths_checked)

conn.close()
save_done_so_far(paths_checked)
```
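The loop above runs a `SELECT` followed by an `INSERT` for every line. One possible variation, sketched here under the same table layout, lets the primary key do the check: `INSERT OR IGNORE` silently skips duplicates, and the cursor's `rowcount` tells you whether the row was actually inserted. The helper name `insert_new_emails` and the in-memory database are assumptions for the demo.

```python
import sqlite3


def insert_new_emails(conn, emails):
    """Insert emails and return the ones that were not already stored."""
    new_emails = []
    with conn:  # one transaction (and one commit) for the whole batch
        for em in emails:
            cur = conn.execute("INSERT OR IGNORE INTO emails VALUES (?)", (em,))
            if cur.rowcount == 1:  # 0 means the primary key already existed
                new_emails.append(em)
    return new_emails


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE emails (email TEXT PRIMARY KEY)")
first = insert_new_emails(conn, ["a@x.com", "b@x.com", "a@x.com"])
second = insert_new_emails(conn, ["b@x.com", "c@x.com"])
print(first, second)
```

This halves the number of statements per line and keeps each file's inserts inside a single transaction.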
Pro Tip: For higher throughput, consider LevelDB or RocksDB. They are embedded key-value stores optimized for write-heavy workloads.
3. Hash Sharding: Break the Problem into Smaller Pieces
Split your dataset into smaller, manageable chunks using hash-based sharding. Each chunk can be deduplicated independently in memory, eliminating the need for a global in-memory set.
Step-by-Step Implementation
```python
import hashlib
import os

# get_all_crawled_paths and save_done_so_far are the helper functions
# from your original script.


def get_shard_number(email, num_shards=100):
    # Use MD5 for consistent hashing across runs
    md5_hash = hashlib.md5(email.encode()).hexdigest()
    return int(md5_hash, 16) % num_shards


# Step 1: Split all emails into shards
num_shards = 100
shard_files = [open(f'shard_{i}.txt', 'a', encoding='utf-8') for i in range(num_shards)]
paths_checked = get_all_crawled_paths() or set()

for r, d, f in os.walk("C:/Hash/"):
    for file in f:
        file_path = os.path.join(r, file)
        # endswith avoids matching names that merely contain ".txt"
        if file.endswith('.txt') and file_path not in paths_checked:
            with open(file_path, 'r', encoding='utf-8') as fp:
                for line in fp:
                    em = line.strip()
                    shard_idx = get_shard_number(em, num_shards)
                    shard_files[shard_idx].write(f"{em}\n")
            paths_checked.add(file_path)

# Close shard files so everything is flushed to disk
for sf in shard_files:
    sf.close()

# Record progress only after the shard data is safely written
save_done_so_far(paths_checked)

# Step 2: Deduplicate each shard individually
for i in range(num_shards):
    shard_path = f'shard_{i}.txt'
    unique_entries = set()
    with open(shard_path, 'r', encoding='utf-8') as fp:
        for line in fp:
            em = line.strip()
            unique_entries.add(em)

    # Save deduplicated shard, one entry per line
    with open(f'unique_shard_{i}.txt', 'w', encoding='utf-8') as fp:
        fp.writelines(f"{em}\n" for em in unique_entries)
    os.remove(shard_path)
```
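This approach turns one global memory problem into 100 smaller local ones, each handled with a standard in-memory set. Identical entries always hash to the same shard, so deduplicating each shard independently gives the same result as a single global pass. If a single shard still doesn't fit in RAM, increase `num_shards`.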
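To see why per-shard deduplication is enough, here is a small in-memory sketch in which lists stand in for the shard files. `dedupe_by_shards` and the sample addresses are invented for the demo. `get_shard_number` is the same function as in the implementation above.

```python
import hashlib


def get_shard_number(email, num_shards=100):
    digest = hashlib.md5(email.encode("utf-8")).hexdigest()
    return int(digest, 16) % num_shards


def dedupe_by_shards(emails, num_shards=4):
    # Step 1: route every entry to a shard chosen by its hash.
    shards = [[] for _ in range(num_shards)]
    for em in emails:
        shards[get_shard_number(em, num_shards)].append(em)
    # Step 2: deduplicate each shard on its own, then concatenate.
    unique = []
    for shard in shards:
        unique.extend(set(shard))
    return unique


emails = ["a@x.com", "b@x.com", "a@x.com", "c@x.com", "b@x.com"]
result = dedupe_by_shards(emails)
print(sorted(result))
```

Because both copies of an address hash to the same shard, no duplicate can survive across shard boundaries, and the combined output matches what one global set would produce.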
4. Quick Fixes for Your Existing Code (No Major Overhaul)
If you want to tweak your current code without changing its core structure:
- Reduce IO frequency: Save `main_list` and processed paths only after every 50-100 files instead of every single file.
- Reuse collections: Replace `emails = set()` with `emails.clear()` to avoid reallocating memory repeatedly.
- Use context managers: Always open files with `with` to ensure proper resource cleanup.
Optimized Existing Code Snippet
```python
import os

# get_all_stored_emails, get_all_crawled_paths, save_as_newfile,
# save_mainlist_to_file and save_done_so_far are the helper functions
# from your original script.

main_list = get_all_stored_emails() or set()
paths_checked = get_all_crawled_paths() or set()
emails = set()
batch_size = 50
processed_count = 0

for r, d, f in os.walk("C:/Hash/"):
    for file in f:
        file_path = os.path.join(r, file)
        # endswith avoids matching names that merely contain ".txt"
        if file.endswith('.txt') and file_path not in paths_checked:
            with open(file_path, 'r') as fp:
                for line in fp:
                    em = line.strip()
                    if em not in main_list:
                        emails.add(em)
                        main_list.add(em)
            save_as_newfile(emails)
            paths_checked.add(file_path)
            emails.clear()
            processed_count += 1

            # Batch save to minimize IO overhead
            if processed_count % batch_size == 0:
                save_mainlist_to_file(main_list)
                save_done_so_far(paths_checked)

# Final save for remaining data
save_mainlist_to_file(main_list)
save_done_so_far(paths_checked)
```
The question originally comes from Stack Exchange; it was asked by John.




