Using Spark’s withColumn may seem harmless, but if misused, it can slow down your job significantly! The surprising part? You might not notice it until you dig into Spark’s Logical Plans.
By the end of this newsletter edition, you will be able to identify and fix such issues in your Spark jobs and write more performant code.
Don't believe me? Let's dive right into it.
Problem and How to Spot It
Suppose you need to add hundreds of columns populated with some constant value, or cast hundreds of columns to a specific data type.
A typical, seemingly sensible, and extensible approach is to collect or define all such columns in a list and run a simple for-loop:
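For illustration, here is a minimal sketch of that pattern; the DataFrame and the cols_to_add list are placeholders standing in for the original example:

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# hypothetical input: a small DataFrame and a list of columns to add with a constant value
df = spark.range(10)
cols_to_add = [f"col_{i}" for i in range(200)]

# the "clean and simple" loop: one withColumn call per column
for c in cols_to_add:
    df = df.withColumn(c, F.lit(0))
```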
Looks clean and simple, right? Let's look at the Physical Plan of this query:
df.explain("simple")
Everything looks fine and as expected. But wait, let's also take a look at the Logical Plans.
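The "extended" explain mode prints the Parsed, Analyzed, and Optimized Logical Plans along with the Physical Plan:

```python
# prints the Parsed, Analyzed, and Optimized Logical Plans plus the Physical Plan
df.explain("extended")
```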
We can see multiple Project nodes, one for each column added using withColumn.
Okay, there are multiple Project nodes, so what? They don't appear in the Physical Plan, and Spark ultimately executes the selected Physical Plan, so there shouldn't be any performance regression from this during execution, right?
Well, the performance degradation happens before it even reaches the Physical Plan.
Cause of Performance Degradation
Each time withColumn is used to add a column to the DataFrame, Spark’s Catalyst optimizer re-evaluates the entire plan. Done hundreds of times, this adds up fast and strains performance.
This issue is not obvious because it doesn't show up in the Spark UI. A job that should take only 5 minutes to complete can end up taking 5x longer because of multiple withColumn calls.
A deeper dive into the internals comes at the end. First, let's examine the solutions that avoid this degradation.
Solutions
Solution #1: Using .withColumns() for Spark >= 3.3
Starting from Spark 3.3, the withColumns() transformation is available; it takes a dictionary mapping column names (strings) to Column expressions.
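A sketch of the same example using .withColumns, reusing the hypothetical cols_to_add list from earlier:

```python
from pyspark.sql import functions as F

# build the full mapping of column name -> expression and add everything in one call
new_cols = {c: F.lit(0) for c in cols_to_add}
df = df.withColumns(new_cols)

df.explain("extended")
```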
Only a single Project node is present in the Logical Plan.
Solution #2: Using .select() with an alias
Another way to achieve the same result is via .select() with aliases, or via .selectExpr():
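A sketch of both variants, again using the hypothetical cols_to_add list:

```python
from pyspark.sql import functions as F

# .select: keep all existing columns and append aliased literal columns
df_select = df.select("*", *[F.lit(0).alias(c) for c in cols_to_add])

# .selectExpr: the same thing expressed as SQL snippets
df_selectexpr = df.selectExpr("*", *[f"0 AS {c}" for c in cols_to_add])
```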
In both cases, only a single Project node appears in the Logical Plan.
FAQs
Every time I explain this, engineers ask some follow-up questions:
Does this issue occur only when .withColumn is used in a for-loop?
No. The same issue occurs when multiple .withColumn calls are chained outside a loop as well. You can check the Logical Plan again to verify it, as in the sketch below.
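A quick sketch of the chained form, which produces the same stack of Project nodes:

```python
from pyspark.sql import functions as F

# chained withColumn calls outside any loop
df_chained = (
    df.withColumn("a", F.lit(0))
      .withColumn("b", F.lit(0))
      .withColumn("c", F.lit(0))
)
df_chained.explain("extended")  # still one Project node per withColumn
```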
Should we not use .withColumn at all then?
If the number of columns being added is fairly low, you can use it; it won't make much of a difference. But if you are writing code that you expect to be extended as requirements grow, I would recommend using .withColumns or one of the other two options.
How many .withColumn calls are too many?
There is no specific number, but if you have hundreds of withColumn calls carrying transformation logic, chances are your Spark job can do much better.
How can we spot this in the Spark UI, if the issue doesn't show up there?
The issue won't be easily visible in the Spark UI. A good starting point is to compare the application Uptime with the time taken by the jobs in the Spark UI's Jobs tab.
If all the jobs finish quickly but the total Uptime is significantly higher, multiple withColumn calls are a likely cause.
If you have read this far, you are equipped with the tools and tricks to identify and rectify this issue.
Got any questions or suggestions? Put them in the comments.
If you are interested in what happens internally to degrade the performance, let's dive deeper.
Diving a bit deeper
To reach a Physical Plan for execution, the query goes through multiple Spark Catalyst stages: Parsing – Unresolved Logical Plan – Resolved Logical Plan – Optimized Logical Plan.
A Logical Plan is internally represented as a tree created after parsing the SQL. At a very high level, each of these stages has a set of analyzer/optimizer rules that run in batches on top of this tree.
Every time a .withColumn is used, the whole set of rules related to it runs again.
Want to see how many rules run each time and how much time this takes?
Spark’s Catalyst optimizer uses an internal class called RuleExecutor. Its dumpTimeSpent() method can provide all such details.
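RuleExecutor is not exposed directly in PySpark, but one way to reach it is through the JVM gateway. A minimal sketch, assuming an active SparkSession named spark; resetMetrics() is another method on the same object that clears previously accumulated numbers:

```python
# Catalyst's RuleExecutor object, accessed via the py4j JVM gateway
rule_executor = spark.sparkContext._jvm.org.apache.spark.sql.catalyst.rules.RuleExecutor

rule_executor.resetMetrics()           # clear previously accumulated rule metrics

# ... build the DataFrame with withColumn / withColumns and trigger analysis,
#     e.g. via df.explain("extended") ...

print(rule_executor.dumpTimeSpent())   # per-rule invocation counts and time spent
```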
When multiple .withColumn calls are used:
When .withColumns is used:
If you compare the rule runs and the time taken in the two cases, there is a whopping ~96% difference in time.
The time doesn't look like much here because we are only doing casting, but in real project scenarios these withColumn calls carry complex transformation or calculation logic, which adds even more Analyzer rules that need to run.
Lesson Learned: Although withColumn is a lazy transformation, each use has an analysis cost.