Help needed: Python multiprocessing script for processing measurement data hangs after execution
Hey there, let's dig into why your script is freezing up after all 4 processes report 100% completion. I've looked over your code and problem description, and here are the top issues to fix:
1. You're not unpacking arguments passed to workPackage
Your main process passes filelistChunks[i], colnames, and so on to workPackage, but the function accepts a single args parameter and never unpacks it. That leaves filelist, outputnames, identifier, and queue undefined inside the function. The resulting NameError is swallowed by your bare except: block, so queue.put(outputdata) never runs, and the main process later waits forever on a queue that stays empty. One related detail: multiprocessing.Process calls its target as target(*args). For workPackage(args) to receive a single tuple, that tuple has to be wrapped once more, as in args=((filelistChunks[i], colnames, outputdatanames, i, q[i]),). Otherwise the call fails with a TypeError before the try block is even entered.
Fix this first:
Add an argument unpacking line at the start of workPackage:
    def workPackage(args):
        try:
            # Unpack the arguments sent from the main process
            filelist, datacolnames, outputnames, identifier, queue = args
            outputdata = dict()
            iterator = 1
            # Rest of your code stays the same...
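One thing worth double-checking when you apply this: multiprocessing.Process invokes its target as target(*args), so a multi-element args tuple arrives as separate positional arguments. The sketch below mimics that call without starting any processes. work_package_single and call_like_process are hypothetical names used only for illustration, not part of your script.

```python
def work_package_single(args):
    """Hypothetical worker that expects ONE tuple and unpacks it itself."""
    filelist, identifier = args
    return f"worker {identifier} got {len(filelist)} files"


def call_like_process(target, args):
    """Mimic how multiprocessing.Process invokes its target: target(*args)."""
    return target(*args)


payload = (["a.csv", "b.csv"], 0)

# Passing the payload directly spreads it over two positional parameters,
# which a one-parameter function rejects with a TypeError.
try:
    call_like_process(work_package_single, payload)
    direct_call_failed = False
except TypeError:
    direct_call_failed = True

# Wrapping the payload in a one-element tuple delivers it as a single argument.
wrapped_result = call_like_process(work_package_single, (payload,))
```

The alternative is to give the worker one named parameter per value and drop the manual unpacking; either way, the signature and the args tuple have to agree.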
2. Your exception handling hides errors and breaks queue flow
Using a bare except: is risky: it catches everything (even KeyboardInterrupt and SystemExit) and gives you zero visibility into what went wrong. Worse, if any error occurs during file processing (a corrupted file, a missing column name, a MemoryError), your code skips queue.put(outputdata). The main process has no way to know something failed, so it just hangs waiting for data that never arrives.
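To see the difference concretely, here is a small sketch comparing a bare except: with except Exception. Both helper functions are hypothetical and exist only to show which exceptions each form swallows.

```python
def swallow_with_bare_except(exc):
    """Raise exc under a bare except and report whether it was swallowed."""
    try:
        raise exc
    except:  # noqa: E722 - deliberately bare, to show what it catches
        return True


def swallow_with_except_exception(exc):
    """Same, but with `except Exception`; BaseException-only errors propagate."""
    try:
        raise exc
    except Exception:
        return True


# A bare except silently eats a Ctrl+C.
bare_catches_interrupt = swallow_with_bare_except(KeyboardInterrupt())

# `except Exception` lets KeyboardInterrupt propagate to the caller...
try:
    swallow_with_except_exception(KeyboardInterrupt())
    narrow_catches_interrupt = True
except KeyboardInterrupt:
    narrow_catches_interrupt = False

# ...while still catching ordinary errors such as ValueError.
narrow_catches_value_error = swallow_with_except_exception(ValueError("bad column"))
```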
Fix the exception handling:
Catch Exception instead of using a bare except: (narrow it further if you know which errors to expect), log the error, and send a failure marker to the queue so the main process knows what happened:
    import numpy as np

    def workPackage(args):
        try:
            filelist, datacolnames, outputnames, identifier, queue = args
            outputdata = dict()
            iterator = 1
            for name in outputnames:
                outputdata[name] = []
            for filename in filelist:
                # unpack=True is dropped: with names=, recent NumPy versions would
                # return a list of per-field arrays, which breaks read_data["val1"]
                read_data = np.genfromtxt(filename, comments="#", names=datacolnames, delimiter=";")
                mean_val1 = np.mean(read_data["val1"])
                mean_val2 = np.mean(read_data["val2"])
                outputdata[outputnames[0]].append(read_data["setpoint"][0])
                outputdata[outputnames[1]].append(mean_val1)
                outputdata[outputnames[2]].append(mean_val2)
                outputdata[outputnames[3]].append(mean_val1-mean_val2)
                outputdata[outputnames[4]].append((mean_val1-mean_val2)/read_data["setpoint"][0]*100)
                outputdata[outputnames[5]].append(2*np.std(read_data["val1"]))
                outputdata[outputnames[6]].append(2*np.std(read_data["val2"]))
                print(f"Process {identifier+1}: {round(100*(iterator/len(filelist)),1)}% complete")
                iterator += 1
            # Send success data to queue
            queue.put({"success": True, "data": outputdata})
        except Exception as e:
            error_msg = f"Process {identifier+1} failed: {str(e)}"
            print(error_msg)
            # Send error info to queue so main process doesn't hang
            queue.put({"success": False, "error": error_msg, "process_id": identifier})
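If the one-line error message turns out to be too terse for debugging, one option is to send the full traceback along in the same dictionary. This is only a sketch: run_and_report and failing_task are hypothetical names, and the "traceback" key is an addition to the success/error format used above.

```python
import traceback


def run_and_report(task, identifier):
    """Run task() and wrap the outcome in the success/error dict format.

    `task` is a hypothetical zero-argument callable standing in for the
    per-file processing loop.
    """
    try:
        return {"success": True, "data": task()}
    except Exception as e:
        return {
            "success": False,
            "process_id": identifier,
            "error": f"Process {identifier + 1} failed: {e!r}",
            # format_exc() returns the current traceback as a plain string,
            # which can be put on a multiprocessing queue like any other str
            "traceback": traceback.format_exc(),
        }


def failing_task():
    """Simulate a missing column name."""
    raise KeyError("val1")


ok = run_and_report(lambda: {"mean": [1.0]}, identifier=0)
bad = run_and_report(failing_task, identifier=1)
```

In a worker you would then call queue.put(run_and_report(...)) once, so exactly one message is sent whether the work succeeded or failed.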
3. Your main process reads queues in a risky way
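Right now you join all processes first and only then read from the queues. That order can deadlock on its own: a process that has put items on a multiprocessing.Queue does not exit until its buffered data has been flushed to the underlying pipe, so join() can block until the main process calls get(). The larger the result, the more likely this becomes. On top of that, if a worker never sends anything (because of an error), queue.get() blocks forever by default.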
Make reading queues safer:
Add a timeout when getting from queues, and check process exit codes to catch silent failures:
    import multiprocessing
    from queue import Empty

    # filelistChunks, colnames and outputdatanames come from the rest of your script

    if __name__ == '__main__':
        numberOfProcesses = 4
        q = [None]*numberOfProcesses
        Processes = [None]*numberOfProcesses
        try:
            print(f"Distributing the workload on {numberOfProcesses} processes...")
            for i in range(numberOfProcesses):
                q[i] = multiprocessing.Queue()
                # Process calls target(*args), so wrap the tuple to pass it as ONE argument
                Processes[i] = multiprocessing.Process(
                    target=workPackage,
                    args=((filelistChunks[i], colnames, outputdatanames, i, q[i]),))
                Processes[i].start()
            # Read results first (with timeout) before joining processes
            results = []
            for i in range(numberOfProcesses):
                try:
                    # Wait 5 minutes max for each process to send data (adjust as needed)
                    res = q[i].get(timeout=300)
                    results.append(res)
                except Empty:
                    print(f"Process {i+1} didn't send any data within the timeout window")
            # Wait for all processes to clean up
            for i in range(numberOfProcesses):
                Processes[i].join()
                # Check if the process exited normally
                if Processes[i].exitcode != 0:
                    print(f"Process {i+1} exited with error code {Processes[i].exitcode}")
            # Combine results from successful processes
            final_output = {}
            for res in results:
                if res["success"]:
                    # Merge the data (adjust this logic to match your needs)
                    for key, values in res["data"].items():
                        if key not in final_output:
                            final_output[key] = []
                        final_output[key].extend(values)
                else:
                    print(f"Error from Process {res['process_id']+1}: {res['error']}")
            # Save your final_output to file here
            # ...
        except Exception as e:
            print(f"Exception in main script: {str(e)}")
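The timeout handling and the merge step above can also be pulled out into two small helpers, which makes them easier to test in isolation. This is a sketch under the assumption that results use the success/error dictionary format from point 2; get_or_none and merge_results are hypothetical names. The exception raised by get(timeout=...) on an empty multiprocessing.Queue is queue.Empty from the standard library.

```python
import multiprocessing
from queue import Empty


def get_or_none(q, timeout):
    """Return the next item from q, or None if nothing arrives in time."""
    try:
        return q.get(timeout=timeout)
    except Empty:
        return None


def merge_results(results):
    """Concatenate the per-key lists of every successful result dict."""
    merged = {}
    for res in results:
        # Skip workers that timed out (None) or reported a failure
        if res is None or not res["success"]:
            continue
        for key, values in res["data"].items():
            merged.setdefault(key, []).extend(values)
    return merged


# Example input: two successful workers, one timeout, one reported failure
results = [
    {"success": True, "data": {"mean": [1.0, 2.0]}},
    None,
    {"success": False, "error": "Process 3 failed", "process_id": 2},
    {"success": True, "data": {"mean": [3.0]}},
]
merged = merge_results(results)
```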
4. Bonus: Watch out for memory bloat with large files
You're processing 13GB of data in total. If individual files are huge, np.genfromtxt loads the entire file into memory, which can raise a MemoryError (swallowed by your old except: block) or get the worker killed by the operating system. If you still see issues after fixing the above, try reading files line by line instead:
    # Replace np.genfromtxt with this line-by-line approach (adjust columns to match your data)
    import csv
    import numpy as np

    read_data = {"val1": [], "val2": [], "setpoint": []}
    with open(filename, 'r', newline='') as f:
        reader = csv.reader(f, delimiter=';')
        for row in reader:
            # Skip blank lines and comment lines
            if not row or row[0].startswith('#'):
                continue
            # Map columns to your keys (adjust indices to match your file structure)
            read_data["setpoint"].append(float(row[0]))
            read_data["val1"].append(float(row[1]))
            read_data["val2"].append(float(row[2]))
    # Convert lists to numpy arrays for calculations
    for key in read_data:
        read_data[key] = np.array(read_data[key])
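Note that the csv version still keeps every value of a file in a list before converting it. If memory remains tight, the mean and standard deviation can instead be accumulated one value at a time with Welford's online algorithm. The sketch below is one possible way to do that; RunningStats is a hypothetical helper, not a NumPy class, and it computes the population standard deviation (ddof=0) to match np.std's default.

```python
import math


class RunningStats:
    """Accumulate mean and population standard deviation one value at a time."""

    def __init__(self):
        self.count = 0
        self.mean = 0.0
        self._m2 = 0.0  # running sum of squared deviations from the mean

    def add(self, value):
        """Welford update: fold one new value into the running statistics."""
        self.count += 1
        delta = value - self.mean
        self.mean += delta / self.count
        self._m2 += delta * (value - self.mean)

    def std(self):
        """Population standard deviation (ddof=0); NaN if no values were added."""
        if self.count == 0:
            return float("nan")
        return math.sqrt(self._m2 / self.count)


# Feed values as they are read, e.g. float(row[1]) inside the csv loop
stats = RunningStats()
for value in [2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0]:
    stats.add(value)
```

With one RunningStats instance per column, only three numbers per column stay in memory regardless of file size.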
The question comes from Stack Exchange; asked by Iffuw.




