You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

如何从PySpark ALS算法获取相似兴趣用户分组?排查聚类异常

Fixing User Clustering Issues with PySpark ALS + KMeans

Hey there! Let's tackle your problem where users with no overlapping company interactions are ending up in the same KMeans cluster. First, let's break down why this might be happening, then walk through fixes and improvements to your code.

Why This Happens

It’s not entirely "abnormal" for users without shared company interactions to cluster together when using ALS user features—here’s why:

  • ALS maps users to a low-dimensional embedding space where similar behavior patterns (not just exact shared interactions) are grouped. For example, two users might never click the same company, but both prefer healthcare startups, so their embeddings would be close.
  • That said, your code has a couple of potential issues that could lead to unexpected clustering results:
    1. Risk of misaligned RDDs: Splitting model.userFeatures() into two separate RDDs and zipping them doesn’t guarantee order is preserved across transformations, which could accidentally pair the wrong user ID with a cluster ID.
    2. Unscaled features: KMeans is sensitive to feature scale. ALS user features aren’t standardized by default, which can skew cluster assignments.
    3. Suboptimal KMeans initialization: Using random initialization instead of k-means++ can lead to unstable, less meaningful clusters.

Step-by-Step Fixes & Improvements

1. Fix RDD Alignment Issues

Instead of splitting and zipping RDDs, process user IDs and features in a single RDD to avoid mismatches:

# Get user ID + features in one RDD (no need to split into two)
user_features_rdd = model.userFeatures()

# Train KMeans on the feature vectors
number_of_clusters = 10
model_kmm = KMeans.train(
    user_features_rdd.map(lambda x: x[1]),
    number_of_clusters,
    initializationMode="k-means||",  # Use k-means++ for better initial centroids
    runs=5,  # Increase runs for more stable clusters
    maxIterations=20  # Ensure the algorithm converges
)

# Predict clusters and pair with user IDs in one step
user_clusters = user_features_rdd.map(lambda x: (x[0], model_kmm.predict(x[1])))

2. Standardize User Features

KMeans relies on Euclidean distance, so standardizing features to have zero mean and unit variance will improve cluster quality:

from pyspark.mllib.feature import StandardScaler

# Extract features and fit scaler
features = user_features_rdd.map(lambda x: x[1])
scaler = StandardScaler(withMean=True, withStd=True).fit(features)

# Scale features and pair back with user IDs
scaled_user_features = user_features_rdd.map(lambda x: (x[0], scaler.transform(x[1])))

# Train KMeans on scaled features
model_kmm = KMeans.train(
    scaled_user_features.map(lambda x: x[1]),
    number_of_clusters,
    initializationMode="k-means||",
    runs=5,
    maxIterations=20
)

user_clusters = scaled_user_features.map(lambda x: (x[0], model_kmm.predict(x[1])))

3. Validate Cluster Quality

To ensure clusters make sense, use the silhouette score to evaluate separation:

from pyspark.mllib.evaluation import ClusteringMetrics

# Pair scaled features with their cluster labels
cluster_data = scaled_user_features.map(lambda x: x[1]).zip(user_clusters.map(lambda x: x[1]))

# Calculate silhouette score (1 = well-separated clusters, 0 = overlapping clusters)
metrics = ClusteringMetrics(cluster_data)
print(f"Silhouette Score: {metrics.silhouette:.3f}")

4. Validate Your ALS Model

If your user embeddings aren’t capturing behavior correctly, confirm the ALS model fits your data well using RMSE:

from math import sqrt

# Calculate RMSE on test data
test_data = rddTesting.map(lambda p: (p[0], p[1]))
predictions = model.predictAll(test_data).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = rddTesting.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
rmse = sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
print(f"Test RMSE: {rmse:.3f}")

Lower RMSE means a better-fitting model, which leads to more meaningful user embeddings.

5. If You Need Clusters Based on Exact Shared Companies

If your business requires clusters only of users who interacted with the same companies (instead of similar behavior), use user-item interaction vectors (e.g., count vectors) with PCA to reduce dimensionality:

from pyspark.mllib.feature import PCA
from pyspark.mllib.linalg import Vectors

# Get total number of unique companies
num_companies = dfAccos.count()

# Create sparse vectors of user-company interaction counts
user_item_counts = dfRates.rdd.map(lambda r: (r[0], (r[1], r[2])))\
    .groupByKey()\
    .mapValues(lambda vs: Vectors.sparse(num_companies, [(c, cnt) for c, cnt in vs]))

# Apply PCA to reduce dimensions
pca = PCA(k=BEST_RANK).fit(user_item_counts.map(lambda x: x[1]))
user_pca_features = user_item_counts.map(lambda x: (x[0], pca.transform(x[1])))

# Cluster using PCA features (don't forget to standardize!)

Final Corrected Code Snippet

Here’s your updated code with key fixes:

from __future__ import print_function
import sys
from math import sqrt
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.mllib.recommendation import ALS, Rating
from pyspark.sql.types import StructType, StructField, IntegerType
from pyspark.mllib.clustering import KMeans
from pyspark.mllib.feature import StandardScaler
from pyspark.mllib.evaluation import ClusteringMetrics

conf = SparkConf().setAppName("user_clustering")
sc = SparkContext(conf=conf)
sc.setCheckpointDir('checkpoint/')
sqlContext = SQLContext(sc)

# Cloud SQL configuration
CLOUDSQL_INSTANCE_IP = sys.argv[1]
CLOUDSQL_DB_NAME = sys.argv[2]
CLOUDSQL_USER = sys.argv[3]
CLOUDSQL_PWD = sys.argv[4]
BEST_RANK = int(sys.argv[5])
BEST_ITERATION = int(sys.argv[6])
BEST_REGULATION = float(sys.argv[7])

TABLE_ITEMS = "companies"
TABLE_RATINGS = "ml_ratings"
TABLE_USER_CLUSTERS = "ml_user_clusters"

# Read data from Cloud SQL
jdbcUrl = 'jdbc:mysql://%s:3306/%s?user=%s&password=%s' % (CLOUDSQL_INSTANCE_IP, CLOUDSQL_DB_NAME, CLOUDSQL_USER, CLOUDSQL_PWD)
dfAccos = sqlContext.read.jdbc(url=jdbcUrl, table=TABLE_ITEMS)
dfRates = sqlContext.read.jdbc(url=jdbcUrl, table=TABLE_RATINGS)

# Split data into train/validation/test
rddTraining, rddValidating, rddTesting = dfRates.rdd.randomSplit([6,2,2])

# Train ALS model
model = ALS.train(rddTraining, BEST_RANK, BEST_ITERATION, BEST_REGULATION)

# Validate ALS model with RMSE
test_data = rddTesting.map(lambda p: (p[0], p[1]))
predictions = model.predictAll(test_data).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = rddTesting.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
rmse = sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
print(f"ALS Test RMSE: {rmse:.3f}")

# Process user features and standardize
user_features_rdd = model.userFeatures()
features = user_features_rdd.map(lambda x: x[1])
scaler = StandardScaler(withMean=True, withStd=True).fit(features)
scaled_user_features = user_features_rdd.map(lambda x: (x[0], scaler.transform(x[1])))

# Train KMeans with improved parameters
number_of_clusters = 10
model_kmm = KMeans.train(
    scaled_user_features.map(lambda x: x[1]),
    number_of_clusters,
    initializationMode="k-means||",
    runs=5,
    maxIterations=20
)

# Assign clusters to users
user_clusters = scaled_user_features.map(lambda x: (x[0], model_kmm.predict(x[1])))

# Evaluate clusters
cluster_data = scaled_user_features.map(lambda x: x[1]).zip(user_clusters.map(lambda x: x[1]))
metrics = ClusteringMetrics(cluster_data)
print(f"KMeans Silhouette Score: {metrics.silhouette:.3f}")

# Save clusters to Cloud SQL
userGroupSchema = StructType([StructField("primaryUser", IntegerType(), True), StructField("groupId", IntegerType(), True)])
dfUserGroups = sqlContext.createDataFrame(user_clusters, userGroupSchema)

try:
    dfUserGroups.write.jdbc(url=jdbcUrl, table=TABLE_USER_CLUSTERS, mode='append')
    print("Successfully wrote user clusters to DB")
except Exception as e:
    print(f"Error writing to DB: {str(e)}")

print("Job completed!")

内容的提问来源于stack exchange,提问作者tolgatanriverdi

火山引擎 最新活动