Spark Execution Internals: Deconstructing Jobs, Stages, and Shuffles
Understanding Spark Execution: A Deep Dive
If you are working with Big Data, writing code that "works" is only half the battle. To truly master Apache Spark, you need to understand how your code is translated into physical execution.
Today, let's break down a specific Spark snippet to see how Jobs, Stages, and Tasks are born.
The Scenario
Imagine we have the following PySpark code:
df = spark.read.parquet("sales")
result = (
    df.filter("amount > 100")
      .select("customer_id", "amount")
      .repartition(4)
      .groupBy("customer_id")
      .sum("amount")
)
result.write.mode("overwrite").parquet("output")
Our Cluster Constraints:
- Input Data: 12 partitions.
- Cluster Hardware: 4 executors, each capable of running 2 tasks simultaneously.
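A cluster like this would typically be requested with a spark-submit invocation along these lines (a sketch: the flags are real spark-submit options, the memory value and script name are illustrative assumptions):

```shell
# 4 executors x 2 cores each = 8 task slots in total
spark-submit \
  --num-executors 4 \
  --executor-cores 2 \
  --executor-memory 4g \
  my_job.py
```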
Q1. How many Spark Jobs will be created?
Answer: 1 Job.
In Spark, a Job is triggered by an Action. Transformations (like filter or groupBy) are lazy: they don't do anything until an action is called. In this script, .write() is the only action, so Spark generates exactly one Job to fulfill that write request.
Q2. How many Stages will the job contain?
Answer: 3 Stages.
Stages are created whenever Spark hits a "Shuffle Boundary" (a Wide Transformation).
- Stage 1: Includes read, filter, and select. These are Narrow Transformations and can be "pipelined" together.
- Stage 2: Triggered by .repartition(4). This forces data to move across the network.
- Stage 3: Triggered by .groupBy(). Even though we repartitioned earlier, a groupBy requires a shuffle to ensure all records for the same customer_id end up on the same partition for the sum().
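You can confirm these shuffle boundaries yourself: each Exchange operator in the physical plan marks a stage boundary. A minimal sketch, assuming the result DataFrame from the example above:

```python
# Print the physical plan; each "Exchange" node is a shuffle boundary
result.explain()
# Expect two Exchange operators:
#   - RoundRobinPartitioning(4)      <- from .repartition(4)
#   - hashpartitioning(customer_id)  <- from the groupBy aggregation
```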
Q3. How many Tasks will run in each stage?
- Stage 1: 12 Tasks. Spark creates one task per input partition.
- Stage 2: 4 Tasks. The .repartition(4) command explicitly tells Spark to create 4 output partitions.
- Stage 3: 200 Tasks (default). Unless you've changed the spark.sql.shuffle.partitions configuration, Spark defaults to 200 partitions for any shuffle operation following a transformation like groupBy.
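Both of these numbers are observable at runtime. A sketch assuming the SparkSession and "sales" path from the example (the override value 64 is purely illustrative):

```python
# Stage 1 task count follows the input partition count
df = spark.read.parquet("sales")
print(df.rdd.getNumPartitions())   # 12 in our scenario -> 12 Stage-1 tasks

# The post-shuffle default that gives Stage 3 its 200 tasks
print(spark.conf.get("spark.sql.shuffle.partitions"))   # "200" unless changed

# Override it for this session, e.g. to better match the data volume
spark.conf.set("spark.sql.shuffle.partitions", "64")
```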
Q4. What is the maximum number of tasks that can run in parallel?
Answer: 8 Tasks.
Parallelism is limited by your cluster's "slots."
4 Executors × 2 Slots per Executor = 8 concurrent tasks.
Even if Stage 1 has 12 tasks to do, it can only process 8 at a time. The remaining 4 tasks will wait in the queue until slots become free.
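This queueing is just ceiling division: with 12 tasks and 8 slots, Stage 1 completes in two "waves". A quick back-of-the-envelope check in plain Python:

```python
import math

executors = 4
cores_per_executor = 2                    # task slots per executor
slots = executors * cores_per_executor    # 8 tasks can run at once

stage1_tasks = 12
waves = math.ceil(stage1_tasks / slots)   # wave 1: 8 tasks, wave 2: 4 tasks

print(slots, waves)  # 8 2
```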
Bonus: Optimizing for Shuffle Cost
The Fix: Remove .repartition(4).
Why? In the original code, we are shuffling the data twice in a row: first for the repartition, and then immediately again for the groupBy.
By removing .repartition(4), you eliminate an entire stage and a massive amount of network I/O. If you need the final output to have a specific number of files, it is much more efficient to adjust the spark.sql.shuffle.partitions setting or use .coalesce() after the aggregation.
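Under that advice, the pipeline might look like this (same logic as the original minus the extra shuffle; the coalesce target of 4 simply mirrors the original repartition count):

```python
result = (
    df.filter("amount > 100")
      .select("customer_id", "amount")
      .groupBy("customer_id")   # the one shuffle that is actually needed
      .sum("amount")
      .coalesce(4)              # narrow: merges partitions without a shuffle
)
result.write.mode("overwrite").parquet("output")
```

Note that coalesce only reduces the partition count; it never triggers a full shuffle the way repartition does.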