Optimizing Iceberg MERGE Statements
If you are using Apache Iceberg Tables and data is being shuffled, you might be doing something wrong!
You might think that's a very controversial statement, and that the safer way to put it would be "It depends." – that's what I felt when I heard it for the first time. But if you stick around to the end of this post, you will either figure out what you are doing wrong or learn something new.
Alrighty, then, let's get into it.
We will take a very simple example for easier understanding, implement a MERGE statement, and then improve on it step by step.
Setting up an Example
Let's assume we have department data for an organization that records which department each employee is working in for a particular year.
So just 3 columns and one technical column:
- id – Employee ID of an employee
- business_vertical – Name of the department
- year – Year of working
- is_updated – Represents whether the employee has changed department
All this data will be stored in a Target Table.
When employees are moved around different verticals, the business_vertical against their name also needs to be updated. All such employees' data is sent to a different table, which we will call the Staged Table.
Initializing SparkSession with Iceberg
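The original setup isn't shown here, so below is a minimal sketch of what the session initialization could look like, assuming a local Hadoop catalog named `local` with a relative warehouse directory; swap in your own catalog type, runtime package version, and warehouse location.

```python
from pyspark.sql import SparkSession

# Minimal local setup: an Iceberg catalog named "local" backed by a Hadoop
# warehouse directory. Adjust the runtime package and catalog type to your environment.
spark = (
    SparkSession.builder
    .appName("iceberg-merge-optimization")
    .config("spark.jars.packages",
            "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0")
    .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.local.type", "hadoop")
    .config("spark.sql.catalog.local.warehouse", "warehouse")
    .getOrCreate()
)
```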
Generating and Populating Data into Table
We will be generating data for 3 years, 2023 to 2025, along with id and business_vertical.
We will create a Merge-On-Read Table here to perform faster updates. You can read a detailed newsletter mentioned below if you are unsure about MOR.
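Here is one way the table creation and data generation could look. The table name (local.db.employees), the 30,000 employees per year, and the vertical names are illustrative assumptions; the MOR behaviour comes from the write.*.mode table properties.

```python
from pyspark.sql import functions as F

# Target table: partitioned by year and configured as Merge-On-Read (MOR),
# so row-level changes are written as delete files instead of rewriting data files.
spark.sql("""
    CREATE TABLE IF NOT EXISTS local.db.employees (
        id INT,
        business_vertical STRING,
        year INT,
        is_updated BOOLEAN
    )
    USING iceberg
    PARTITIONED BY (year)
    TBLPROPERTIES (
        'format-version'    = '2',
        'write.delete.mode' = 'merge-on-read',
        'write.update.mode' = 'merge-on-read',
        'write.merge.mode'  = 'merge-on-read'
    )
""")

# Generate employees for 2023-2025 and spread them across a few verticals.
for yr in [2023, 2024, 2025]:
    df = (
        spark.range(1, 30001)
        .withColumn("id", F.col("id").cast("int"))
        .withColumn(
            "business_vertical",
            F.when(F.col("id") % 4 == 0, "Engineering")
             .when(F.col("id") % 4 == 1, "Finance")
             .when(F.col("id") % 4 == 2, "HR")
             .otherwise("Marketing"),
        )
        .withColumn("year", F.lit(yr))
        .withColumn("is_updated", F.lit(False))
    )
    df.writeTo("local.db.employees").append()
```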
Updated Employee's Business Vertical Data
Let's assume 3000 employees have changed their department to Sales in 2025.
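A sketch of how the staging data could be produced; the table name local.db.employees_staged and the id range 1–3000 are assumptions for illustration.

```python
from pyspark.sql import functions as F

# Staging data: 3000 employees (ids 1-3000, purely for illustration)
# who moved to the "Sales" vertical in 2025.
staged_df = (
    spark.range(1, 3001)
    .withColumn("id", F.col("id").cast("int"))
    .withColumn("business_vertical", F.lit("Sales"))
    .withColumn("year", F.lit(2025))
    .withColumn("is_updated", F.lit(True))
)
staged_df.writeTo("local.db.employees_staged").using("iceberg").createOrReplace()
```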
Merge Staging Data into Target Table
Now that we have all the data ready, let's merge this staging data with the data present in the target table.
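A baseline MERGE along these lines, reusing the table names from the earlier sketches (note that the year = 2025 condition is deliberately placed in the WHEN MATCHED clause here; we will revisit that shortly):

```python
# Baseline MERGE: update the vertical of employees present in the staging
# table, insert the ones that are not in the target yet.
spark.sql("""
    MERGE INTO local.db.employees AS t
    USING local.db.employees_staged AS s
    ON t.id = s.id
    WHEN MATCHED AND t.year = 2025 THEN
        UPDATE SET t.business_vertical = s.business_vertical,
                   t.is_updated = true
    WHEN NOT MATCHED THEN
        INSERT *
""")
```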
Merge Execution Plan
To understand what's happening under the hood of the MERGE statement, let's look into 3 sections of the Merge Execution Plan and the expensive parts within them.
Read
- Scanning the source and target tables with the applied runtime filters.
- Expensive Operations: Scans without pushdown filters pull unnecessary data into memory.
Merge Rows
- Joining both tables to compute the new state of the table by applying the WHEN MATCHED and WHEN NOT MATCHED clauses together to merge the records.
- Expensive Operations: Shuffling and sorting due to the Sort-Merge Join.
Write
- After merging the records, it's time to write them back into the target table.
- Expensive Operations: Pre-shuffling data before writing to align with the table partitioning scheme. (Optional) Pre-sorting data before writing in case the table has sort keys defined.
Exchange, a.k.a. Shuffling operations, will dominate the runtime of the MERGE Operation and will be, by far, the most expensive part of it.
Now that you know the expensive parts in each section, let's look at the optimization techniques we can use to avoid or reduce them.
Optimizing MERGE Statement
In this section, we will look at the previous MERGE statement and identify potential areas to improve its performance.
Push Down Filters
This is a very common thing to miss out on while writing a MERGE Statement.
I say "miss out" because you might still be using the required filter condition, but it might not get pushed down while reading from the table.
Let's take a look at the scan filters of the previous MERGE statement:
For a filter to be pushed down, it needs to be present in the ON clause of the MERGE statement. Any filters present in the WHEN MATCHED or WHEN NOT MATCHED clauses are NOT pushed down.
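To make the difference concrete, here is an illustrative contrast using the tables from the earlier sketches; only the placement of the year = 2025 predicate changes.

```python
# NOT pushed down: the year filter only appears in the WHEN MATCHED clause,
# so the target scan still reads every partition.
spark.sql("""
    MERGE INTO local.db.employees AS t
    USING local.db.employees_staged AS s
    ON t.id = s.id
    WHEN MATCHED AND t.year = 2025 THEN
        UPDATE SET t.business_vertical = s.business_vertical, t.is_updated = true
""")

# Pushed down: the same filter moved into the ON clause, so the scan of the
# target table can be pruned to the year=2025 partition.
spark.sql("""
    MERGE INTO local.db.employees AS t
    USING local.db.employees_staged AS s
    ON t.id = s.id AND t.year = 2025
    WHEN MATCHED THEN
        UPDATE SET t.business_vertical = s.business_vertical, t.is_updated = true
""")
```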
Sort-Merge Joins to Hash Joins (if possible)
Spark prefers Sort Merge Join by default because it's more scalable than Hash Join.
This behavior is controlled by the Spark SQL configuration spark.sql.join.preferSortMergeJoin, which is set to true by default.
To override this behavior, set spark.sql.join.preferSortMergeJoin to false. In cases where the build side is small enough to build a hash map, a Shuffle Hash Join will be preferred over a Sort-Merge Join.
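In PySpark this is a one-line session setting, applied before running the MERGE:

```python
# Let Spark pick a Shuffle Hash Join when the build side is small enough
# to fit in a hash map, instead of always preferring Sort-Merge Join.
spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")
```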
If you look at the plan now, you will notice that the Sort nodes caused by the Sort-Merge Join are eliminated.
Spark Fanout Writers
The local sort that happens before writing data can be avoided by enabling Spark fanout writers for the table.
This local sort is fairly cheap in batch workloads, but in streaming workloads it adds additional latency and can be avoided by using fanout writers.
The fanout writer opens a file per partition value and doesn't close these files until the write task finishes.
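Assuming the table from the earlier sketches, this can be turned on through the write.spark.fanout.enabled table property listed in Iceberg's Spark configuration docs; a minimal sketch:

```python
# Enable the fanout writer for the table so Spark does not need to locally
# sort rows by partition before writing them.
spark.sql("""
    ALTER TABLE local.db.employees
    SET TBLPROPERTIES ('write.spark.fanout.enabled' = 'true')
""")
```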
Note: Enabling this did not seem to work with the hadoop catalog but seems to work with the AWS glue catalog.
Iceberg Distribution Modes
Distribution modes in Iceberg decide how data is distributed by the Iceberg writer before it is written into the Iceberg table.
Iceberg provides 3 distribution modes:
- none – Doesn't request any shuffle or sort to be performed by Spark before writing data. Least expensive of all.
- hash – Requests that Spark shuffle the data using a hash-based exchange before writing.
- range – Requests that Spark perform a range-based exchange to shuffle the data before writing. Most expensive of all.
The Exchange node (the pre-shuffle before the Write) present in the write section of the merge plan is there because of the default distribution mode, i.e. hash.
This Exchange node can simply be eliminated by changing the distribution mode to none. This will accelerate the write process, BUT might end up creating lots of small files.
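As a sketch, assuming the same table as before (Iceberg also exposes an operation-specific variant, write.merge.distribution-mode, if you only want to change MERGE behaviour):

```python
# Drop the write-side Exchange by not asking Spark to redistribute rows
# before they reach the Iceberg writer. Watch out for small files.
spark.sql("""
    ALTER TABLE local.db.employees
    SET TBLPROPERTIES ('write.distribution-mode' = 'none')
""")
```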
Trade-offs/Balancing between Write and Read Performance
Eliminating the Exchange before writing into the table is a trade-off decision between Write Performance and Read Performance.
Using none as the distribution mode can amplify the Write Performance but might degrade the Read Performance because of the creation of many small files.
Read Performance degradation can be even worse when you use MOR tables because, while reading, the query engine has to read many small files and do extra work to combine the records with the delete files.
I mentioned balancing here because if you need both Write and Read performance, mitigating degradation in Read Performance requires frequent Compaction runs.
Does this mean we should never use the none distribution mode?
You can still use the none distribution mode if:
- You are writing data to only one partition, like year=2025 in this case.
- You are running compaction frequently – frequent compaction runs are required to mitigate Read Performance degradation.
- Write Performance is more important than Read Performance.
The main idea behind going into this much detail is that you know what you are doing when you make this decision.
Note: I am not covering how to mitigate the small file problem in this post as that is entirely a separate problem in itself.
Enabling Storage Partitioned Join
Storage Partitioned Join (SPJ) is a join optimization technique that helps you avoid expensive shuffles while performing joins.
You can read all about it 👇
To enable SPJ, there are 2 main things that need to be done:
- Setting up the configurations for SPJ
- Including the partition columns in the ON clause of the MERGE statement
At a very high level, SPJ works by understanding how the data is physically laid out in the table.
All of these details are utilized during the join planning phase, and based on the joining keys and table partition compatibility, Spark decides to skip or continue with the Shuffle.
So, if you can add one or more partition columns to the joining keys, this can help skip the entire Exchange itself. In our case, we can utilize the year column in the join too, even if we are filtering based on year = 2025.
Let's implement all the configurations and techniques we discussed above, and rewrite the merge statements.
Optimized Merge
Let's assume, in the example above, that the organization introduced a new business_vertical for Data Engineers, a few of the employees moved to this vertical, and the organization hired new Data Engineers.
Setting all the required configurations
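A commonly cited set of session and table settings for this combination (hash joins, SPJ, fanout writer, no pre-write shuffle) is sketched below; the exact SPJ flags depend on your Spark (3.4+) and Iceberg versions, so verify them against your environment.

```python
# Prefer hash joins over sort-merge joins when possible.
spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")

# Storage Partitioned Join related settings (Spark 3.4+).
spark.conf.set("spark.sql.sources.v2.bucketing.enabled", "true")
spark.conf.set("spark.sql.sources.v2.bucketing.pushPartValues.enabled", "true")
spark.conf.set("spark.sql.requireAllClusterKeysForCoPartition", "false")
spark.conf.set("spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled", "true")

# Ask Iceberg to report how data is grouped so Spark can plan an SPJ.
spark.conf.set("spark.sql.iceberg.planning.preserve-data-grouping", "true")

# Table-level settings: fanout writer on, no pre-write shuffle.
spark.sql("""
    ALTER TABLE local.db.employees SET TBLPROPERTIES (
        'write.spark.fanout.enabled' = 'true',
        'write.distribution-mode'    = 'none'
    )
""")
```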
Rewriting Merge Statement
Let's rewrite the MERGE statement to include:
- the filter year = 2025 in the ON clause so that it is pushed down
- the partition column year in the join so that SPJ kicks in
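Putting it together, a possible rewrite using the tables from the earlier sketches:

```python
# Rewritten MERGE: year = 2025 lives in the ON clause, so it is pushed down
# to the scans, and the partition column year participates in the join so
# SPJ can remove the Exchange entirely.
spark.sql("""
    MERGE INTO local.db.employees AS t
    USING local.db.employees_staged AS s
    ON t.id = s.id AND t.year = s.year AND t.year = 2025
    WHEN MATCHED THEN
        UPDATE SET t.business_vertical = s.business_vertical,
                   t.is_updated = true
    WHEN NOT MATCHED THEN
        INSERT *
""")
```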
Final Results
Let's look into the Spark Physical Plan for the Optimized MERGE Statement.
- All the expensive operations like Exchange and Sort are not present in the plan anymore!
- The Sort-Merge Join is converted to a Shuffle Hash Join without a shuffle (thanks to SPJ).
Taking a look at the Scan filters:
- Filters added in the ON clause are pushed down too.
If you are wondering what happens when a Shuffle Hash Join is not possible, here's how the MERGE execution plan will look:
The Sort-Merge Join will be there with its Sort, but without any Exchange.
That's it for this one! 🚀
If you have followed until here, I hope you now know:
- The expensive parts in the MERGE Execution Plan.
- How to write filter conditions in a MERGE statement so that they get pushed down.
- How to enable SPJ to get rid of shuffles while joining.
- Most importantly, how to optimize different parts of the MERGE Execution Plan without any guesswork.
Got any questions? Put them in the comments.