Persisting Spark job run statistics to a database and fine-grained timing of SQL Joins: looking for a monitoring approach
Hey there! Let’s tackle your two Spark performance tracking questions one by one. They sit at different levels of granularity, so each gets its own set of practical approaches.
When it comes to capturing and persisting Spark job stats, these are the most reliable methods:
Custom SparkListener (Recommended)
Spark’s built-in SparkListener interface is the most direct way to get deep, native tracking. You can implement it to hook into job, stage, and task lifecycle events (like onJobEnd or onStageCompleted). From these events, you can extract metrics like total job duration, shuffle read/write sizes, task failure counts, and more. Once you’ve collected the data, you can write it to your database via JDBC (as a small DataFrame or with plain JDBC inserts), either immediately or batched for periodic inserts. This method needs no changes to business code, scales well, and works for both batch and streaming jobs.
Spark UI REST API
If you don’t want to modify your application code, Spark exposes a REST API (default endpoint: http://<driver-host>:4040/api/v1/applications/<app-id>/jobs) that returns job/stage stats in JSON format. You can set up a cron job or a small service to periodically pull this data, parse the JSON, and write it to your database. Just note that the 4040 endpoint only works while the driver is running. For completed jobs, you’ll need to enable event logging and the Spark History Server and use its corresponding API endpoint (port 18080 by default).
Metrics System Integration
Spark’s built-in metrics system can expose or push stats through sinks such as JMX, Graphite, and StatsD, plus a Prometheus servlet in Spark 3.0 and later; tools like InfluxDB can typically ingest from these. If your database can be fed from one of these systems (e.g., PostgreSQL via a Prometheus remote-write adapter), this is a low-code option. However, for custom, business-specific metrics, the SparkListener method gives you more control.
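As a rough sketch of the custom SparkListener approach described above, the listener below records one row per finished job. The names JobStat and JobStatsListener are made up for illustration, and the database write is deliberately left out: the idea is that a separate thread or a shutdown hook drains the queue and inserts the rows over JDBC.

```scala
import org.apache.spark.scheduler.{JobSucceeded, SparkListener, SparkListenerJobEnd, SparkListenerJobStart}
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.concurrent.TrieMap

// Hypothetical record type; shape it to match your own stats table.
final case class JobStat(jobId: Int, durationMs: Long, succeeded: Boolean)

class JobStatsListener extends SparkListener {
  // Job start times keyed by job id (thread-safe, callbacks run on the listener bus).
  private val startTimes = TrieMap.empty[Int, Long]

  // Finished jobs wait here until something drains them to the database.
  val finished = new ConcurrentLinkedQueue[JobStat]()

  override def onJobStart(jobStart: SparkListenerJobStart): Unit =
    startTimes.put(jobStart.jobId, jobStart.time)

  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit =
    // Only record jobs whose start event we actually saw.
    startTimes.remove(jobEnd.jobId).foreach { startedAt =>
      finished.add(JobStat(jobEnd.jobId, jobEnd.time - startedAt, jobEnd.jobResult == JobSucceeded))
    }
}
```

It can be attached with spark.sparkContext.addSparkListener(new JobStatsListener), or through the spark.extraListeners configuration if the class has a no-argument constructor. Keeping the callbacks cheap and doing the JDBC insert elsewhere avoids slowing down the listener bus.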
For tracking individual Join operations in your SparkSQL queries, you have a few targeted options depending on your Spark version and code preferences:
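Option 1: QueryExecutionListener
QueryExecutionListener (in org.apache.spark.sql.util) is purpose-built for tracking query execution. It is not specific to Spark 3.x; it is available in Spark 2.x as well. Spark calls it once per completed action with that action’s total duration, so it gives you exact timing for a whole query (including any Joins in its plan) with minimal code changes, rather than timing each Join operator separately.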
Here’s a sample implementation:
    import java.util.Properties

    import org.apache.spark.sql.catalyst.plans.logical.Join
    import org.apache.spark.sql.execution.QueryExecution
    import org.apache.spark.sql.util.QueryExecutionListener

    class JoinPerformanceTracker extends QueryExecutionListener {

      override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
        // Get the custom query name set by the business code.
        // getLocalProperty returns null when the key is unset, so wrap it in Option.
        // Caveat: local properties are per-thread. In Spark 3.x this callback is
        // delivered through the listener bus, so a value set on the caller's
        // thread may not be visible here; verify this on your Spark version.
        val queryName = Option(
          qe.sparkSession.sparkContext.getLocalProperty("spark.sql.queryName"))

        // Check whether the query includes a Join. The logical plan is inspected
        // because the physical plan can be hidden behind an adaptive-execution node.
        val isJoinQuery = qe.optimizedPlan.collectFirst { case j: Join => j }.isDefined

        if (isJoinQuery) queryName.foreach { name =>
          // Convert nanoseconds to milliseconds for readability
          val durationMs = durationNs / 1000000
          val timestamp = System.currentTimeMillis()

          val props = new Properties()
          props.setProperty("user", "your-user")
          props.setProperty("password", "your-pass")

          // Prepare data and write to your database
          import qe.sparkSession.implicits._
          Seq((name, durationMs, timestamp))
            .toDF("join_query_name", "duration_ms", "recorded_at")
            .write.mode("append")
            .jdbc("jdbc:mysql://your-db-host:3306/your-db", "join_performance_stats", props)
        }
      }

      override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
        // Handle failed queries (e.g., record error details to your stats table)
        val queryName = Option(
          qe.sparkSession.sparkContext.getLocalProperty("spark.sql.queryName"))
        // Add your failure logging logic here
      }
    }

    // Register the listener with your SparkSession
    spark.listenerManager.register(new JoinPerformanceTracker())
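Then, update your business code to tag the work before triggering it, so the listener can identify it. A DataFrame has no queryName method (that exists only on streaming writers), so the name is set as a local property on the SparkContext instead. Keep in mind that the listener fires once per action, not once per spark.sql call: DF1 and DF2 below are lazy, so all three Joins run inside the single write action and are timed together.
    val DF1 = spark.sql("select x,y from A,B ")
    val DF2 = spark.sql("select k,v from TABLE1,TABLE2 ")
    val finalDF = DF1.join(DF2, "some-key")

    // Tag the action that is about to run, then clear the tag afterwards
    spark.sparkContext.setLocalProperty("spark.sql.queryName", "Final_Join_DF1_DF2")
    finalDF.write.saveAsTable("your-target-table")
    spark.sparkContext.setLocalProperty("spark.sql.queryName", null)
This method keeps timing logic out of your business code and ties each measurement to a named action. To get a separate number for each Join, each one has to be materialized by its own action, as in Option 2.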
Option 2: Manual Timing (Quick & Dirty)
If you’re on an older Spark version or prefer a more straightforward approach, you can add manual timing around each query execution. Just remember that Spark uses lazy evaluation: you’ll need to trigger an action (like count() or a write) to ensure the query actually runs before recording the time. Note that cache() on its own is also lazy; it only marks the DataFrame for caching, and the data is materialized by the next action.
Example:
    import java.util.Properties
    import java.util.concurrent.TimeUnit

    // Track first Join
    val start1 = System.nanoTime()
    val DF1 = spark.sql("select x,y from A,B ")
    DF1.cache()  // Mark for caching to avoid re-computing later
    DF1.count()  // Trigger execution (and fill the cache)
    val duration1Ms = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start1)

    // Track second Join
    val start2 = System.nanoTime()
    val DF2 = spark.sql("select k,v from TABLE1,TABLE2 ")
    DF2.cache()
    DF2.count()
    val duration2Ms = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start2)

    // Track final Join + save (DF1 and DF2 are read from the cache here)
    val startFinal = System.nanoTime()
    val finalDF = DF1.join(DF2, "some-key")
    finalDF.write.saveAsTable("your-target-table")
    val durationFinalMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startFinal)

    // Write all stats to database
    val props = new Properties()
    props.setProperty("user", "your-user")
    props.setProperty("password", "your-pass")

    import spark.implicits._
    Seq(
      ("Join_A_B", duration1Ms, System.currentTimeMillis()),
      ("Join_TABLE1_TABLE2", duration2Ms, System.currentTimeMillis()),
      ("Final_Join_DF1_DF2", durationFinalMs, System.currentTimeMillis())
    ).toDF("join_query_name", "duration_ms", "recorded_at")
      .write.mode("append")
      .jdbc("jdbc:mysql://your-db-host:3306/your-db", "join_performance_stats", props)
The downside here is that it’s intrusive: you’ll have to modify your business code to add timing logic, and the extra count() and cache() calls add work of their own. But it works for any Spark version and requires no listener setup.
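To reduce how much timing boilerplate ends up in business code, the manual approach can be wrapped in a small helper. This is only a sketch in plain Scala; JoinTimer, Timing, and timed are hypothetical names, and the collected rows would still need to be written to the database as shown above.

```scala
import java.util.concurrent.TimeUnit
import scala.collection.mutable.ArrayBuffer

// Hypothetical row shape mirroring the join_performance_stats columns.
final case class Timing(name: String, durationMs: Long, recordedAt: Long)

object JoinTimer {
  private val buffer = ArrayBuffer.empty[Timing]

  // Runs `body`, records how long it took under `name`, and returns its result.
  // The timing is recorded even if `body` throws.
  def timed[A](name: String)(body: => A): A = {
    val start = System.nanoTime()
    try body
    finally {
      val ms = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)
      buffer.synchronized { buffer += Timing(name, ms, System.currentTimeMillis()) }
    }
  }

  // Snapshot of everything recorded so far.
  def results: List[Timing] = buffer.synchronized(buffer.toList)
}
```

A call would then look like JoinTimer.timed("Join_A_B") { val df = spark.sql("select x,y from A,B ").cache(); df.count(); df }. The block still has to contain an action, otherwise only the (near-instant) plan construction is measured.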
Option 3: SparkListener for Stage-Level Tracking
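If you need even finer granularity (e.g., tracking individual stages of a Join), you can use a custom SparkListener to capture stage completion events. Be aware that stage names are derived from the call site of the action (something like "count at MyJob.scala:42"), not from the physical operator, so you generally can’t rely on finding keywords like SortMergeJoin or BroadcastHashJoin in them. Instead, map stages back to your queries by tagging jobs with custom properties when submitting them (for example with SparkContext.setJobGroup, or by using the spark.sql.execution.id property that Spark SQL attaches to its jobs) and reading those properties in onJobStart.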
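A minimal sketch of that stage-level mapping, assuming jobs are tagged with SparkContext.setJobGroup before each action. TaggedStageListener is a made-up name, and the result is kept in memory here rather than written to a database.

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart, SparkListenerStageCompleted}
import scala.collection.concurrent.TrieMap

class TaggedStageListener extends SparkListener {
  // Stage id -> tag of the job that submitted it.
  private val stageToTag = TrieMap.empty[Int, String]

  // (tag, stage id) -> wall-clock duration of that stage in milliseconds.
  val stageDurationsMs = TrieMap.empty[(String, Int), Long]

  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    // "spark.jobGroup.id" is the property that SparkContext.setJobGroup populates.
    val tag = Option(jobStart.properties)
      .flatMap(p => Option(p.getProperty("spark.jobGroup.id")))
    tag.foreach(t => jobStart.stageIds.foreach(id => stageToTag.put(id, t)))
  }

  override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
    val info = event.stageInfo
    // Skipped stages never complete, so their entries simply stay in stageToTag.
    for {
      tag   <- stageToTag.remove(info.stageId)
      start <- info.submissionTime
      end   <- info.completionTime
    } stageDurationsMs.put((tag, info.stageId), end - start)
  }
}
```

On the business side this would be paired with something like spark.sparkContext.setJobGroup("Join_A_B", "join of A and B") before the action. Job properties are captured when the job is submitted and travel with the event, so the tag should be visible to the listener regardless of which thread delivers the callback.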
The question in this post comes from Stack Exchange; it was asked by Nina A.




