Say goodbye to expensive shuffles in Spark! With the Storage Partitioned Join (SPJ) optimization technique in Spark >= 3.3 (more mature in 3.4), you can perform joins on partitioned Data Source V2 tables without triggering a shuffle (of course, some conditions apply).
Shuffles are expensive, especially in Spark joins, mainly because:
A shuffle transfers data across the network, which costs network bandwidth as well as CPU time to serialize and compress that data.
During a shuffle, shuffle files are written to local disk, which makes it disk-I/O intensive.
Here, Data Source V2 tables means tables in open table formats, e.g., Apache Hudi, Apache Iceberg, and Delta Lake.
❗
At the time of writing, SPJ is supported only in Apache Iceberg, starting with version 1.2.0.
This blog post will cover:
What are the requirements for SPJ to work?
What configurations need to be set for SPJ to work?
How can you check whether SPJ is working for your Spark job?
Diving deeper into SPJ by understanding the configurations being set.
💡
All the code present in the blog can be found here. You can download it locally to follow along.
Requirements
Both the target and source must be Iceberg tables.
Both the source and target tables should have the same partitioning (at least one of the partitioning columns should be the same).
The join condition must include the partition column.
Configurations must be appropriately set.
Apache Iceberg Version >= 1.2.0 and Spark version >= 3.3.0
Configurations
spark.sql.sources.v2.bucketing.enabled set to true
spark.sql.sources.v2.bucketing.pushPartValues.enabled set to true
spark.sql.iceberg.planning.preserve-data-grouping set to true
spark.sql.requireAllClusterKeysForCoPartition set to false
spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled set to true
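If you build the SparkSession yourself, the five settings above can be kept in one dictionary and applied through `SparkSession.builder.config(key, value)`. This is a minimal sketch; `SPJ_CONF` and `with_spj_conf` are hypothetical names, not part of Spark or Iceberg, and the helper works with any builder-like object exposing `config(key, value)`.

```python
# Hypothetical helper: the five SPJ-related settings listed above.
SPJ_CONF = {
    "spark.sql.sources.v2.bucketing.enabled": "true",
    "spark.sql.sources.v2.bucketing.pushPartValues.enabled": "true",
    "spark.sql.iceberg.planning.preserve-data-grouping": "true",
    "spark.sql.requireAllClusterKeysForCoPartition": "false",
    "spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled": "true",
}


def with_spj_conf(builder, conf=SPJ_CONF):
    """Apply every SPJ setting via builder.config(key, value) and return the builder."""
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder


# Usage (requires pyspark):
# from pyspark.sql import SparkSession
# spark = with_spj_conf(SparkSession.builder.appName("spj-demo")).getOrCreate()
```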
In this post, partitioning keys and clustering keys refer to the same concept and are used interchangeably, so don't let the two terms confuse you.
I will be using Spark 3.5.0 with Iceberg 1.5.0 for this.
Before we get into SPJ, let's create some mock data and look at what the Spark join plan looks like when SPJ is not working:
Initializing SparkSession
First, we initialize a SparkSession with all the Iceberg-related configs but without the SPJ configs:
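A sketch of the Iceberg-only session settings, assuming a Hadoop catalog named `local`, a local warehouse path, and the Iceberg 1.5.0 runtime built for Spark 3.5 / Scala 2.12. The catalog name, the path, and the `iceberg_conf` helper are illustrative assumptions, and disabling broadcast joins is also an assumption: with tiny mock tables Spark would otherwise broadcast one side instead of shuffling it.

```python
# Assumed Iceberg runtime for Spark 3.5 / Scala 2.12.
ICEBERG_PACKAGE = "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0"


def iceberg_conf(catalog="local", warehouse="/tmp/iceberg-warehouse"):
    """Return Iceberg session settings for a Hadoop catalog, with no SPJ flags."""
    return {
        "spark.jars.packages": ICEBERG_PACKAGE,
        "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        f"spark.sql.catalog.{catalog}": "org.apache.iceberg.spark.SparkCatalog",
        f"spark.sql.catalog.{catalog}.type": "hadoop",
        f"spark.sql.catalog.{catalog}.warehouse": warehouse,
        # Assumption: disable broadcast joins so the small mock tables are
        # not broadcast and the join actually needs a shuffle (or SPJ).
        "spark.sql.autoBroadcastJoinThreshold": "-1",
    }


# Usage (requires pyspark):
# from pyspark.sql import SparkSession
# builder = SparkSession.builder.appName("spj-demo")
# for key, value in iceberg_conf().items():
#     builder = builder.config(key, value)
# spark = builder.getOrCreate()
```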
Preparing Data
We will be creating and writing data into 2 Iceberg tables:
Customers and Orders: both tables are partitioned/clustered by region.
They can be joined on customer_id, and both include the region details and some other common details like name, email, etc.
The data is mocked up using the Faker Python library. If you don't have it installed:
pip install faker
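The shape of the mock data can be sketched as below. This is an assumption, not the post's exact schema: the column names and region values are illustrative, and plain synthetic strings stand in for Faker's names and emails so the sketch has no dependencies. Each order inherits its customer's region, so both tables share the same region values.

```python
import random

# Illustrative region values (assumption).
REGIONS = ["North", "South", "East", "West"]


def mock_customers(n, seed=42):
    """Generate n customer rows with a random region each."""
    rng = random.Random(seed)
    return [
        {
            "customer_id": i,
            "name": f"customer_{i}",              # Faker would give a real-looking name
            "email": f"customer_{i}@example.com",  # Faker would give a fake email
            "region": rng.choice(REGIONS),
        }
        for i in range(1, n + 1)
    ]


def mock_orders(customers, n, seed=7):
    """Generate n order rows, each tied to a random existing customer."""
    rng = random.Random(seed)
    orders = []
    for order_id in range(1, n + 1):
        customer = rng.choice(customers)
        orders.append({
            "order_id": order_id,
            "customer_id": customer["customer_id"],
            "region": customer["region"],  # same region as the customer
            "amount": round(rng.uniform(5, 500), 2),
        })
    return orders


# Usage (requires pyspark):
# from pyspark.sql import Row
# customers = mock_customers(1000)
# customers_df = spark.createDataFrame([Row(**c) for c in customers])
```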
Writing Data into Iceberg Tables
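Both tables are created as Iceberg tables partitioned by region and then written to. A sketch that builds the `CREATE TABLE ... USING iceberg PARTITIONED BY (...)` statements; the table names `local.db.customers` / `local.db.orders`, the column types, and the `create_table_sql` helper are hypothetical.

```python
def create_table_sql(table, columns, partition_by):
    """Build an Iceberg CREATE TABLE statement partitioned by the given columns."""
    cols = ",\n  ".join(f"{name} {dtype}" for name, dtype in columns)
    parts = ", ".join(partition_by)
    return (
        f"CREATE TABLE IF NOT EXISTS {table} (\n  {cols}\n)\n"
        f"USING iceberg\nPARTITIONED BY ({parts})"
    )


CUSTOMERS_DDL = create_table_sql(
    "local.db.customers",
    [("customer_id", "BIGINT"), ("name", "STRING"), ("email", "STRING"), ("region", "STRING")],
    ["region"],
)
ORDERS_DDL = create_table_sql(
    "local.db.orders",
    [("order_id", "BIGINT"), ("customer_id", "BIGINT"), ("region", "STRING"), ("amount", "DOUBLE")],
    ["region"],
)

# Usage (requires a session with the Iceberg catalog configured):
# spark.sql(CUSTOMERS_DDL)
# spark.sql(ORDERS_DDL)
# customers_df.writeTo("local.db.customers").append()
# orders_df.writeTo("local.db.orders").append()
```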
Joining tables without SPJ Config enabled
The Exchange node in the plan above represents the shuffle.
If you are more of a Spark UI person, you can see it there as well.
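To check for shuffles programmatically rather than by eye, you can capture the physical plan text and look for `Exchange` operators. A minimal sketch, assuming PySpark's `DataFrame.explain()` writes the plan through Python's `print` (so `contextlib.redirect_stdout` can capture it); the helper names are hypothetical.

```python
import contextlib
import io
import re

# "Exchange" as a standalone operator name; the lookbehind skips
# BroadcastExchange and ReusedExchange.
_EXCHANGE = re.compile(r"(?<![A-Za-z])Exchange\b")


def plan_text(df, mode="simple"):
    """Capture the output of df.explain(mode=...) as a string."""
    buf = io.StringIO()
    with contextlib.redirect_stdout(buf):
        df.explain(mode=mode)
    return buf.getvalue()


def shuffle_exchanges(plan):
    """Return the plan lines that contain a shuffle Exchange operator."""
    return [line.strip() for line in plan.splitlines() if _EXCHANGE.search(line)]


# Usage (requires pyspark):
# joined = customers_df.join(orders_df, "region")
# print(shuffle_exchanges(plan_text(joined)))  # empty list when SPJ kicks in
```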
Join with SPJ Enabled
Let's enable the configurations required for SPJ.
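On an existing session the flags can be switched at runtime with `spark.conf.set`. To compare plans with and without SPJ in one session, a context manager that restores the previous values is handy. A minimal sketch built on PySpark's `RuntimeConfig` methods `get`, `set` and `unset`; `temporary_conf` and `SPJ_SETTINGS` are hypothetical names, and a key that was not explicitly set before is unset again on exit.

```python
from contextlib import contextmanager

SPJ_SETTINGS = {
    "spark.sql.sources.v2.bucketing.enabled": "true",
    "spark.sql.sources.v2.bucketing.pushPartValues.enabled": "true",
    "spark.sql.iceberg.planning.preserve-data-grouping": "true",
    "spark.sql.requireAllClusterKeysForCoPartition": "false",
    "spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled": "true",
}


@contextmanager
def temporary_conf(conf, settings):
    """Apply settings to a RuntimeConfig-like object, restoring old values on exit."""
    previous = {key: conf.get(key, None) for key in settings}
    try:
        for key, value in settings.items():
            conf.set(key, value)
        yield conf
    finally:
        for key, old in previous.items():
            if old is None:
                conf.unset(key)  # was not explicitly set before
            else:
                conf.set(key, old)


# Usage (requires pyspark); plan the join inside the block:
# with temporary_conf(spark.conf, SPJ_SETTINGS):
#     customers_df.join(orders_df, "region").explain()
```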
Let's perform the same join again and look at the plan and the Spark UI.
There are no Exchange nodes in the plan, and that means NO SHUFFLE!!!
Let's take a look at Spark UI too.
Now that's amazing, but hey, wait: that's the ideal scenario, where both tables are partitioned the same way and the join uses only the partition column. In real-world scenarios, that's hardly ever the case.
Valid point! Let's dive a bit deeper into how it works and walk through some join conditions that resemble real-world cases to check whether SPJ still works.
Understanding Configurations used in SPJ
A Storage Partitioned Join makes use of the existing storage layout to avoid the shuffle phase.
The minimum requirement for SPJ to work is setting the configurations that give Spark this storage layout information, i.e.:
spark.sql.iceberg.planning.preserve-data-grouping When true, partitioning info is retained during query planning. This prevents unnecessary repartitioning, optimizing performance by reducing shuffle costs during execution.
spark.sql.sources.v2.bucketing.enabled When true, try to eliminate shuffle by using the partitioning reported by a compatible V2 data source.
Let's look at the various join scenarios:
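The scenarios differ mainly in which keys appear in the join condition. A small sketch that builds the join query for a given list of keys; the table names and the `c`/`o` aliases are assumptions, and the key lists in `SCENARIOS` are examples rather than the post's exact queries.

```python
def join_sql(join_keys, left="local.db.customers", right="local.db.orders"):
    """Build an inner join of customers (c) and orders (o) on the given keys."""
    on = " AND ".join(f"c.{key} = o.{key}" for key in join_keys)
    return f"SELECT c.*, o.order_id, o.amount FROM {left} c JOIN {right} o ON {on}"


# Example key sets (assumptions): partition keys only, and a superset of them.
SCENARIOS = {
    "join keys same as partition keys": ["region"],
    "join keys superset of partition keys": ["region", "customer_id"],
}

# Usage (requires pyspark):
# for name, keys in SCENARIOS.items():
#     spark.sql(join_sql(keys)).explain()
```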
Scenario 1: Joining keys same as Partitioning keys
No Exchange nodes in the plan, so the minimal configuration works in this case. 🥳
Scenario 2: Partitions on both sides don't match
Let's create this scenario by dropping one partition from the Orders table.
Now let's check the plan for the same join condition.
Exchange (Shuffle) is back again..‼️ 🤨
To handle such scenarios, Spark can create empty partitions for the missing partition values once the following configuration is enabled:
spark.sql.sources.v2.bucketing.pushPartValues.enabled When enabled, try to eliminate shuffle if one side of the join has missing partition values from the other side.
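The idea behind pushing partition values can be illustrated with a toy model in plain Python (not Spark's implementation): both sides are aligned on the union of their partition values, and a side that lacks a value gets an empty partition, so each task still pairs exactly one partition value from each side. The `align_partitions` name and the split labels are hypothetical.

```python
def align_partitions(left, right):
    """Toy model of pushPartValues.

    left/right map a partition value (e.g. region) to its input splits.
    Returns (value, left_splits, right_splits) for the union of values,
    using an empty list where one side has no such partition.
    """
    values = sorted(set(left) | set(right))
    return [(v, left.get(v, []), right.get(v, [])) for v in values]


# Orders lost its "West" partition, Customers still has it.
pairs = align_partitions({"East": ["c1"], "West": ["c2"]}, {"East": ["o1"]})
# pairs == [("East", ["c1"], ["o1"]), ("West", ["c2"], [])]
```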
No more shuffle..!! 🥳
Scenario 3: Join Keys do not match the Partition Keys
This scenario can have one of these 2 cases:
Join Keys are superset of Partition Keys
Join Keys are subset of Partition Keys
3.1 Join Keys are superset of Partition Keys
These are queries that join on additional fields along with the partition keys, for example on both region and customer_id.
By default, Spark requires the join keys to be exactly the partition keys, in the same order, to eliminate the shuffle. This behavior can be turned off using:
spark.sql.requireAllClusterKeysForCoPartition When true, require the join or MERGE keys to be the same and in the same order as the partition keys to eliminate shuffle. That's why it is set to false.
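The rule described above can be written as a simplified predicate. This is a model for intuition, not Spark's planner code: with the flag on, the join keys must equal the partition keys in the same order; with it off, it is enough that every partition key appears among the join keys. The function name is hypothetical.

```python
def can_skip_shuffle(join_keys, partition_keys, require_all_cluster_keys=True):
    """Simplified model of spark.sql.requireAllClusterKeysForCoPartition."""
    if require_all_cluster_keys:
        # Join keys must be exactly the partition keys, in the same order.
        return list(join_keys) == list(partition_keys)
    # Relaxed: the join keys only need to cover all partition keys.
    return set(partition_keys) <= set(join_keys)


# Joining on region + customer_id while partitioned by region only:
# can_skip_shuffle(["region", "customer_id"], ["region"])         -> False
# can_skip_shuffle(["region", "customer_id"], ["region"], False)  -> True
```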
No Shuffle..!!! 🥳
3.2 Join Keys are subset of Partition Keys
⚠️
SPJ doesn't work for this scenario in Spark < 4.0. The code example below was tested in Scala against a local build of the latest Spark 4.0 code.
These include scenarios where tables are not partitioned the same way, such as:
Customers table is partitioned by region and bucket(customer_id, 2)
Orders table is partitioned by region and bucket(customer_id, 4)
Or cases where tables are partitioned on multiple columns and the join uses only some of them.
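For reference, the two layouts above can be declared with Iceberg's bucket transform. Note that Iceberg's Spark SQL DDL takes the bucket count first, `bucket(N, column)`, whereas the text above uses the shorthand bucket(customer_id, N). The table names, columns and the `bucketed_ddl` helper are hypothetical.

```python
def bucketed_ddl(table, columns, num_buckets):
    """Iceberg DDL partitioned by region plus bucket(num_buckets, customer_id)."""
    return (
        f"CREATE TABLE IF NOT EXISTS {table} ({columns}) "
        "USING iceberg "
        f"PARTITIONED BY (region, bucket({num_buckets}, customer_id))"
    )


CUSTOMERS_B2_DDL = bucketed_ddl(
    "local.db.customers_b2", "customer_id BIGINT, name STRING, region STRING", 2
)
ORDERS_B4_DDL = bucketed_ddl(
    "local.db.orders_b4", "order_id BIGINT, customer_id BIGINT, region STRING", 4
)

# Usage (requires an Iceberg-enabled session):
# spark.sql(CUSTOMERS_B2_DDL)
# spark.sql(ORDERS_B4_DDL)
```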
In such cases, Spark 4.0 groups the input partitions on the region column, the partition key that both sides share and that the join uses.
Spark 4.0 provides a configuration to enable this:
spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled When enabled, try to avoid shuffle if join condition does not include all partition columns.
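The grouping step can be illustrated with a toy model (not Spark's code): input partitions keyed by the full partition tuple, here (region, bucket), are merged into groups keyed only by the join keys that are a subset of the partition keys. After regrouping, a Customers side with 2 buckets and an Orders side with 4 buckets both end up keyed by region alone. The function name, key names and split labels are hypothetical.

```python
from collections import defaultdict


def group_by_join_keys(partitions, partition_keys, join_keys):
    """Regroup input partitions by a subset of the partition keys.

    partitions maps a tuple of partition values to its input splits.
    Raises ValueError if a join key is not one of the partition keys.
    """
    positions = [partition_keys.index(key) for key in join_keys]
    groups = defaultdict(list)
    for values, splits in partitions.items():
        groups[tuple(values[i] for i in positions)].extend(splits)
    return dict(groups)


# Customers partitioned by (region, bucket of customer_id into 2).
customers = {("East", 0): ["c_e0"], ("East", 1): ["c_e1"], ("West", 0): ["c_w0"]}
grouped = group_by_join_keys(customers, ["region", "bucket"], ["region"])
# grouped == {("East",): ["c_e0", "c_e1"], ("West",): ["c_w0"]}
```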
Scenario 4: Data Skewness in Partitions
This is a major concern if you are running heavy workloads, where data skew is common. Let's say your data is skewed, with one partition (for example, region=East) holding far more data than the others.
Unfortunately, I couldn't replicate this scenario even after multiple tries, so this part is theoretical; I may update it in the future once I can replicate it.
For this case, Spark provides a configuration:
spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled When true, and when the join is not a full outer join, enable skew optimizations to handle partitions with large amounts of data when avoiding shuffle.
With this configuration enabled, Spark splits a skewed partition on one side into multiple splits, and the matching partition on the other side is grouped and replicated to match each split.
For example, the skewed region=East partition of the Customers table is split into 2 smaller partitions, and on the Orders table side, 2 copies of the region=East partition are created.
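The East example can be reproduced with a toy model of partial clustering (an illustration under simplifying assumptions, not Spark's implementation): every split on the partially clustered side becomes its own task, and the other side's grouped partition for the same value is replicated into each of those tasks. The function name and split labels are hypothetical.

```python
def partially_clustered_tasks(clustered_side, replicated_side):
    """Toy model of partially clustered distribution.

    Both arguments map a partition value to its input splits. Returns one
    (value, split, replicated_group) task per split on the clustered side.
    """
    tasks = []
    for value in sorted(set(clustered_side) | set(replicated_side)):
        splits = clustered_side.get(value) or [None]  # None = empty split
        group = replicated_side.get(value, [])
        for split in splits:
            tasks.append((value, split, group))  # group copied per split
    return tasks


# Skewed East partition in Customers has 2 splits, so Orders' East is used twice.
tasks = partially_clustered_tasks(
    {"East": ["e1", "e2"], "West": ["w1"]},
    {"East": ["oe1"], "West": ["ow1"]},
)
```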
That's it for this one!! 🚀
If you've read this far, you're equipped with everything you need to know about where SPJ can be used in your jobs.