How to get groups of users with similar interests from PySpark ALS? Troubleshooting unexpected clustering
Hey there! Let's tackle your problem where users with no overlapping company interactions are ending up in the same KMeans cluster. First, let's break down why this might be happening, then walk through fixes and improvements to your code.
Why This Happens
It’s not entirely "abnormal" for users without shared company interactions to cluster together when using ALS user features—here’s why:
- ALS maps users to a low-dimensional embedding space where similar behavior patterns (not just exact shared interactions) are grouped. For example, two users might never click the same company, but both prefer healthcare startups, so their embeddings would be close.
- That said, a few things in your code can produce unexpected clustering results:
  - Risk of misaligned RDDs: splitting `model.userFeatures()` into two separate RDDs and zipping them back pairs elements purely by position. That is only safe while both sides are narrow transformations of the same deterministic (ideally cached) parent. If either side is recomputed in a different order or altered by a filter or shuffle, you get an error or, worse, user IDs silently paired with the wrong cluster IDs.
  - Unscaled features: KMeans with Euclidean distance is sensitive to feature scale, and ALS user factors are not standardized, so a few high-variance factors can dominate the cluster assignments.
  - Suboptimal KMeans initialization: random initialization (`initializationMode="random"`) tends to give less stable, less meaningful clusters than k-means++-style seeding, which Spark implements as `k-means||` (its default mode).
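To make the first point concrete, here is a toy numpy sketch of the ALS user-factor update: with item factors held fixed, each user's vector is a ridge-regression solution u = (VᵀV + λI)⁻¹Vᵀr over the items that user rated. The item factors, ratings, and λ below are hypothetical illustrative values, not output from your model (Spark also scales λ by each user's rating count, which makes no difference here because every user has one rating). Users 0 and 1 share no companies, yet because the companies they rated have similar factors, their vectors point in almost the same direction.

```python
import numpy as np

# Hypothetical 2-factor item embeddings (rows = companies), as if produced
# by an earlier ALS item step. Companies 0 and 1 are both "healthcare
# startups"; company 2 is unrelated.
item_factors = np.array([
    [0.9, 0.1],   # company 0 (healthcare)
    [0.8, 0.2],   # company 1 (healthcare)
    [0.1, 0.9],   # company 2 (unrelated)
])

# Hypothetical explicit ratings: user -> {company: rating}.
# Users 0 and 1 have no company in common.
ratings = {
    0: {0: 5.0},
    1: {1: 5.0},
    2: {2: 5.0},
}

def solve_user(user_ratings, item_factors, reg=0.1):
    """One ALS half-step for a single user: ridge regression on the
    factors of the items this user rated (reg is a hypothetical lambda)."""
    idx = list(user_ratings)
    V = item_factors[idx]                        # (n_rated, rank)
    r = np.array([user_ratings[i] for i in idx])
    rank = item_factors.shape[1]
    A = V.T @ V + reg * np.eye(rank)
    return np.linalg.solve(A, V.T @ r)

def cosine(a, b):
    return float(a @ b / (np.linalg.norm(a) * np.linalg.norm(b)))

user_factors = {u: solve_user(r, item_factors) for u, r in ratings.items()}
print(round(cosine(user_factors[0], user_factors[1]), 3))  # -> 0.991
print(round(cosine(user_factors[0], user_factors[2]), 3))  # -> 0.22
```

KMeans on such vectors will happily put users 0 and 1 together even though their interaction sets are disjoint.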
Step-by-Step Fixes & Improvements
1. Fix RDD Alignment Issues
Instead of splitting and zipping RDDs, process user IDs and features in a single RDD to avoid mismatches:
```python
from pyspark.mllib.clustering import KMeans

# Keep (user ID, feature vector) together in one RDD; no split and zip needed
user_features_rdd = model.userFeatures().cache()

# Train KMeans on the feature vectors only.
# Note: `runs` is ignored since Spark 2.0 and rejected by newer PySpark releases;
# fix a seed instead (and compare a few seeds if stability matters).
number_of_clusters = 10
model_kmm = KMeans.train(
    user_features_rdd.map(lambda x: x[1]),
    number_of_clusters,
    maxIterations=100,               # Spark's default; raise it if training stops early
    initializationMode="k-means||",  # parallel variant of k-means++ (also the default)
    seed=42
)

# KMeansModel.predict is plain Python, so it can be called inside map()
user_clusters = user_features_rdd.map(lambda x: (x[0], model_kmm.predict(x[1])))
```
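For intuition about what `k-means||` approximates: it is a parallel variant of k-means++ seeding, which picks each new initial centroid with probability proportional to its squared distance from the nearest centroid already chosen. Below is a minimal single-machine numpy sketch of classic k-means++ on hypothetical, well-separated 2-D points. It illustrates the idea only and is not Spark's implementation.

```python
import numpy as np

def kmeans_pp_init(points, k, rng):
    """Classic k-means++ seeding on an (n, d) array; returns k centroids."""
    n = points.shape[0]
    # First centroid: chosen uniformly at random.
    centroids = [points[rng.integers(n)]]
    for _ in range(1, k):
        # Squared distance from every point to its nearest chosen centroid.
        diffs = points[:, None, :] - np.array(centroids)[None, :, :]
        d2 = np.min(np.sum(diffs ** 2, axis=2), axis=1)
        # Sample the next centroid with probability proportional to d2,
        # so distant points are far more likely to be picked.
        probs = d2 / d2.sum()
        centroids.append(points[rng.choice(n, p=probs)])
    return np.array(centroids)

rng = np.random.default_rng(0)
# Hypothetical data: three tight, well-separated groups of 2-D points.
group_centers = np.array([(0.0, 0.0), (5.0, 5.0), (0.0, 5.0)])
points = np.vstack([
    rng.normal(loc=c, scale=0.1, size=(20, 2)) for c in group_centers
])
centroids = kmeans_pp_init(points, 3, rng)
print(centroids.round(2))
```

With purely random seeding, two initial centroids can easily land in the same group; distance-weighted sampling makes that very unlikely, which is why the resulting clusters are more stable.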
2. Standardize User Features
KMeans relies on Euclidean distance, so standardizing features to have zero mean and unit variance will improve cluster quality:
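```python
import numpy as np
from pyspark.mllib.feature import StandardScaler
from pyspark.mllib.linalg import DenseVector

# Fit the scaler on the feature vectors (withMean=True needs dense vectors; ALS factors are dense)
features = user_features_rdd.map(lambda x: x[1])
scaler = StandardScaler(withMean=True, withStd=True).fit(features)

# StandardScalerModel.transform() calls into the JVM and cannot be used inside map(),
# so broadcast the fitted mean/std and scale in Python, keeping IDs attached
mean = scaler.mean.toArray()
std = scaler.std.toArray()
std[std == 0] = 1.0  # constant columns: avoid division by zero
stats = sc.broadcast((mean, std))
scaled_user_features = user_features_rdd.map(
    lambda x: (x[0], DenseVector((np.asarray(x[1]) - stats.value[0]) / stats.value[1]))
).cache()

# Train KMeans on the scaled features
model_kmm = KMeans.train(
    scaled_user_features.map(lambda x: x[1]),
    number_of_clusters,
    initializationMode="k-means||",
    seed=42
)
user_clusters = scaled_user_features.map(lambda x: (x[0], model_kmm.predict(x[1])))
```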
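Why scale matters: in this hypothetical 2-D example, feature 0 spreads over hundreds while feature 1 spreads over about one unit, so raw Euclidean distance is driven almost entirely by feature 0. The numpy sketch standardizes each column (sample standard deviation, `ddof=1`) and shows which of two candidate points is nearest to a query before and after scaling.

```python
import numpy as np

# Hypothetical user-factor matrix: feature 0 spans hundreds, feature 1 spans ~1.
X = np.array([[0.0, 0.0], [100.0, 1.0], [200.0, 0.0], [300.0, 1.0]])
mean = X.mean(axis=0)
std = X.std(axis=0, ddof=1)   # sample standard deviation per column
std[std == 0] = 1.0           # guard against constant columns

def standardize(v):
    return (np.asarray(v) - mean) / std

query = np.array([100.0, 0.0])
candidates = {"a": np.array([110.0, 0.0]), "b": np.array([100.0, 1.0])}

def nearest(q, cands, transform=lambda v: v):
    """Name of the candidate closest to q after applying transform."""
    return min(cands, key=lambda name: np.linalg.norm(transform(q) - transform(cands[name])))

print(nearest(query, candidates))               # -> b (raw space)
print(nearest(query, candidates, standardize))  # -> a (standardized space)
```

The nearest neighbour flips, which is exactly how scaling changes KMeans assignments. Whether standardizing ALS factors actually improves your clusters is data-dependent, so it is worth comparing silhouette scores with and without it.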
3. Validate Cluster Quality
To check whether the clusters are actually well separated, compute the silhouette score:
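```python
# pyspark.mllib.evaluation has no clustering metrics; ClusteringEvaluator (Spark 2.3+)
# in pyspark.ml does, and it expects a DataFrame of pyspark.ml vectors
from pyspark.ml.evaluation import ClusteringEvaluator

# Pair each scaled feature vector (converted with asML()) with its cluster label.
# RDD.toDF() needs an active SQLContext/SparkSession.
cluster_df = scaled_user_features.map(
    lambda x: (x[1].asML(), model_kmm.predict(x[1]))
).toDF(["features", "prediction"])

# Silhouette ranges from -1 to 1: near 1 = well separated, near 0 = overlapping,
# negative = many points are closer to another cluster than to their own
evaluator = ClusteringEvaluator(featuresCol="features", predictionCol="prediction",
                                metricName="silhouette")
print(f"Silhouette Score: {evaluator.evaluate(cluster_df):.3f}")
```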
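To make the metric concrete: for each point i, a(i) is its mean distance to the other members of its own cluster, b(i) is the smallest mean distance to the members of any other cluster, and s(i) = (b − a) / max(a, b); the score is the mean of s(i). The plain-Python sketch below uses Euclidean distance on hypothetical 1-D points. Spark's `ClusteringEvaluator` defaults to squared Euclidean distance, so its numbers will not match this sketch exactly.

```python
import math

def silhouette(points, labels):
    """Mean silhouette coefficient with Euclidean distance.
    Assumes at least two clusters; singleton clusters score 0 by convention."""
    clusters = {}
    for p, label in zip(points, labels):
        clusters.setdefault(label, []).append(p)
    scores = []
    for p, label in zip(points, labels):
        own = clusters[label]
        if len(own) == 1:
            scores.append(0.0)
            continue
        # a: mean distance to the other members (p's zero self-distance is
        # included in the sum but excluded from the count).
        a = sum(math.dist(p, q) for q in own) / (len(own) - 1)
        # b: mean distance to the members of the nearest other cluster.
        b = min(
            sum(math.dist(p, q) for q in members) / len(members)
            for other, members in clusters.items() if other != label
        )
        scores.append(0.0 if max(a, b) == 0 else (b - a) / max(a, b))
    return sum(scores) / len(scores)

pts = [(0.0,), (1.0,), (10.0,), (11.0,)]
print(round(silhouette(pts, [0, 0, 1, 1]), 3))  # sensible labels -> 0.9
print(round(silhouette(pts, [0, 1, 0, 1]), 3))  # scrambled labels -> -0.45
```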
4. Validate Your ALS Model
If your user embeddings aren’t capturing behavior correctly, confirm the ALS model fits your data well using RMSE:
```python
from math import sqrt

# Calculate RMSE on the held-out test data
test_data = rddTesting.map(lambda p: (p[0], p[1]))
predictions = model.predictAll(test_data).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = rddTesting.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
rmse = sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1]) ** 2).mean())
print(f"Test RMSE: {rmse:.3f}")
```
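A lower RMSE on held-out data means the model predicts ratings better, which usually (though not necessarily) means its user embeddings capture behavior more faithfully. If your "ratings" are really implicit signals such as clicks or view counts, `ALS.trainImplicit` is generally a better fit, and RMSE on raw counts becomes a weak guide.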
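The RMSE computation above is the square root of the mean squared difference over (user, company) pairs present in both the test set and the predictions; the `join` silently drops pairs the model could not score, such as users absent from the training split. A plain-Python equivalent on hypothetical triples, using a dict in place of the join and also reporting how many test pairs were actually scored:

```python
import math

def rmse(test_triples, predicted_triples):
    """RMSE over (user, item) pairs present in both inputs, mirroring the
    RDD join: pairs without a prediction are skipped. Returns (rmse, n)."""
    preds = {(u, i): r for u, i, r in predicted_triples}
    sq_errors = [
        (r - preds[(u, i)]) ** 2
        for u, i, r in test_triples
        if (u, i) in preds
    ]
    if not sq_errors:
        raise ValueError("no overlapping (user, item) pairs to score")
    return math.sqrt(sum(sq_errors) / len(sq_errors)), len(sq_errors)

# Hypothetical data: user 3 never appeared in training, so it has no prediction.
test = [(1, 10, 3.0), (2, 11, 4.0), (3, 12, 5.0)]
predicted = [(1, 10, 2.5), (2, 11, 4.5)]
score, n_scored = rmse(test, predicted)
print(f"RMSE={score:.3f} over {n_scored} of {len(test)} test pairs")
# -> RMSE=0.500 over 2 of 3 test pairs
```

Reporting coverage alongside RMSE makes it obvious when a low error comes from scoring only a small slice of the test set.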
5. If You Need Clusters Based on Exact Shared Companies
If your business requires clusters only of users who interacted with the same companies (instead of similar behavior), use user-item interaction vectors (e.g., count vectors) with PCA to reduce dimensionality:
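```python
from pyspark.mllib.feature import PCA
from pyspark.mllib.linalg import Vectors

# Vector length must exceed the largest company ID; counting the rows of the
# companies table breaks if IDs are not contiguous 0..n-1
num_companies = dfRates.rdd.map(lambda r: r[1]).max() + 1

# Sum duplicate (user, company) rows first: sparse vectors need unique indices
user_item_counts = (dfRates.rdd
    .map(lambda r: ((r[0], r[1]), float(r[2])))
    .reduceByKey(lambda a, b: a + b)
    .map(lambda kv: (kv[0][0], (kv[0][1], kv[1])))
    .groupByKey()
    .mapValues(lambda vs: Vectors.sparse(num_companies, sorted(vs)))
    .cache())

# PCAModel.transform() calls into the JVM, so apply it to the whole RDD rather than
# inside map(), then zip the IDs back (both sides are narrow maps of the same cached RDD)
pca = PCA(k=BEST_RANK).fit(user_item_counts.values())
user_pca_features = user_item_counts.keys().zip(pca.transform(user_item_counts.values()))

# Cluster using PCA features (don't forget to standardize!)
```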
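A single-machine numpy sketch of the same idea, on hypothetical (user, company, count) rows: aggregate duplicate rows, size the vectors by the largest company ID + 1, then project onto the top-k principal components via an SVD of the column-centered matrix. It illustrates the math only; mllib's `PCA` is the distributed version. Users with identical interaction counts land on exactly the same point, which is the "shared companies" behavior described above.

```python
import numpy as np
from collections import defaultdict

# Hypothetical interaction rows: (user_id, company_id, count).
rows = [(0, 1, 2), (0, 7, 1), (1, 1, 1), (1, 1, 1), (1, 7, 1), (2, 4, 3)]

# Aggregate duplicate (user, company) rows before building vectors.
counts = defaultdict(float)
for user, company, cnt in rows:
    counts[(user, company)] += cnt

users = sorted({u for u, _ in counts})
num_companies = max(c for _, c in counts) + 1   # vector length = max ID + 1
row_of = {u: i for i, u in enumerate(users)}
matrix = np.zeros((len(users), num_companies))
for (user, company), cnt in counts.items():
    matrix[row_of[user], company] = cnt

def pca_project(m, k):
    """Project the rows of m onto its top-k principal components."""
    centered = m - m.mean(axis=0)
    # Rows of vt are principal directions, sorted by singular value.
    _, _, vt = np.linalg.svd(centered, full_matrices=False)
    return centered @ vt[:k].T

reduced = pca_project(matrix, k=2)
print(reduced.shape)  # -> (3, 2)
```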
Final Corrected Code Snippet
Here’s your updated code with key fixes:
```python
from __future__ import print_function
import sys
from math import sqrt

import numpy as np
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.mllib.recommendation import ALS, Rating
from pyspark.sql.types import StructType, StructField, IntegerType
from pyspark.mllib.clustering import KMeans
from pyspark.mllib.feature import StandardScaler
from pyspark.mllib.linalg import DenseVector
from pyspark.ml.evaluation import ClusteringEvaluator

conf = SparkConf().setAppName("user_clustering")
sc = SparkContext(conf=conf)
sc.setCheckpointDir('checkpoint/')
sqlContext = SQLContext(sc)

# Cloud SQL configuration
CLOUDSQL_INSTANCE_IP = sys.argv[1]
CLOUDSQL_DB_NAME = sys.argv[2]
CLOUDSQL_USER = sys.argv[3]
CLOUDSQL_PWD = sys.argv[4]
BEST_RANK = int(sys.argv[5])
BEST_ITERATION = int(sys.argv[6])
BEST_REGULATION = float(sys.argv[7])
TABLE_ITEMS = "companies"
TABLE_RATINGS = "ml_ratings"
TABLE_USER_CLUSTERS = "ml_user_clusters"

# Read data from Cloud SQL
jdbcUrl = 'jdbc:mysql://%s:3306/%s?user=%s&password=%s' % (
    CLOUDSQL_INSTANCE_IP, CLOUDSQL_DB_NAME, CLOUDSQL_USER, CLOUDSQL_PWD)
dfAccos = sqlContext.read.jdbc(url=jdbcUrl, table=TABLE_ITEMS)
dfRates = sqlContext.read.jdbc(url=jdbcUrl, table=TABLE_RATINGS)

# Split data into train/validation/test
rddTraining, rddValidating, rddTesting = dfRates.rdd.randomSplit([6, 2, 2])

# Train ALS model (ratings rows assumed to be (user, company, rating))
model = ALS.train(rddTraining, BEST_RANK, BEST_ITERATION, BEST_REGULATION)

# Validate ALS model with RMSE
test_data = rddTesting.map(lambda p: (p[0], p[1]))
predictions = model.predictAll(test_data).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = rddTesting.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
rmse = sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1]) ** 2).mean())
print(f"ALS Test RMSE: {rmse:.3f}")

# Standardize user features in Python (StandardScalerModel.transform can't run inside map())
user_features_rdd = model.userFeatures()
scaler = StandardScaler(withMean=True, withStd=True).fit(user_features_rdd.values())
mean, std = scaler.mean.toArray(), scaler.std.toArray()
std[std == 0] = 1.0
stats = sc.broadcast((mean, std))
scaled_user_features = user_features_rdd.mapValues(
    lambda v: DenseVector((np.asarray(v) - stats.value[0]) / stats.value[1])).cache()

# Train KMeans (`runs` has no effect since Spark 2.0, so fix a seed instead)
number_of_clusters = 10
model_kmm = KMeans.train(scaled_user_features.values(), number_of_clusters,
                         maxIterations=100, initializationMode="k-means||", seed=42)

# Assign clusters to users; IDs and features stay in the same record
user_clusters = scaled_user_features.mapValues(model_kmm.predict)

# Evaluate clusters with the silhouette score (pyspark.ml vectors required)
cluster_df = scaled_user_features.map(
    lambda x: (x[1].asML(), model_kmm.predict(x[1]))).toDF(["features", "prediction"])
silhouette = ClusteringEvaluator(metricName="silhouette").evaluate(cluster_df)
print(f"KMeans Silhouette Score: {silhouette:.3f}")

# Save clusters to Cloud SQL
userGroupSchema = StructType([StructField("primaryUser", IntegerType(), True),
                              StructField("groupId", IntegerType(), True)])
dfUserGroups = sqlContext.createDataFrame(user_clusters, userGroupSchema)
try:
    dfUserGroups.write.jdbc(url=jdbcUrl, table=TABLE_USER_CLUSTERS, mode='append')
    print("Successfully wrote user clusters to DB")
except Exception as e:
    print(f"Error writing to DB: {e}")

print("Job completed!")
```
Question sourced from Stack Exchange; asked by tolgatanriverdi.




