TypeError: 'float' object does not support item assignment when filling NumPy arrays in a parallel loop

Problem background

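You tried to use joblib.Parallel to compute association values between features in parallel and store the results in two NumPy arrays, mindvfij and mindvfji, but the run failed with the following error: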

TypeError: 'float' object does not support item assignment

The full traceback is:

_RemoteTraceback:
Traceback (most recent call last):
File "/home/rahul/anaconda3/lib/python3.9/site-packages/joblib/externals/loky/process_executor.py", line 436, in _process_worker
r = call_item()
File "/home/rahul/anaconda3/lib/python3.9/site-packages/joblib/externals/loky/process_executor.py", line 288, in __call__
return self.fn(*self.args, **self.kwargs)
File "/home/rahul/anaconda3/lib/python3.9/site-packages/joblib/_parallel_backends.py", line 595, in __call__
return self.func(*args, **kwargs)
File "/home/rahul/anaconda3/lib/python3.9/site-packages/joblib/parallel.py", line 262, in __call__
return [func(*args, **kwargs)
File "/home/rahul/anaconda3/lib/python3.9/site-packages/joblib/parallel.py", line 262, in <listcomp>
return [func(*args, **kwargs)
File "/home/rahul/.config/spyder-py3/untitled2.py", line 57, in my_function
TypeError: 'float' object does not support item assignment

"""
The above exception was the direct cause of the following exception:

Traceback (most recent call last):
File "/home/rahul/.config/spyder-py3/untitled2.py", line 63, in <module>
Parallel(n_jobs=num_cores)(delayed(my_function)(i,j) for j in range(i+1, len(fields)))
File "/home/rahul/anaconda3/lib/python3.9/site-packages/joblib/parallel.py", line 1056, in __call__
self.retrieve()
File "/home/rahul/anaconda3/lib/python3.9/site-packages/joblib/parallel.py", line 935, in retrieve
self._output.extend(job.get(timeout=self.timeout))
File "/home/rahul/anaconda3/lib/python3.9/site-packages/joblib/_parallel_backends.py", line 542, in wrap_future_result
return future.result(timeout=timeout)
File "/home/rahul/anaconda3/lib/python3.9/concurrent/futures/_base.py", line 445, in result
return self.__get_result()
File "/home/rahul/anaconda3/lib/python3.9/concurrent/futures/_base.py", line 390, in __get_result
raise self._exception
TypeError: 'float' object does not support item assignment

Root cause analysis

Going through your code carefully, I found two core problems:

  1. The variables are accidentally overwritten
    In the main loop you wrote these two lines:
mindvfij = 0.0
mindvfji = 0.0

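These lines rebind the names mindvfij and mindvfji, which previously referred to NumPy arrays, to the float value 0.0. When my_function later executes mindvfij[i-1,j-1] = a, mindvfij is a single float rather than an array, which raises "'float' object does not support item assignment".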

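  2. Memory is not shared between processes
    Even after the overwrite is fixed, joblib's default loky backend runs tasks in separate worker processes, and those workers cannot directly modify a NumPy array that lives in the main process. Each worker has its own memory space, so a write takes effect only inside the worker and the array in the main process is never updated.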
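
Both causes can be reproduced in a few lines. The sketch below is illustrative only: the 2x2 array and the worker helper are made up for the demonstration, and a pickle round trip is used as a simplified stand-in for what happens when data crosses a process boundary. It is not how joblib is implemented internally.

```python
import pickle

import numpy as np

# Cause 1: rebinding the name replaces the array with a float.
mindvfij = np.zeros((2, 2))
mindvfij[0, 1] = 0.5      # fine: mindvfij is still an ndarray
mindvfij = 0.0            # the name now refers to a plain Python float
try:
    mindvfij[0, 1] = 0.5  # item assignment on a float
except TypeError as exc:
    message = str(exc)    # the same message as in the traceback


# Cause 2: a worker that receives a copy cannot change the original.
def worker(arr):
    """Hypothetical worker: writes into whatever array it was given."""
    arr[0, 0] = 1.0
    return arr[0, 0]


parent = np.zeros((2, 2))
# Assumption: pickling and unpickling stands in for the process boundary.
worker_copy = pickle.loads(pickle.dumps(parent))
seen_in_worker = worker(worker_copy)

print(message)
print(seen_in_worker, parent[0, 0])
```

The write succeeds on the copy, while the array held by the "parent" side keeps its original value. This is why the results have to be returned or written to memory that is genuinely shared.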

Solutions

Here are two possible fixes; choose whichever suits your situation:

Option 1: return the results from the function and merge them in the main process

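Change my_function so that it returns the a and b values computed for position (i, j), then fill the arrays in the main process. The worker processes then never write to the arrays at all: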

import numpy as np
import pandas as pd
from joblib import Parallel, delayed
import multiprocessing

df = pd.read_csv("/media/rahul/cd854f04-608f-4627-9e70-9096a8520b95/feature_bond/real_malware_v1/extracted_real_malware_v1.csv")

fields = list(df.columns)
num_cores = multiprocessing.cpu_count()

n = len(fields) - 1
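# Start from NaN so that cells which are never filled (i >= j) are not left uninitialized
mindvfij = np.full((n, n), np.nan, dtype='float64')
mindvfji = np.full((n, n), np.nan, dtype='float64')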

# Rewritten function: returns the target indices together with the computed values
def my_function(i,j):
    nfij = 0
    nfi = 0
    nfj = 0
    for k in range(0, len(df)):
        x = df.iloc[k,i]
        y = df.iloc[k,j]
        if x == 1 and y == 1:
            nfij += 1
        if x == 1:
            nfi += 1
        if y == 1:
            nfj += 1
    
    a = nfij / nfi if nfi != 0 else np.nan
    b = nfij / nfj if nfj != 0 else np.nan
    return (i-1, j-1, a, b)

# Collect the results of all parallel tasks
results = []
for i in range(1, len(fields)):
    res = Parallel(n_jobs=num_cores)(delayed(my_function)(i,j) for j in range(i+1, len(fields)))
    results.extend(res)

# Fill the arrays from the collected results
for i_idx, j_idx, a_val, b_val in results:
    mindvfij[i_idx, j_idx] = a_val
    mindvfji[i_idx, j_idx] = b_val
    # For a symmetric fill (the (j, i) cells as well), uncomment the two lines below
    # mindvfij[j_idx, i_idx] = b_val
    # mindvfji[j_idx, i_idx] = a_val

# Save the final arrays
np.save("/media/rahul/cd854f04-608f-4627-9e70-9096a8520b95/feature_bond/malindvfij", mindvfij)
np.save("/media/rahul/cd854f04-608f-4627-9e70-9096a8520b95/feature_bond/malindvfji", mindvfji)

print("Finish")

Option 2: use shared-memory arrays (suited to large data)

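If your dataset is very large and you do not want to pass data back and forth, you can create a shared-memory buffer with multiprocessing.Array and wrap it in a NumPy array for the workers to write to. Note that this only helps when the workers really see the same buffer: joblib's default loky backend pickles what it sends to the workers, so the wrapped array may be copied rather than shared, and a thread-based run (for example Parallel(..., require='sharedmem')) or a memory-mapped array may be needed instead. Example code: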

import numpy as np
import pandas as pd
from joblib import Parallel, delayed
import multiprocessing
from multiprocessing import Array

df = pd.read_csv("/media/rahul/cd854f04-608f-4627-9e70-9096a8520b95/feature_bond/real_malware_v1/extracted_real_malware_v1.csv")

fields = list(df.columns)
num_cores = multiprocessing.cpu_count()

n = len(fields) - 1

# Create the shared-memory buffers
shared_mindvfij = Array('d', n*n)
shared_mindvfji = Array('d', n*n)

# Wrap the buffers as NumPy arrays (views, not copies)
mindvfij = np.frombuffer(shared_mindvfij.get_obj(), dtype='float64').reshape(n, n)
mindvfji = np.frombuffer(shared_mindvfji.get_obj(), dtype='float64').reshape(n, n)

def my_function(i,j):
    nfij = 0
    nfi = 0
    nfj = 0
    for k in range(0, len(df)):
        x = df.iloc[k,i]
        y = df.iloc[k,j]
        if x == 1 and y == 1:
            nfij += 1
        if x == 1:
            nfi += 1
        if y == 1:
            nfj += 1
    
    if nfi != 0:
        mindvfij[i-1,j-1] = nfij / nfi
    if nfj != 0:
        mindvfji[i-1,j-1] = nfij / nfj

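for i in range(1, len(fields)):
    # require='sharedmem' makes joblib pick a thread-based backend, so the workers
    # write to the same buffer instead of to a pickled copy of it
    Parallel(n_jobs=num_cores, require='sharedmem')(delayed(my_function)(i,j) for j in range(i+1, len(fields)))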

np.save("/media/rahul/cd854f04-608f-4627-9e70-9096a8520b95/feature_bond/malindvfij", mindvfij)
np.save("/media/rahul/cd854f04-608f-4627-9e70-9096a8520b95/feature_bond/malindvfji", mindvfji)

print("Finish")
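
To see why the np.frombuffer step matters, here is a minimal single-process sketch. The 2x2 size and the names shared, view and flat_value are chosen only for illustration. It shows that the NumPy array is a view onto the multiprocessing.Array buffer, so a write through the view is visible in the underlying buffer. It does not by itself show that another process sees the write; that depends on how the workers are started.

```python
from multiprocessing import Array

import numpy as np

n = 2
shared = Array('d', n * n)  # n*n doubles, zero-initialized

# View onto the same memory, reshaped to (n, n); no data is copied.
view = np.frombuffer(shared.get_obj(), dtype='float64').reshape(n, n)

view[0, 1] = 0.5            # write through the NumPy view
flat_value = shared[1]      # row 0, column 1 is flat index 1

print(flat_value)
```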

Additional optimization suggestions

  • Iterating over the DataFrame row by row with for k in range(0, len(df)) is slow. Consider replacing the loop with vectorized Pandas operations, for example:
    x_col = df.iloc[:,i]
    y_col = df.iloc[:,j]
    nfij = ((x_col == 1) & (y_col == 1)).sum()
    nfi = (x_col == 1).sum()
    nfj = (y_col == 1).sum()
    
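    This can speed up the computation considerably, especially on larger datasets.
  • If there are many features, and therefore many parallel tasks, consider setting the batch_size parameter of Parallel (for example batch_size=10) to reduce dispatch overhead.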
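
The vectorization idea above can be taken one step further. The sketch below goes beyond the original suggestion and is not part of the original answer: it computes the ratios for all feature pairs at once with a matrix product, with no per-pair loop. The function name pair_ratios and the toy matrix X are hypothetical. X is assumed to hold only the 0/1 feature columns, so the non-feature first column that the code above skips would have to be dropped first.

```python
import numpy as np


def pair_ratios(X):
    """Return (a, b) where a[i, j] = nfij / nfi and b[i, j] = nfij / nfj.

    X is a 2-D array of 0/1 flags with one column per feature.
    Cells whose denominator is zero come out as NaN.
    """
    ones = (X == 1).astype(np.float64)
    co = ones.T @ ones       # co[i, j] = number of rows where i and j are both 1
    counts = np.diag(co)     # counts[i] = number of rows where feature i is 1
    with np.errstate(divide="ignore", invalid="ignore"):
        a = co / counts[:, None]
        b = co / counts[None, :]
    return a, b


# Toy data: 4 rows, 3 features; the last feature is never 1.
X = np.array([[1, 1, 0],
              [1, 0, 0],
              [0, 1, 0],
              [1, 1, 0]])
a, b = pair_ratios(X)
print(a[0, 1], b[0, 1])
```

With the whole computation expressed as array operations, the parallel loop may not be needed at all for moderately sized inputs. Whether that holds for your data is worth measuring rather than assuming.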

Note: this content comes from Stack Exchange; the question was asked by Rahul Gupta.
