How to implement mclapply-like parallel computation on Spark in R with the sparklyr package
Hey there! I totally get the urge to replicate that familiar mclapply workflow when moving to Spark with sparklyr—it's such a clean, intuitive structure for parallel work. Let's break down how to build that Spark-friendly equivalent using spark_apply, which is sparklyr's tool for running custom R functions across a distributed dataset.
First, let's adjust your existing code to fit Spark's distributed model. Unlike mclapply, which iterates over local row indices, Spark works on distributed partitions of the data. To mimic the row-by-row processing you're used to, we'll add a row ID column, group by it, and let spark_apply run your function once per group, which here means once per row. If the function can handle many rows at once, running it per partition instead is considerably faster.
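For reference, the local pattern being replaced probably looks something like the sketch below. The question's own code isn't reproduced in this answer, so the function name testFunc, the 100-row data frame, and the core count are assumptions reconstructed from the Spark version that follows.

```r
library(parallel)

# Assumed stand-in for the data in the question: two constant columns.
dfTest <- data.frame(X = rep(1, 100), Y = rep(2, 100))

# Hypothetical per-row worker: sums X and Y and carries along an extra string.
testFunc <- function(row, str_param) {
  list(nSum = as.numeric(row$X + row$Y), str = str_param)
}

# mclapply forks on Unix-alikes; on Windows mc.cores must be 1.
nCores <- if (.Platform$OS.type == "windows") 1L else 2L

# Iterate over row indices, one function call per row.
lOutput <- mclapply(
  seq_len(nrow(dfTest)),
  function(i) testFunc(dfTest[i, ], "useless string"),
  mc.cores = nCores
)
```

The result is an unnamed list with one element per row, which is the shape the Spark version tries to reproduce after collecting.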
Here's the completed version of your Spark code:
    library(sparklyr)
    library(dplyr)  # provides %>% and collect()

    sc = spark_connect(master = "local")

    dfTest = data.frame(X = rep(1, 10000), Y = rep(2, 10000))

    # Worker function: spark_apply passes the data as the first argument
    # and the `context` object as the second
    .testFunc = function(df_subset, context) {
      # With group_by = "row_id", each df_subset holds a single row
      nSum = as.numeric(df_subset$X + df_subset$Y)
      data.frame(nSum = nSum, str = context$str_param, stringsAsFactors = FALSE)
    }

    dfTest_tbl = copy_to(sc, dfTest, "test_tbl", overwrite = TRUE)

    # Step 1: Add a row ID to enable row-level grouping (matches mclapply's index iteration)
    dfTest_tbl = sdf_with_sequential_id(dfTest_tbl, id = "row_id")

    # Step 2: Use spark_apply to run the function in parallel across the cluster
    spark_output = spark_apply(
      x = dfTest_tbl,
      f = .testFunc,
      # Extra parameters (like the "useless string") travel in `context`
      context = list(str_param = "useless string"),
      # Group by row ID to process each row individually (just like mclapply).
      # The grouping column is kept in the result. `columns` is left unset,
      # so sparklyr infers the output columns from a sample of rows.
      group_by = "row_id"
    )

    # Optional: Convert Spark output to a local list matching mclapply's format
    lOutput = spark_output %>%
      collect() %>%        # Pull data back to your local R session
      split(.$row_id) %>%  # Split into individual row entries
      lapply(function(x) list(nSum = x$nSum, str = x$str))  # Match the list structure from mclapply

    # Don't forget to disconnect when done
    spark_disconnect(sc)
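The final conversion step can be tried without a Spark connection. The sketch below uses a small hand-built data frame as a stand-in for what collect() would return; the three rows and their values are made up for illustration.

```r
# Stand-in for the collected Spark output: one row per row_id.
collected <- data.frame(
  row_id = 1:3,
  nSum   = c(3, 3, 3),
  str    = rep("useless string", 3),
  stringsAsFactors = FALSE
)

# split() yields one single-row data frame per row_id;
# lapply() then reduces each one to a plain list.
lOutput <- lapply(
  split(collected, collected$row_id),
  function(x) list(nSum = x$nSum, str = x$str)
)
```

Note that split() names the list elements by row_id and orders them by that ID, whereas mclapply returns an unnamed list in input order. Calling unname(lOutput) drops the names if an exact match matters.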
Key Notes & Optimizations:
- Why spark_apply? It is sparklyr's way of running custom R code across distributed data, much as mclapply does locally. It ships the data and your function to the worker nodes, runs the function in parallel, and returns the combined result as a Spark DataFrame.
- Output columns: spark_apply has no schema argument. Its optional columns argument takes the output column names or a named vector of column types. When columns is omitted, sparklyr runs the function on a small sample of rows to infer the output structure, so declaring the types avoids that extra pass.
- Extra arguments: pass them through context, which arrives as the second argument of your function.
- Performance Tip: If your function can handle batches of rows instead of individual ones, skip group_by = "row_id". Grouping by a unique ID means one group, and one function call, per row; processing entire partitions (Spark's natural distributed units) is much faster for large datasets. Here's how that optimized version looks (run it before spark_disconnect(sc)):

      # Optimized function to process entire partitions
      .testFunc_partition = function(df_partition, context) {
        data.frame(
          nSum = as.numeric(df_partition$X + df_partition$Y),
          # rep() keeps the column length right even for an empty partition
          str = rep(context$str_param, nrow(df_partition)),
          stringsAsFactors = FALSE
        )
      }

      # No row ID needed: process partitions directly
      spark_output_optimized = spark_apply(
        x = dfTest_tbl,
        f = .testFunc_partition,
        context = list(str_param = "useless string"),
        # Declaring the output types up front skips schema inference
        columns = c(nSum = "double", str = "character")
      )

- Matching mclapply Output: The collect() + split() + lapply() step converts the Spark DataFrame back into a local list that mirrors what mclapply returns. If you don't need the local list, keep the result as a Spark DataFrame for further distributed processing.
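To see what the performance tip amounts to, the sketch below emulates both strategies locally in base R, with no Spark involved. The function name sumFunc and the four equal chunks are illustrative assumptions; on a real cluster Spark decides how rows are spread over partitions.

```r
# Hypothetical worker, written so it handles one row or many rows alike.
sumFunc <- function(df, str_param) {
  data.frame(
    nSum = as.numeric(df$X + df$Y),
    str  = rep(str_param, nrow(df)),
    stringsAsFactors = FALSE
  )
}

dfTest <- data.frame(X = rep(1, 1000), Y = rep(2, 1000))

# Row-level: one call per row, comparable to grouping by a unique row ID.
perRow <- do.call(rbind, lapply(
  split(dfTest, seq_len(nrow(dfTest))),
  sumFunc, str_param = "useless string"
))

# Batch-level: four calls on 250-row chunks, standing in for four partitions.
chunkId <- rep(1:4, each = 250)
perChunk <- do.call(rbind, lapply(
  split(dfTest, chunkId),
  sumFunc, str_param = "useless string"
))

# Drop the row names that rbind() derives from the list names, then compare.
rownames(perRow) <- NULL
rownames(perChunk) <- NULL
sameResult <- identical(perRow, perChunk)
```

Both routes produce the same values; the difference is 1000 function calls versus 4. On Spark each call also carries serialization overhead, which is why the per-partition form tends to win.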
Reference Info:
For more details on spark_apply, run ?spark_apply in your R session. The help page documents the arguments used here (columns, context, group_by) as well as packages, which controls how R package dependencies are distributed to the worker nodes.
The question comes from Stack Exchange; asked by ciccioz.




