
Help needed: Python multiprocessing script for measurement data hangs after it finishes running

Fixing Your Stuck Multiprocessing Script for Large Measurement Data

Hey there, let's dig into why your script is freezing up after all 4 processes report 100% completion. I've looked over your code and problem description, and here are the top issues to fix:

1. You're not unpacking arguments passed to workPackage

Your main process passes filelistChunks[i], colnames, and the other parameters to workPackage, but the function accepts a single args parameter and never unpacks it. That means names like filelist, outputnames, identifier, and queue are all undefined inside the function! The first use raises a NameError, your bare except: block swallows it, and queue.put(outputdata) never runs. When the main process later reads from the queue, it waits forever because nothing was ever put there.

Fix this first:
Add an argument unpacking line at the start of workPackage:

def workPackage(args):
    try:
        # Unpack the arguments sent from the main process
        filelist, datacolnames, outputnames, identifier, queue = args
        outputdata = dict()
        iterator = 1
        # Rest of your code stays the same...
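
To see why the unpacking line matters, here is a minimal sketch of how multiprocessing.Process hands over its arguments: it calls target(*args) in the child. The function body and the params values below are made up for illustration, and plain function calls stand in for the child process.

```python
def work_package(args):
    # Unpack the single tuple, as in the fix above
    filelist, datacolnames, outputnames, identifier, queue = args
    return identifier, len(filelist)

# Illustrative values only; None stands in for the real queue
params = (["a.csv", "b.csv"], ["setpoint", "val1", "val2"], ["out"], 0, None)

# Process(target=f, args=t) runs f(*t), so a five-item tuple becomes
# five positional arguments and a one-parameter function rejects it
try:
    work_package(*params)
    spread_error = None
except TypeError as exc:
    spread_error = exc

# Wrapping the parameters in a one-element tuple delivers them as one "args"
wrapped = (params,)
result = work_package(*wrapped)
print(type(spread_error).__name__, result)
```

So a one-parameter workPackage needs args=(params,) in the Process call, while a five-parameter workPackage would take args=params directly and need no unpacking line.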

2. Your exception handling hides errors and breaks queue flow

Using a bare except: is risky: it catches everything (even KeyboardInterrupt and SystemExit) and gives you zero visibility into what went wrong. Worse, if any error occurs during file processing (a corrupted file, a missing column name, a MemoryError), your code skips queue.put(outputdata). The main process has no way to know something failed, so it just hangs waiting for data that never arrives.

Fix the exception handling:
Catch specific exceptions, log the error, and send a failure marker to the queue so the main process knows what's up:

import numpy as np

def workPackage(args):
    try:
        filelist, datacolnames, outputnames, identifier, queue = args
        outputdata = dict()
        iterator = 1
        for name in outputnames:
            outputdata[name] = []
        for filename in filelist:
            # No unpack=True here: keep the structured array so fields can be read by name
            read_data = np.genfromtxt(filename, comments="#", names=datacolnames, delimiter=";")
            mean_val1 = np.mean(read_data["val1"])
            mean_val2 = np.mean(read_data["val2"])
            outputdata[outputnames[0]].append(read_data["setpoint"][0])
            outputdata[outputnames[1]].append(mean_val1)
            outputdata[outputnames[2]].append(mean_val2)
            outputdata[outputnames[3]].append(mean_val1-mean_val2)
            outputdata[outputnames[4]].append((mean_val1-mean_val2)/read_data["setpoint"][0]*100)
            outputdata[outputnames[5]].append(2*np.std(read_data["val1"]))
            outputdata[outputnames[6]].append(2*np.std(read_data["val2"]))
            print(f"Process {identifier+1}: {round(100*(iterator/len(filelist)),1)}% complete")
            iterator += 1
        # Send success data to queue
        queue.put({"success": True, "data": outputdata})
    except Exception as e:
        error_msg = f"Process {identifier+1} failed: {str(e)}"
        print(error_msg)
        # Send error info to queue so main process doesn't hang
        queue.put({"success": False, "error": error_msg, "process_id": identifier})
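
The same idea can be pulled out into a small wrapper so that every worker puts exactly one message on its queue, whether it succeeds or fails. This is only a sketch: run_and_report and failing_task are hypothetical names, and a standard-library queue.Queue stands in for the multiprocessing.Queue so the snippet runs on its own.

```python
import queue
import traceback

def run_and_report(task, identifier, out_queue):
    """Run task() and always put exactly one message on out_queue."""
    try:
        message = {"success": True, "process_id": identifier, "data": task()}
    except Exception:
        # Exception leaves KeyboardInterrupt and SystemExit alone,
        # unlike a bare "except:"
        message = {"success": False, "process_id": identifier,
                   "error": traceback.format_exc()}
    out_queue.put(message)

def failing_task():
    # Stand-in for a file that lacks an expected column
    raise ValueError("missing column 'val1'")

results = queue.Queue()
run_and_report(lambda: {"mean": [1.5]}, 0, results)
run_and_report(failing_task, 1, results)

ok = results.get_nowait()
failed = results.get_nowait()
print(ok["success"], failed["success"])
```

Sending the full traceback text instead of str(e) tells you which line failed, and it is a plain string, so it can be sent through a multiprocessing.Queue as well.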

3. Your main process reads queues in a risky way

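Right now you join all processes first, then read from the queues. That order is risky for two reasons. First, if a worker never put anything on its queue (because of an error), queue.get() blocks forever by default. Second, a process that has put data on a multiprocessing.Queue does not exit until that data has been flushed to the underlying pipe, so calling join() before get() can deadlock when the results are large. That would match a script that hangs right after every worker reports 100%.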

Make reading queues safer:
Add a timeout when getting from queues, and check process exit codes to catch silent failures:

import multiprocessing
import queue  # provides the Empty exception raised by Queue.get(timeout=...)

if __name__ == '__main__':
    numberOfProcesses = 4
    q = [None]*numberOfProcesses
    Processes = [None]*numberOfProcesses
    try:
        print(f"Distributing the workload on {numberOfProcesses} processes...")
        for i in range(numberOfProcesses):
            q[i] = multiprocessing.Queue()
            # Process calls target(*args), so wrap the parameters in a one-element
            # tuple to hand workPackage the single "args" tuple it unpacks
            workerArgs = (filelistChunks[i], colnames, outputdatanames, i, q[i])
            Processes[i] = multiprocessing.Process(target=workPackage, args=(workerArgs,))
            Processes[i].start()
        
        # Read results first (with timeout) before joining processes
        results = []
        for i in range(numberOfProcesses):
            try:
                # Wait 5 minutes max for each process to send data (adjust as needed)
                res = q[i].get(timeout=300)
                results.append(res)
            except queue.Empty:
                print(f"Process {i+1} didn't send any data within the timeout window")
        
        # Wait for all processes to clean up
        for i in range(numberOfProcesses):
            Processes[i].join()
            # Check if the process exited normally
            if Processes[i].exitcode != 0:
                print(f"Process {i+1} exited with error code {Processes[i].exitcode}")
        
        # Combine results from successful processes
        final_output = {}
        for res in results:
            if res["success"]:
                # Merge the data (adjust this logic to match your needs)
                for key, values in res["data"].items():
                    if key not in final_output:
                        final_output[key] = []
                    final_output[key].extend(values)
            else:
                print(f"Error from Process {res['process_id']+1}: {res['error']}")
        
        # Save your final_output to file here
        # ...
    except Exception as e:
        print(f"Exception in main script: {str(e)}")
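
A single long timeout means a crashed worker still costs you the full five minutes. One possible refinement, sketched here with a hypothetical helper get_result_or_none, is to poll with a short timeout and stop early once the worker is no longer alive. The helper only assumes a queue with get(timeout=...) and get_nowait(), so a standard-library queue.Queue and lambdas stand in for the real queue and for Processes[i].is_alive in this demo.

```python
import queue
import time

def get_result_or_none(result_queue, is_alive, poll_seconds=0.5, max_wait=300.0):
    """Return the next queued item, or None if the worker died or time ran out."""
    deadline = time.monotonic() + max_wait
    while time.monotonic() < deadline:
        try:
            return result_queue.get(timeout=poll_seconds)
        except queue.Empty:
            if not is_alive():
                # Worker is gone: make one last non-blocking attempt, then give up
                try:
                    return result_queue.get_nowait()
                except queue.Empty:
                    return None
    return None

demo_queue = queue.Queue()
demo_queue.put({"success": True, "data": {}})

# First call finds the item; second call sees an empty queue and a dead worker
delivered = get_result_or_none(demo_queue, is_alive=lambda: True, poll_seconds=0.01)
missing = get_result_or_none(demo_queue, is_alive=lambda: False, poll_seconds=0.01)
print(delivered, missing)
```

In the main script this would be called as get_result_or_none(q[i], Processes[i].is_alive), with a None return treated the same way as the timeout branch above.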

4. Bonus: Watch out for memory bloat with large files

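You're processing 13GB of data in total. np.genfromtxt loads each file completely into memory, so very large individual files could make a worker fail with a MemoryError, which your old except: block would have swallowed. If you still see issues after fixing the above, try parsing the files row by row instead. Keep in mind that this version still holds every value of a file in memory until the arrays are built: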

# Replace np.genfromtxt with this line-by-line approach (adjust columns to match your data)
import csv
import numpy as np

read_data = {"val1": [], "val2": [], "setpoint": []}
with open(filename, 'r') as f:
    reader = csv.reader(f, delimiter=';')
    for row in reader:
        # Skip blank and comment lines (an empty row would make row[0] fail)
        if not row or row[0].startswith('#'):
            continue
        # Map columns to your keys (adjust indices to match your file structure)
        read_data["setpoint"].append(float(row[0]))
        read_data["val1"].append(float(row[1]))
        read_data["val2"].append(float(row[2]))
# Convert lists to numpy arrays for calculations
for key in read_data:
    read_data[key] = np.array(read_data[key])
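
If memory really is the bottleneck, the means and standard deviations can also be computed in a single pass without storing the columns at all. The sketch below uses Welford's running-variance update, which is a swapped-in technique and not part of the original script. stream_column_stats is a hypothetical helper, and the column order setpoint;val1;val2 is an assumption carried over from the snippet above.

```python
import csv
import io
import math

def stream_column_stats(lines, columns=(1, 2), delimiter=";"):
    """Return (first data row, {column: (mean, population std)}) in one pass."""
    count = 0
    mean = {c: 0.0 for c in columns}
    m2 = {c: 0.0 for c in columns}  # running sum of squared deviations
    first_row = None
    for row in csv.reader(lines, delimiter=delimiter):
        # Skip blank and comment lines
        if not row or row[0].lstrip().startswith("#"):
            continue
        values = [float(x) for x in row]
        if first_row is None:
            first_row = values
        count += 1
        for c in columns:
            delta = values[c] - mean[c]
            mean[c] += delta / count
            m2[c] += delta * (values[c] - mean[c])
    if count == 0:
        raise ValueError("no data rows found")
    # Dividing by count gives the population std, the same default as np.std
    stats = {c: (mean[c], math.sqrt(m2[c] / count)) for c in columns}
    return first_row, stats

# Tiny in-memory sample; with real files, pass the open file object instead
sample = io.StringIO("# setpoint;val1;val2\n10;1.0;2.0\n10;3.0;2.0\n10;5.0;2.0\n")
first_row, stats = stream_column_stats(sample)
print(first_row[0], stats[1], stats[2])
```

With a real file this becomes "with open(filename, newline='') as f: first_row, stats = stream_column_stats(f)". first_row[0] then plays the role of read_data["setpoint"][0], and stats[1] and stats[2] hold the mean and standard deviation for val1 and val2.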

The question in this post comes from Stack Exchange; it was asked by Iffuw.
