You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

在Scikit-Learn XGBoost流水线中集成自定义特征工程及优化脚本架构的技术问询

在Scikit-Learn XGBoost流水线中集成自定义特征工程及优化脚本架构的技术问询

问题背景

我正在用Scikit-Learn和XGBoost构建一个回归任务的机器学习流水线,当前脚本可以运行但架构有点乱。现在我是在ColumnTransformer之前手动做自定义特征工程(比如用长和宽相乘创建面积特征)而且脚本底部有一堆零散的参考代码片段(不同数据格式加载、特征创建等)。下面是当前代码:

import pandas as pd
from xgboost import XGBRegressor
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import OneHotEncoder, StandardScaler, TargetEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline

train_data = pd.read_csv("train.csv")
y_train = train_data['target']
x_train = train_data.drop(columns=['id', 'target'])

test_data = pd.read_csv("test.csv")
x_test = test_data.drop(columns=['id'])

numeric_features = x_train.select_dtypes(include=['int64', 'float64']).columns.tolist()
categorical_features = x_train.select_dtypes(include=['category', 'object']).columns.tolist()

numeric_transformer = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='median')),
    ('scalar', StandardScaler())
])
#Use ('scalar', RobustScaler()) if dataset has skewed outliers
#import RobustScaler() from sklearn.preprocessing

categorical_transformer = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='constant', fill_value='Missing')),
    ('onehot', OneHotEncoder(handle_unknown='ignore'))
   #Use target encoder instead of onehot encoder if there are lots of unique values for a feature 
   #('target_encoder', TargetEncoder())
])

preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numeric_features),
        ('cat', categorical_transformer, categorical_features)
    ]
)

model_pipeline = Pipeline(steps=[
    ('preprocessor', preprocessor),
    ('regressor', XGBRegressor(n_estimators=100, max_depth=6, learning_rate=0.1, random_state=42, n_jobs=-1))
])

print("Training...")
model_pipeline.fit(x_train, y_train)

print("Making Predictions...")
test_preds = model_pipeline.predict(x_test)

submission = pd.DataFrame({
    'id': test_data['id'],
    'target': test_preds
})

submission.to_csv('submission.csv', index=False)
print("Success")

#ways to load data
train_data = pd.read_csv("train.tsv", sep='\t') #Tab-Separated Values
train_data = pd.read_parquet("train.parquet")
train_data = pd.read_json("train.json")
train_data = pd.read_csv("train.csv.zip")
train_data = pd.read_csv("train.csv.gz")
train_data = pd.read_pickle("train.pkl")

#ways to create a new feature
x_train['area'] = x_train['length'] * x_train['breath']
x_test['area'] = x_test['length'] * x_test['breath']

x_train = x_train.drop(columns=['length', 'breath'])
x_test = x_test.drop(columns=['length', 'breath'])

#ways to convert a target column
TARGET_COL = 'Heart Disease'
y_train = train_data[TARGET_COL].map({'Presence': 1, 'Absence': 0})
X_train = train_data.drop(columns=['id', TARGET_COL])
X_test = test_data.drop(columns=['id'])

#Ways to find no. of unique values in a feature
print(x_train['song_name'].nunique())

#Ways to install xgboost
python -m ensurepip --upgrade
python -m pip install xgboost

#Ways to install pandas and scikit-learn
pip install pandas scikit-learn

#Ways to check accuracy of the model
from sklearn.model_selection import cross_val_score

print("Running Cross Validation...")

scores = cross_val_score(
    model_pipeline, 
    x_train, 
    y_train, 
    cv=5, 
    scoring='neg_mean_squared_error',
    n_jobs=-1
)

rmse_scores = np.sqrt(-scores)

print(f"Average RMSE: {rmse_scores.mean():.4f} (+/- {rmse_scores.std():.4f})")

核心问题

  1. 如何把面积计算直接集成到model_pipeline中,不用手动分别应用到训练和测试集?用FunctionTransformer还是自定义类?
  2. 有没有动态方式让脚本根据文件扩展名自动检测并加载对应格式(CSV、Parquet、JSON等),不用留一堆零散的参考代码?
  3. 有什么建议可以整理脚本提升复用性?

解答

1. 将自定义特征工程(面积计算)集成到流水线

两种方案都可行,取决于你的需求复杂度:

方案A:用FunctionTransformer(简单场景首选)

如果只是简单的特征计算,FunctionTransformer足够轻量,不用写自定义类。你可以把面积计算逻辑封装成函数,然后用它创建一个 transformer,加到流水线的最前面(在preprocessor之前)。

示例代码:

from sklearn.preprocessing import FunctionTransformer

def create_area_feature(X):
    # 复制数据避免修改原数据
    X_copy = X.copy()
    X_copy['area'] = X_copy['length'] * X_copy['breath']
    # 可选:如果不需要保留原length和breath,可以删除
    X_copy = X_copy.drop(columns=['length', 'breath'])
    return X_copy

# 创建特征工程的transformer
feature_engineering_transformer = FunctionTransformer(create_area_feature, validate=False)

# 更新流水线,把特征工程步骤放在最前面
model_pipeline = Pipeline(steps=[
    ('feature_eng', feature_engineering_transformer),
    ('preprocessor', preprocessor),
    ('regressor', XGBRegressor(n_estimators=100, max_depth=6, learning_rate=0.1, random_state=42, n_jobs=-1))
])

注意:设置validate=False是因为我们处理的是DataFrame,默认的验证会把数据转成numpy数组,会丢失列名,导致后续preprocessor无法识别特征类型。

方案B:自定义Transformer类(复杂场景)

如果你的特征工程逻辑更复杂(比如需要保存训练阶段的状态、处理不同分支逻辑),可以继承BaseEstimatorTransformerMixin写自定义类,这样能更好地兼容Scikit-Learn的流水线API。

示例代码:

from sklearn.base import BaseEstimator, TransformerMixin

class AreaFeatureCreator(BaseEstimator, TransformerMixin):
    def fit(self, X, y=None):
        # 这里不需要训练状态,所以直接返回self
        return self
    
    def transform(self, X):
        X_copy = X.copy()
        X_copy['area'] = X_copy['length'] * X_copy['breath']
        X_copy = X_copy.drop(columns=['length', 'breath'])
        return X_copy

# 用自定义类创建transformer
feature_engineering_transformer = AreaFeatureCreator()

# 更新流水线
model_pipeline = Pipeline(steps=[
    ('feature_eng', feature_engineering_transformer),
    ('preprocessor', preprocessor),
    ('regressor', XGBRegressor(...))
])

这种方案的好处是扩展性强——如果以后要加更多特征计算(比如周长、体积),直接在transform方法里加逻辑就行,而且完全符合Scikit-Learn的接口规范。

2. 动态加载不同格式的数据

你可以写一个通用的load_data函数,根据文件扩展名判断用哪种Pandas加载方法,把零散的参考代码整合进去:

import os

def load_data(file_path):
    ext = os.path.splitext(file_path)[1].lower()
    if ext in ['.csv', '.tsv', '.gz', '.zip']:
        # 处理压缩或分隔符文件
        if ext == '.tsv':
            return pd.read_csv(file_path, sep='\t')
        else:
            # Pandas会自动处理.gz/.zip压缩的CSV
            return pd.read_csv(file_path)
    elif ext == '.parquet':
        return pd.read_parquet(file_path)
    elif ext == '.json':
        return pd.read_json(file_path)
    elif ext == '.pkl':
        return pd.read_pickle(file_path)
    else:
        raise ValueError(f"不支持的文件格式:{ext}")

# 使用示例
train_data = load_data("train.csv")
test_data = load_data("test.parquet")

这样你只需要传文件路径,函数会自动识别格式加载,不用再留一堆零散的参考代码,而且可以轻松扩展支持更多格式(比如.feather)。

3. 提升脚本复用性的建议

这里有几个实用的整理方向:

(1)把配置和逻辑分离

把固定的配置(比如XGBoost参数、特征工程参数、交叉验证设置)抽到脚本开头的配置块里,方便后续修改:

# 配置区
CONFIG = {
    'target_col': 'target',
    'id_col': 'id',
    'numeric_impute_strategy': 'median',
    'categorical_impute_strategy': 'constant',
    'xgb_params': {
        'n_estimators': 100,
        'max_depth': 6,
        'learning_rate': 0.1,
        'random_state': 42,
        'n_jobs': -1
    },
    'cv_folds': 5,
    'cv_scoring': 'neg_mean_squared_error'
}
(2)封装成函数或类

把重复的逻辑(比如数据加载、流水线创建、交叉验证)封装成函数,这样以后换任务可以直接复用:

def build_preprocessor(numeric_features, categorical_features):
    numeric_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy=CONFIG['numeric_impute_strategy'])),
        ('scalar', StandardScaler())
    ])
    categorical_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy=CONFIG['categorical_impute_strategy'], fill_value='Missing')),
        ('onehot', OneHotEncoder(handle_unknown='ignore'))
    ])
    return ColumnTransformer(
        transformers=[
            ('num', numeric_transformer, numeric_features),
            ('cat', categorical_transformer, categorical_features)
        ]
    )

def build_model_pipeline(preprocessor):
    return Pipeline(steps=[
        ('feature_eng', feature_engineering_transformer),
        ('preprocessor', preprocessor),
        ('regressor', XGBRegressor(**CONFIG['xgb_params']))
    ])
(3)添加命令行参数(可选)

如果要让脚本更灵活,可以用argparse添加命令行参数,比如指定训练/测试文件路径、目标列名等,这样不用每次改代码:

import argparse

def parse_args():
    parser = argparse.ArgumentParser(description='回归任务流水线')
    parser.add_argument('--train-path', required=True, help='训练数据文件路径')
    parser.add_argument('--test-path', required=True, help='测试数据文件路径')
    parser.add_argument('--submission-path', default='submission.csv', help='提交文件保存路径')
    return parser.parse_args()

# 主函数
if __name__ == '__main__':
    args = parse_args()
    train_data = load_data(args.train_path)
    test_data = load_data(args.test_path)
    # 后续逻辑...
(4)把交叉验证逻辑封装成函数

把之前零散的交叉验证代码封装成函数,方便随时调用:

import numpy as np
from sklearn.model_selection import cross_val_score

def run_cross_validation(pipeline, X, y):
    print("运行交叉验证...")
    scores = cross_val_score(
        pipeline,
        X,
        y,
        cv=CONFIG['cv_folds'],
        scoring=CONFIG['cv_scoring'],
        n_jobs=-1
    )
    rmse_scores = np.sqrt(-scores)
    print(f"平均RMSE: {rmse_scores.mean():.4f} (+/- {rmse_scores.std():.4f})")
    return rmse_scores

整理后的完整脚本示例

把上面的优化点整合起来,你可以得到一个结构清晰、复用性强的脚本:

import os
import numpy as np
import pandas as pd
from xgboost import XGBRegressor
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder, FunctionTransformer
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.model_selection import cross_val_score
import argparse

# ---------------------- 配置区 ----------------------
CONFIG = {
    'target_col': 'target',
    'id_col': 'id',
    'numeric_impute_strategy': 'median',
    'categorical_impute_strategy': 'constant',
    'xgb_params': {
        'n_estimators': 100,
        'max_depth': 6,
        'learning_rate': 0.1,
        'random_state': 42,
        'n_jobs': -1
    },
    'cv_folds': 5,
    'cv_scoring': 'neg_mean_squared_error'
}
# ---------------------------------------------------

def load_data(file_path):
    ext = os.path.splitext(file_path)[1].lower()
    if ext in ['.csv', '.tsv', '.gz', '.zip']:
        if ext == '.tsv':
            return pd.read_csv(file_path, sep='\t')
        else:
            return pd.read_csv(file_path)
    elif ext == '.parquet':
        return pd.read_parquet(file_path)
    elif ext == '.json':
        return pd.read_json(file_path)
    elif ext == '.pkl':
        return pd.read_pickle(file_path)
    else:
        raise ValueError(f"不支持的文件格式:{ext}")

def create_area_feature(X):
    X_copy = X.copy()
    X_copy['area'] = X_copy['length'] * X_copy['breath']
    X_copy = X_copy.drop(columns=['length', 'breath'])
    return X_copy

def build_preprocessor(numeric_features, categorical_features):
    numeric_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy=CONFIG['numeric_impute_strategy'])),
        ('scalar', StandardScaler())
        # 提示:如果有异常值,替换为RobustScaler
        # ('scalar', RobustScaler())
    ])
    categorical_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy=CONFIG['categorical_impute_strategy'], fill_value='Missing')),
        ('onehot', OneHotEncoder(handle_unknown='ignore'))
        # 提示:如果是高基数类别特征,替换为TargetEncoder
        # ('target_encoder', TargetEncoder())
    ])
    return ColumnTransformer(
        transformers=[
            ('num', numeric_transformer, numeric_features),
            ('cat', categorical_transformer, categorical_features)
        ]
    )

def build_model_pipeline(preprocessor):
    feature_engineering_transformer = FunctionTransformer(create_area_feature, validate=False)
    return Pipeline(steps=[
        ('feature_eng', feature_engineering_transformer),
        ('preprocessor', preprocessor),
        ('regressor', XGBRegressor(**CONFIG['xgb_params']))
    ])

def run_cross_validation(pipeline, X, y):
    print("运行交叉验证...")
    scores = cross_val_score(
        pipeline,
        X,
        y,
        cv=CONFIG['cv_folds'],
        scoring=CONFIG['cv_scoring'],
        n_jobs=-1
    )
    rmse_scores = np.sqrt(-scores)
    print(f"平均RMSE: {rmse_scores.mean():.4f} (+/- {rmse_scores.std():.4f})")
    return rmse_scores

def parse_args():
    parser = argparse.ArgumentParser(description='回归任务ML流水线')
    parser.add_argument('--train-path', required=True, help='训练数据文件路径')
    parser.add_argument('--test-path', required=True, help='测试数据文件路径')
    parser.add_argument('--submission-path', default='submission.csv', help='提交文件保存路径')
    return parser.parse_args()

if __name__ == '__main__':
    args = parse_args()
    
    # 加载数据
    print("加载数据...")
    train_data = load_data(args.train_path)
    test_data = load_data(args.test_path)
    
    # 拆分特征和目标
    y_train = train_data[CONFIG['target_col']]
    X_train = train_data.drop(columns=[CONFIG['id_col'], CONFIG['target_col']])
    X_test = test_data.drop(columns=[CONFIG['id_col']])
    
    # 自动检测特征类型
    numeric_features = X_train.select_dtypes(include=['int64', 'float64']).columns.tolist()
    categorical_features = X_train.select_dtypes(include=['category', 'object']).columns.tolist()
    
    # 构建流水线
    print("构建流水线...")
    preprocessor = build_preprocessor(numeric_features, categorical_features)
    model

火山引擎 最新活动