TypeError when filling NumPy arrays in a parallel loop: 'float' object does not support item assignment
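Background
You are using joblib.Parallel to compute association values between features in parallel and store the results in two NumPy arrays, mindvfij and mindvfji, but the run fails with this error: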
TypeError: 'float' object does not support item assignment
The full traceback is:
_RemoteTraceback: Traceback (most recent call last):
  File "/home/rahul/anaconda3/lib/python3.9/site-packages/joblib/externals/loky/process_executor.py", line 436, in _process_worker
    r = call_item()
  File "/home/rahul/anaconda3/lib/python3.9/site-packages/joblib/externals/loky/process_executor.py", line 288, in __call__
    return self.fn(*self.args, **self.kwargs)
  File "/home/rahul/anaconda3/lib/python3.9/site-packages/joblib/_parallel_backends.py", line 595, in __call__
    return self.func(*args, **kwargs)
  File "/home/rahul/anaconda3/lib/python3.9/site-packages/joblib/parallel.py", line 262, in __call__
    return [func(*args, **kwargs)
  File "/home/rahul/anaconda3/lib/python3.9/site-packages/joblib/parallel.py", line 262, in <listcomp>
    return [func(*args, **kwargs)
  File "/home/rahul/.config/spyder-py3/untitled2.py", line 57, in my_function
TypeError: 'float' object does not support item assignment
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/rahul/.config/spyder-py3/untitled2.py", line 63, in <module>
    Parallel(n_jobs=num_cores)(delayed(my_function)(i,j) for j in range(i+1, len(fields)))
  File "/home/rahul/anaconda3/lib/python3.9/site-packages/joblib/parallel.py", line 1056, in __call__
    self.retrieve()
  File "/home/rahul/anaconda3/lib/python3.9/site-packages/joblib/parallel.py", line 935, in retrieve
    self._output.extend(job.get(timeout=self.timeout))
  File "/home/rahul/anaconda3/lib/python3.9/site-packages/joblib/_parallel_backends.py", line 542, in wrap_future_result
    return future.result(timeout=timeout)
  File "/home/rahul/anaconda3/lib/python3.9/concurrent/futures/_base.py", line 445, in result
    return self.__get_result()
  File "/home/rahul/anaconda3/lib/python3.9/concurrent/futures/_base.py", line 390, in __get_result
    raise self._exception
TypeError: 'float' object does not support item assignment
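Root cause analysis
I went through your code and found two core problems:
- A variable is accidentally overwritten
Your main loop contains these two lines:
mindvfij = 0.0
mindvfji = 0.0
They rebind the names mindvfij and mindvfji, which previously referred to NumPy arrays, to the float 0.0. When my_function later executes mindvfij[i-1,j-1] = a, mindvfij is a single float rather than an array, so Python raises "'float' object does not support item assignment".
- Memory is not shared between processes
Even with the overwrite fixed, joblib's default loky backend runs tasks in separate processes, and a worker process cannot directly modify a NumPy array that lives in the main process. Each worker has its own memory space, so a change takes effect only inside that worker and the main process's array is never updated.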
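Both problems can be reproduced in isolation. The sketch below uses small made-up arrays (`scores` and `parent` are illustrative names, not from the original code). Rebinding the name to `0.0` reproduces the TypeError, and a pickle round trip stands in for what a worker process receives. Treat the pickle step as an approximation of how a process-based backend ships data to workers, not as joblib's exact mechanism.

```python
import pickle

import numpy as np

# Problem 1: rebinding the name replaces the array with a float.
scores = np.zeros((2, 2))   # hypothetical stand-in for mindvfij
scores = 0.0                # the name now refers to a float, not the array
try:
    scores[0, 1] = 0.5
except TypeError as exc:
    message = str(exc)
    print(message)

# Problem 2: data sent to another process arrives as a copy.
parent = np.zeros((2, 2))
worker_copy = pickle.loads(pickle.dumps(parent))  # roughly what a worker sees
worker_copy[0, 1] = 0.5

print(parent[0, 1])       # the parent's array is unchanged
print(worker_copy[0, 1])  # only the copy was modified
```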
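Solutions
Here are two possible fixes; pick whichever suits your situation:
Option 1: return the results from the function and merge them in the main process
Change my_function so that it returns the values a and b computed for position (i, j), then fill the arrays in the main process. This avoids having workers write to the arrays at all: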
import numpy as np
import pandas as pd
from joblib import Parallel, delayed
import multiprocessing

df = pd.read_csv("/media/rahul/cd854f04-608f-4627-9e70-9096a8520b95/feature_bond/real_malware_v1/extracted_real_malware_v1.csv")
fields = list(df.columns)
num_cores = multiprocessing.cpu_count()
n = len(fields) - 1

# np.empty leaves cells uninitialized; only the (i, j) pairs with j > i are filled below
mindvfij = np.empty((n, n), dtype='float64')
mindvfji = np.empty((n, n), dtype='float64')

# Rewritten function: return the target position and the computed values
def my_function(i, j):
    nfij = 0
    nfi = 0
    nfj = 0
    for k in range(0, len(df)):
        x = df.iloc[k, i]
        y = df.iloc[k, j]
        if x == 1 and y == 1:
            nfij += 1
        if x == 1:
            nfi += 1
        if y == 1:
            nfj += 1
    a = nfij / nfi if nfi != 0 else np.nan
    b = nfij / nfj if nfj != 0 else np.nan
    return (i-1, j-1, a, b)

# Collect the results of all parallel tasks
results = []
for i in range(1, len(fields)):
    res = Parallel(n_jobs=num_cores)(delayed(my_function)(i, j) for j in range(i+1, len(fields)))
    results.extend(res)

# Fill the arrays from the collected results
for i_idx, j_idx, a_val, b_val in results:
    mindvfij[i_idx, j_idx] = a_val
    mindvfji[i_idx, j_idx] = b_val
    # If the (j, i) positions should be filled symmetrically, uncomment the lines below
    # mindvfij[j_idx, i_idx] = b_val
    # mindvfji[j_idx, i_idx] = a_val

# Save the final arrays
np.save("/media/rahul/cd854f04-608f-4627-9e70-9096a8520b95/feature_bond/malindvfij", mindvfij)
np.save("/media/rahul/cd854f04-608f-4627-9e70-9096a8520b95/feature_bond/malindvfji", mindvfji)
print("Finish")
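The loop above starts a new Parallel call for every i. A possible variation is to submit all pairs in one call, sketched here on a made-up 0/1 array rather than the original CSV. The names `data`, `pair_ratios`, `out_ij`, and `out_ji` are illustrative. `prefer="threads"` is used only to keep this toy example lightweight; dropping it falls back to joblib's default process-based backend, and the return-and-merge pattern is the same either way.

```python
from itertools import combinations

import numpy as np
from joblib import Parallel, delayed

# Made-up 0/1 data: 4 samples (rows), 3 features (columns).
data = np.array([
    [1, 1, 0],
    [1, 0, 1],
    [1, 1, 1],
    [0, 1, 0],
])
n = data.shape[1]

def pair_ratios(i, j):
    """Return (i, j, a, b) for one feature pair without writing shared state."""
    x = data[:, i] == 1
    y = data[:, j] == 1
    both = np.count_nonzero(x & y)
    nfi = np.count_nonzero(x)
    nfj = np.count_nonzero(y)
    a = both / nfi if nfi else np.nan
    b = both / nfj if nfj else np.nan
    return i, j, a, b

# One Parallel call over every pair (i, j) with i < j.
results = Parallel(n_jobs=2, prefer="threads")(
    delayed(pair_ratios)(i, j) for i, j in combinations(range(n), 2)
)

# Merge in the parent; cells that no task produced stay NaN.
out_ij = np.full((n, n), np.nan)
out_ji = np.full((n, n), np.nan)
for i, j, a, b in results:
    out_ij[i, j] = a
    out_ji[i, j] = b
```

Initializing the output with NaN instead of np.empty makes it obvious which cells were never computed.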
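Option 2: use shared-memory arrays (for large datasets)
If your dataset is very large and you do not want to pass data around repeatedly, you can create the buffers with multiprocessing.Array and wrap them as NumPy arrays for the tasks to write into. One caveat: under joblib's default loky backend the function is shipped to each worker by pickling, so a worker would end up writing into its own copy of the array. The example therefore passes require="sharedmem" to Parallel, which makes joblib choose a thread-based backend so that every task writes into the same arrays:
import numpy as np
import pandas as pd
from joblib import Parallel, delayed
import multiprocessing
from multiprocessing import Array

df = pd.read_csv("/media/rahul/cd854f04-608f-4627-9e70-9096a8520b95/feature_bond/real_malware_v1/extracted_real_malware_v1.csv")
fields = list(df.columns)
num_cores = multiprocessing.cpu_count()
n = len(fields) - 1

# Create the shared buffers (zero-initialized)
shared_mindvfij = Array('d', n*n)
shared_mindvfji = Array('d', n*n)

# Wrap them as NumPy arrays that can be indexed normally
mindvfij = np.frombuffer(shared_mindvfij.get_obj(), dtype='float64').reshape(n, n)
mindvfji = np.frombuffer(shared_mindvfji.get_obj(), dtype='float64').reshape(n, n)

def my_function(i, j):
    nfij = 0
    nfi = 0
    nfj = 0
    for k in range(0, len(df)):
        x = df.iloc[k, i]
        y = df.iloc[k, j]
        if x == 1 and y == 1:
            nfij += 1
        if x == 1:
            nfi += 1
        if y == 1:
            nfj += 1
    # Cells whose denominator is zero keep their initial value of 0.0
    if nfi != 0:
        mindvfij[i-1, j-1] = nfij / nfi
    if nfj != 0:
        mindvfji[i-1, j-1] = nfij / nfj

for i in range(1, len(fields)):
    # require="sharedmem" selects a thread-based backend, so writes reach the arrays above
    # (with threads, a plain NumPy array would work as well)
    Parallel(n_jobs=num_cores, require="sharedmem")(delayed(my_function)(i, j) for j in range(i+1, len(fields)))

np.save("/media/rahul/cd854f04-608f-4627-9e70-9096a8520b95/feature_bond/malindvfij", mindvfij)
np.save("/media/rahul/cd854f04-608f-4627-9e70-9096a8520b95/feature_bond/malindvfji", mindvfji)
print("Finish")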
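Before running the full job, it is worth checking on toy data that writes made inside the tasks actually reach the parent's array. The sketch below is one such check, assuming joblib's `require="sharedmem"` option, which makes joblib pick a thread-based backend so that all tasks see the same array. `fill_cell` and the 3x3 `out` array are made-up names for illustration.

```python
import numpy as np
from joblib import Parallel, delayed

# Toy output array owned by the parent.
out = np.zeros((3, 3))

def fill_cell(i, j):
    # Write a value that encodes the position, so it is easy to verify.
    out[i, j] = 10 * i + j

# Thread-based execution: every task writes into the same `out`.
Parallel(n_jobs=2, require="sharedmem")(
    delayed(fill_cell)(i, j) for i in range(3) for j in range(i + 1, 3)
)

print(out)
```

Because the tasks run as threads, pure-Python loops remain limited by the GIL; any speedup comes mainly from time spent inside NumPy or pandas calls that release it.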
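Additional optimization suggestions
- Iterating over the DataFrame row by row with for k in range(0, len(df)) is very slow. Consider pandas vectorized operations instead, for example:
x_col = df.iloc[:, i]
y_col = df.iloc[:, j]
nfij = ((x_col == 1) & (y_col == 1)).sum()
nfi = (x_col == 1).sum()
nfj = (y_col == 1).sum()
This can speed up the computation considerably, especially on large datasets.
- If there are many features and therefore many parallel tasks, consider setting the batch_size parameter of Parallel (for example batch_size=10) to reduce process scheduling overhead.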
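Taking the vectorization idea one step further, the counts for all pairs can be obtained at once from a matrix product, which may remove the need for parallelism on moderately sized data. This is a sketch on a made-up 0/1 matrix `X` containing only feature columns (rows are samples), not the original CSV. The names `X`, `co`, `counts`, `ratio_ij`, and `ratio_ji` are illustrative.

```python
import numpy as np

# Made-up 0/1 data: 4 samples (rows), 3 features (columns).
X = np.array([
    [1, 1, 0],
    [1, 0, 1],
    [1, 1, 1],
    [0, 1, 0],
])

ones = (X == 1).astype(np.int64)
co = ones.T @ ones        # co[i, j] = number of rows where features i and j are both 1
counts = np.diag(co)      # counts[i] = number of rows where feature i is 1

# ratio_ij[i, j] = co[i, j] / counts[i]; ratio_ji[i, j] = co[i, j] / counts[j].
# Cells with a zero denominator become NaN.
with np.errstate(divide="ignore", invalid="ignore"):
    ratio_ij = np.where(counts[:, None] != 0, co / counts[:, None], np.nan)
    ratio_ji = np.where(counts[None, :] != 0, co / counts[None, :], np.nan)

print(ratio_ij)
print(ratio_ji)
```

Unlike the loop-based versions, this fills the whole matrix, including the diagonal and the lower triangle; mask those cells if only pairs with j > i are wanted.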
Note: this content comes from Stack Exchange; the question was asked by Rahul Gupta.




