
How to Optimize Python Multi-File Deduplication Code? Performance Problems When Processing 100GB of Data

Great question. Deduplicating 100GB of text files with an in-memory set is a classic bottleneck scenario. Let's start with why your current code slows down, then look at practical optimizations and alternative approaches that handle this scale efficiently.

First, the core issues with your existing code:

  • The main_list set grows without bound and ends up consuming gigabytes of RAM. Once it no longer fits in physical memory, the OS starts paging to disk and every lookup and insert slows down sharply.
  • Frequent synchronous IO operations (saving main_list and processed paths after every file) block the processing pipeline and add unnecessary overhead.
  • Maintaining two in-memory sets (main_list and emails) amplifies the memory pressure.
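To put a number on the memory pressure described above, here is a rough sketch that estimates what a set of strings costs per entry in CPython. The synthetic addresses and the helper name bytes_per_entry are made up for illustration, and the figures depend on the Python version and platform, so treat the result as an order-of-magnitude guide only.

```python
import sys

def bytes_per_entry(sample):
    """Approximate RAM per element for a set of strings (CPython-specific)."""
    entries = set(sample)
    # getsizeof(set) covers only the hash table, not the strings it references
    table_bytes = sys.getsizeof(entries)
    string_bytes = sum(sys.getsizeof(s) for s in entries)
    return (table_bytes + string_bytes) / len(entries)

# Hypothetical sample: 100,000 synthetic addresses
sample = [f"user{i}@example.com" for i in range(100_000)]
per_entry = bytes_per_entry(sample)
raw = sum(len(s) for s in sample) / len(sample)

print(f"raw text: ~{raw:.0f} bytes/entry, in a set: ~{per_entry:.0f} bytes/entry")
```

Multiplying the per-entry figure by the number of unique lines you expect gives a quick check on whether an in-memory set is viable at all.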

1. Use a Bloom Filter for Memory-Efficient Probabilistic Deduplication

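If you can tolerate a very low false positive rate (e.g., 0.01%), a Bloom Filter is a strong fit. It is a space-efficient probabilistic data structure that tests membership without storing the elements themselves. It never reports a previously seen element as new, but it occasionally reports a new element as already seen, so at a 0.01% rate up to roughly 1 in 10,000 genuinely unique entries would be dropped as duplicates.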
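Before committing to this approach, it helps to check how much memory the filter itself needs. The sketch below applies the standard Bloom filter sizing formulas, m = -n·ln(p) / (ln 2)² bits and k = (m/n)·ln 2 hash functions. The helper name bloom_size is hypothetical, and a given library may round or lay out its bit array somewhat differently, so take the result as an estimate.

```python
import math

def bloom_size(n, p):
    """Return (bits, hash_count) for n entries at false positive rate p."""
    bits = math.ceil(-n * math.log(p) / (math.log(2) ** 2))
    hashes = max(1, round(bits / n * math.log(2)))
    return bits, hashes

# One billion entries at a 0.01% false positive rate
bits, hashes = bloom_size(1_000_000_000, 0.0001)
print(f"{bits / 8 / 1024**3:.2f} GiB, {hashes} hash functions")
```

For one billion entries at 0.01%, this comes to a little over 2 GiB. That is far less than a set of the strings themselves, but it still has to fit in RAM.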

Example Implementation

from pybloom_live import BloomFilter
import os

# Configure based on your estimated total number of unique entries
estimated_entries = 1_000_000_000
false_positive_rate = 0.0001

# Load existing filter if it exists
if os.path.exists("email_bloom.bloom"):
    # Use a context manager so the file handle is closed after loading
    with open("email_bloom.bloom", "rb") as bf:
        bloom_filter = BloomFilter.fromfile(bf)
else:
    bloom_filter = BloomFilter(capacity=estimated_entries, error_rate=false_positive_rate)

paths_checked = get_all_crawled_paths() or set()
batch_size = 100  # Reduce IO by saving in batches
processed_files = 0

for r, d, f in os.walk("C:/Hash/"):
    for file in f:
        file_path = os.path.join(r, file)
        # endswith avoids matching names like "notes.txt.bak"
        if file.endswith('.txt') and file_path not in paths_checked:
            unique_emails = []
            with open(file_path, 'r') as fp:
                for line in fp:
                    em = line.strip()
                    if em not in bloom_filter:
                        unique_emails.append(em)
                        bloom_filter.add(em)
            
            save_as_newfile(unique_emails)
            paths_checked.add(file_path)
            processed_files += 1

            # Batch save to minimize IO
            if processed_files % batch_size == 0:
                save_done_so_far(paths_checked)
                with open("email_bloom.bloom", "wb") as bf:
                    bloom_filter.tofile(bf)

# Final save for remaining data
save_done_so_far(paths_checked)
with open("email_bloom.bloom", "wb") as bf:
    bloom_filter.tofile(bf)

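Note: For 100% accuracy, pair this with an exact on-disk index of every entry seen so far. When the Bloom Filter says an element "might exist," verify against that index before discarding it. When the filter says an element is definitely new, you can skip the disk lookup entirely, which is where the speedup comes from.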
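A minimal sketch of that verification step follows. To stay self-contained it uses a toy Bloom filter (TinyBloom) and an in-memory SQLite table. Both, along with the helper is_new, are illustrative assumptions rather than part of the code above; in practice you would keep using a library filter and a database file on disk.

```python
import hashlib
import sqlite3

class TinyBloom:
    """Toy Bloom filter for illustration; use a real library in production."""
    def __init__(self, num_bits=1 << 20, num_hashes=7):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray(num_bits // 8 + 1)

    def _positions(self, item):
        # Derive several bit positions from one digest (double hashing)
        digest = hashlib.sha256(item.encode("utf-8")).digest()
        h1 = int.from_bytes(digest[:8], "big")
        h2 = int.from_bytes(digest[8:16], "big") | 1
        return [(h1 + i * h2) % self.num_bits for i in range(self.num_hashes)]

    def add(self, item):
        for pos in self._positions(item):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def __contains__(self, item):
        return all(self.bits[pos // 8] & (1 << (pos % 8))
                   for pos in self._positions(item))

def is_new(email, bloom, cursor):
    """Return True exactly once per distinct email."""
    if email in bloom:
        # "Might exist": confirm against the exact index before discarding
        cursor.execute("SELECT 1 FROM emails WHERE email = ?", (email,))
        if cursor.fetchone():
            return False
    # Definitely new, or a Bloom false positive: record it in both structures
    bloom.add(email)
    cursor.execute("INSERT INTO emails VALUES (?)", (email,))
    return True

conn = sqlite3.connect(":memory:")  # a file path in real use
cur = conn.cursor()
cur.execute("CREATE TABLE emails (email TEXT PRIMARY KEY)")
bloom = TinyBloom()

lines = ["a@x.com", "b@x.com", "a@x.com", "c@x.com", "b@x.com"]
unique = [em for em in lines if is_new(em, bloom, cur)]
conn.commit()
```

The result stays exact even when the filter is badly undersized; a high false positive rate only means more lookups reach the database.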


2. Use an On-Disk Key-Value Store for Exact Deduplication

If you need zero false positives, shift the deduplication index from memory to a disk-based database like SQLite, LevelDB, or RocksDB. These tools handle large datasets efficiently without blowing up your RAM.

Example with SQLite (Simple & Built-In)

import sqlite3
import os

# Initialize database connection
conn = sqlite3.connect('email_dedupe.db')
cursor = conn.cursor()
cursor.execute('''CREATE TABLE IF NOT EXISTS emails
                  (email TEXT PRIMARY KEY)''')
conn.commit()

paths_checked = get_all_crawled_paths() or set()
batch_size = 50
processed_files = 0

for r, d, f in os.walk("C:/Hash/"):
    for file in f:
        file_path = os.path.join(r, file)
        # endswith avoids matching names like "notes.txt.bak"
        if file.endswith('.txt') and file_path not in paths_checked:
            unique_emails = []
            with open(file_path, 'r') as fp:
                for line in fp:
                    em = line.strip()
                    # Check if email exists in database
                    cursor.execute('SELECT 1 FROM emails WHERE email = ?', (em,))
                    if not cursor.fetchone():
                        unique_emails.append(em)
                        cursor.execute('INSERT INTO emails VALUES (?)', (em,))
            
            # Commit all inserts at once to reduce IO
            conn.commit()
            save_as_newfile(unique_emails)
            paths_checked.add(file_path)
            processed_files += 1

            if processed_files % batch_size == 0:
                save_done_so_far(paths_checked)

conn.close()
save_done_so_far(paths_checked)

Pro Tip: If SQLite becomes the bottleneck, consider LevelDB or RocksDB, which are optimized for high-throughput key-value operations.
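Before switching stores, it may be worth trimming the SQLite version itself. The sketch below replaces the separate SELECT and INSERT with a single INSERT OR IGNORE and uses cursor.rowcount to tell whether the row was actually added. The helper name insert_new is hypothetical, and whether this is faster on your data is something to measure rather than assume.

```python
import sqlite3

def insert_new(cursor, emails):
    """Insert emails, returning only those not already in the table."""
    fresh = []
    for em in emails:
        cursor.execute("INSERT OR IGNORE INTO emails VALUES (?)", (em,))
        # rowcount is 1 when the row was inserted, 0 when the key already existed
        if cursor.rowcount == 1:
            fresh.append(em)
    return fresh

conn = sqlite3.connect(":memory:")  # a file path in real use
cur = conn.cursor()
cur.execute("CREATE TABLE emails (email TEXT PRIMARY KEY)")

first = insert_new(cur, ["a@x.com", "b@x.com", "a@x.com"])
second = insert_new(cur, ["b@x.com", "c@x.com"])
conn.commit()
```

Here first holds the two distinct addresses from the first batch, and second holds only the address the table had not seen before.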


3. Hash Sharding: Break the Problem into Smaller Pieces

Split your dataset into smaller, manageable chunks using hash-based sharding. Each chunk can be deduplicated independently in memory, eliminating the need for a global in-memory set.

Step-by-Step Implementation

import os
import hashlib

def get_shard_number(email, num_shards=100):
    # MD5 gives the same shard across runs; built-in hash() is salted per process for str
    md5_hash = hashlib.md5(email.encode()).hexdigest()
    return int(md5_hash, 16) % num_shards

# Step 1: Split all emails into shards
num_shards = 100
shard_files = [open(f'shard_{i}.txt', 'a', encoding='utf-8') for i in range(num_shards)]
paths_checked = get_all_crawled_paths() or set()

for r, d, f in os.walk("C:/Hash/"):
    for file in f:
        file_path = os.path.join(r, file)
        # endswith avoids matching names like "notes.txt.bak"
        if file.endswith('.txt') and file_path not in paths_checked:
            with open(file_path, 'r', encoding='utf-8') as fp:
                for line in fp:
                    em = line.strip()
                    shard_idx = get_shard_number(em, num_shards)
                    shard_files[shard_idx].write(f"{em}\n")
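            # Flush shard buffers before marking the file done, so a crash cannot lose its lines
            for sf in shard_files:
                sf.flush()
            paths_checked.add(file_path)
            save_done_so_far(paths_checked)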

# Close shard files
for sf in shard_files:
    sf.close()

# Step 2: Deduplicate each shard individually
for i in range(num_shards):
    shard_path = f'shard_{i}.txt'
    unique_entries = set()
    with open(shard_path, 'r', encoding='utf-8') as fp:
        for line in fp:
            em = line.strip()
            unique_entries.add(em)
    # Save deduplicated shard
    with open(f'unique_shard_{i}.txt', 'w', encoding='utf-8') as fp:
        fp.write('\n'.join(unique_entries))
    os.remove(shard_path)

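This approach turns a global memory problem into 100 small local problems. Identical emails always hash to the same shard, so no duplicate can straddle two shards and each shard can be deduplicated with a standard in-memory set. For 100GB of input, each shard holds roughly 1GB of raw text; raise num_shards if a shard's set does not fit comfortably in RAM.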
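As a small end-to-end check of the idea, the sketch below runs both passes on a handful of lines in a temporary directory. The function names shard_of and dedupe_via_shards and the sample data are illustrative assumptions; the point is only that deduplicating shard by shard yields the same set of unique entries as a global set would.

```python
import hashlib
import os
import tempfile

def shard_of(email, num_shards):
    """Map an email to a shard index that is stable across runs."""
    digest = hashlib.md5(email.encode("utf-8")).hexdigest()
    return int(digest, 16) % num_shards

def dedupe_via_shards(lines, num_shards, workdir):
    # Pass 1: route every line to its shard file
    paths = [os.path.join(workdir, f"shard_{i}.txt") for i in range(num_shards)]
    handles = [open(p, "w", encoding="utf-8") for p in paths]
    try:
        for em in lines:
            handles[shard_of(em, num_shards)].write(em + "\n")
    finally:
        for h in handles:
            h.close()
    # Pass 2: dedupe one shard at a time, so only one set is in memory at once
    result = []
    for p in paths:
        with open(p, encoding="utf-8") as fp:
            result.extend({line.rstrip("\n") for line in fp})
    return result

lines = ["a@x.com", "b@x.com", "a@x.com", "c@x.com", "b@x.com", "d@x.com"]
with tempfile.TemporaryDirectory() as tmp:
    unique = dedupe_via_shards(lines, num_shards=4, workdir=tmp)

print(sorted(unique))
```

Note that the output order follows shard order rather than input order, so sort or post-process the result if the original ordering matters.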


4. Quick Fixes for Your Existing Code (No Major Overhaul)

If you want to tweak your current code without changing its core structure, the following changes cut IO overhead, though main_list still grows with the data:

  • Reduce IO frequency: Save main_list and processed paths only after every 50-100 files instead of every single file.
  • Reuse collections: Replace emails = set() with emails.clear() to avoid reallocating memory repeatedly.
  • Use context managers: Always open files inside a with block to ensure proper resource cleanup.

Optimized Existing Code Snippet

import os

main_list = get_all_stored_emails() or set()
paths_checked = get_all_crawled_paths() or set()
emails = set()
batch_size = 50
processed_count = 0

for r, d, f in os.walk("C:/Hash/"):
    for file in f:
        file_path = os.path.join(r, file)
        # endswith avoids matching names like "notes.txt.bak"
        if file.endswith('.txt') and file_path not in paths_checked:
            with open(file_path, 'r') as fp:
                for line in fp:
                    em = line.strip()
                    if em not in main_list:
                        emails.add(em)
                        main_list.add(em)
            
            save_as_newfile(emails)
            paths_checked.add(file_path)
            emails.clear()
            processed_count += 1

            # Batch save to minimize IO overhead
            if processed_count % batch_size == 0:
                save_mainlist_to_file(main_list)
                save_done_so_far(paths_checked)

# Final save for remaining data
save_mainlist_to_file(main_list)
save_done_so_far(paths_checked)

The question in this content comes from Stack Exchange; it was asked by John.
