11 min read

Optimizing Iceberg MERGE Statements

How can you eliminate shuffling, sorting, and push-down filters to optimize the Apache Iceberg merge statements?
Optimizing Iceberg MERGE Statements
Created by Author

If you are using Apache Iceberg Tables and data is being shuffled, you might be doing something wrong!

You might think that's a very contradictory statement, and the safer way to say this can be with "It depends." – that's what I felt when I heard it for the first time. But if you stick to the end of this post, you will either figure out what you are doing wrong or learn something new.

Image from Tenor

Alrighty, then, let's get into it.

We will take a very simple example for easier understanding and implement a MERGE statement and will further improve on it.

💡
All the code used can be found in the GitHub Repo here.

Setting up an Example

Let's assume we have a department data of an organization that has details of employee currently working in which department for a particular year.

So just 3 columns and one technical column:

  • id – Employee Id of an employee
  • business_vertical – Name of the Department
  • year – Year of working
  • is_updated – represents if employee has changed department

All this data will be stored in a Target Table.

In case employees are moved around different verticals, the business_vertical across their name also needs updation. All such employees data is sent into a different table, we will call this Staged Table

Initializing SparkSession with Iceberg

from pyspark.sql import SparkSession
from pyspark.sql.functions import rand, lit, array, rand, when, col

# Creating Spark Session
# Spark 3.5 with Iceberg 1.5.0

DW_PATH='/iceberg/spark/warehouse'
spark = SparkSession.builder \
    .master("local[4]") \
    .appName("iceberg-poc") \
    .config('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0')\
    .config('spark.sql.extensions','org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')\
    .config('spark.sql.catalog.local','org.apache.iceberg.spark.SparkCatalog') \
    .config('spark.sql.catalog.local.type','hadoop') \
    .config('spark.sql.catalog.local.warehouse',DW_PATH) \
    .getOrCreate()

SparkSession Initialization

Generating and Populating Data into Table

We will be generating data for 3 years 2023 to 2025, along with id and business_vertical.

t1 = spark.range(30000).withColumn("year", 
                                when(col("id") <= 10000, lit(2023))\
                                .when(col("id").between(10001, 15000), lit(2024))\
                                .otherwise(lit(2025))
                                )
t1 = t1.withColumn("business_vertical", array(
        lit("Retail"), 
        lit("SME"), 
        lit("Cor"), 
        lit("Analytics")
        ).getItem((rand()*4).cast("int")))\
        .withColumn("is_updated", lit(False))

Data Generation

We will create a Merge-On-Read Table here to perform faster updates. You can read a detailed newsletter mentioned below if you are unsure about MOR.

Copy-on-Write or Merge-on-Read? What, When, and How?
Copy-on-Write or Merge-on-Read? Optimizing Row-level updates in Apache Iceberg Table by understanding both the approaches and deciding when to use which approach and its impact on the Read and Write speed of the table. How to identify these using Iceberg Metadata tables on AWS?

Optimizing Row-level Updates

# Creating a table to store employee's business vertical info
TGT_TBL = "local.db.emp_bv_details"

t1.coalesce(1).writeTo(TGT_TBL).partitionedBy('year').using('iceberg')\
    .tableProperty('format-version','2')\
    .tableProperty('write.delete.mode','merge-on-read')\
    .tableProperty('write.update.mode','merge-on-read')\
    .tableProperty('write.merge.mode','merge-on-read')\
    .create()

Creating a Merge-On-Read Table: emp_bv_details

Updated Employee's Business Vertical Data

Let's assume 3000 employees have changed their departments to Sales in 2025.

# New department created called Sales and 3000 employees switched in 2025
updated_records = spark.range(15000, 18001).withColumn("year", lit(2025)).withColumn("business_vertical", lit("Sales"))

STG_TBL = "local.db.emp_bv_updates"

updated_records.coalesce(1).writeTo(STG_TBL).partitionedBy('year').using('iceberg')\
    .tableProperty('format-version','2')\
    .create()

Creating STAGE TABLE with employee update records

Merge Staging Data into Target Table

Now as we have all the data ready, let's merge this staging data with the data present in target table.

spark.sql(f"""
  MERGE INTO {TGT_TBL} as tgt
  USING (Select *, False as is_updated from {STG_TBL}) as src
  ON tgt.id = src.id
  WHEN MATCHED AND src.business_vertical <> tgt.business_vertical AND tgt.year = src.year THEN
      UPDATE SET tgt.is_updated = True, tgt.business_vertical = src.business_vertical
  WHEN NOT MATCHED THEN
      INSERT *
""")

MERGE statement to merge Staging Data into Target Table

💡
The SparkUI SQL Screenshot can be seen here.
Merge Query Physical Plan. Expensive Operations are highlighted in RED.

Merge Execution Plan

To understand what's happening under the hood of the MERGE statement, let's look into 3 sections of the Merge Execution Plan and the expensive parts within them.

Merge Execution Plan. Expensive Operations in red
Read
  • Scanning the source and target tables with the applied runtime filters.
  • Expensive Operations: Scans without pushdown filters get unnecessary data into the memory.
Merge Rows
  • Joining both the tables to compute the new state of the table by applying WHEN MATCHED and WHEN NOT MATCHED clauses together to merge the records and create a new state of records
  • Expensive Operations: Shuffling and Sorting due to the Sort-Merge Join.
Write
  • After merging the records, it's time to write them back into the target table.
  • Expensive Operations: Pre-shuffling data before writing to align with the table partitioning scheme. (Optional) Pre-sorting of data before writing in case table has sort keys defined.
Exchange, a.k.a. Shuffling operations, will dominate the runtime of the MERGE Operation and will be, by far, the most expensive part of it.

Now that you know all the Expensive parts in all the sections, let's look into the Optimization techniques we can use to avoid or reduce these expensive operations.

Optimizing MERGE Statement

In this section we will be looking into the previous Merge Statement, identify the potential areas to improve it's performance.

Push Down Filters

This is a very common thing to miss out on while writing a MERGE Statement.

I mentioned missing out because you might still be using the required filter condition, but it might not get pushed down while reading from the table.

Let's take a look at previous MERGE Statement Scan Filters:

BatchScan Plan with NO Pushdown Filters

To push-down any filters it needs to be present in the ON clause of MERGE statement. Any filters present at WHEN MATCHED or WHEN NOT MATCHED are NOT pushed down.

-- Adding filter condition that will be pushed down
MERGE INTO TARGET_TABLE tgt
USING STAGE_TABLE stg
ON tgt.id = stg.id 
  AND tgt.year = 2025 -- All filters in ON Clause are pushed down.
WHEN MATCHED ...
WHEN NOT MATCHED ...

ON Clause filters that will be pushed down

MERGE Plan after applying filters in ON Clause

Sort-Merge Joins to Hash Joins (if possible)

Spark prefers Sort Merge Join by default because it's more scalable than Hash Join.

This behavior is controlled by Spark SQL Configuration spark.sql.join.preferSortMergeJoin which is set to true by default.

To overwrite this behavior, set spark.sql.join.preferSortMergeJoin to false. In cases where the build side is small enough to build a Hash Map, a Shuffle Hash Join will be preferred over Sort-Merge Join.

spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")

Setting preferSortMergeJoin to False

MERGE Execution Plan after preferSortMergeJoin is set to False

If you will notice the Sort that were caused by Sort Merge Join are eliminated.

Spark Fanout Writers

Local sorting happening before writing data can be avoided by enabling spark fan-out writers for the table.

This local sort is fairly cheaper in batch workloads compared to streaming workloads where it adds additional latency and can be avoided by using Fanout Writers.

# Setting Table Property to enable fanout-writers
spark.sql(f"""ALTER TABLE {TGT_TBL} SET TBLPROPERTIES (
    'write.spark.fanout.enabled'='true'
)""")

Enabling fanout Writers

Fanout writer opens the files per partition value and doesn't close these files till the write task finishes.

Per my testing with fanout writers for batch workloads, it doesn't work for the hadoop catalog but seems to work with the AWS glue catalog.
MERGE Execution Plan after fanout writers are enabled

Iceberg Distribution Modes

Distribution modes in Iceberg decides "How data is being distributed by IcebergWriter before it's being written into the Iceberg Table."

Iceberg provides 3 distribution modes:

  • None – Doesn't request for any shuffle or sort to be performed by Spark before writing data. Least expensive of all.
  • hash – Requests data to be shuffled using hash based exchange by Spark before writing.
  • range – Requests that Spark perform a range based exchange to shuffle the data before writing. Most expensive of all.

The Exchange node (Pre-shuffling before Write) present in write section of Merge Plan before writing data into table is because of the default distribution mode, i.e. hash.

This Exchange node can simply be eliminated by changing the distribution mode to None. This will accelerate the write process, BUT might end up in creating lots of small files.

# Setting Distribution mode in SparkSession.
# This overwrites the Distribution mode mentioned in TBLPROPERTIES
spark.conf.set("spark.sql.iceberg.distribution-mode", "None")

Setting Distribution-Mode as None in SparkSession

MERGE Execution Plan after Distribution Mode is set to None
Trade-offs/Balancing between Write and Read Performance
Eliminating Exchange before writing into Table is a trade-off decision between Write Performance and Read Performance.

Using None as a distribution mode can amplify the Write Performance but might degrade the Read Performance because of the creation of many small files.

Read Performance degradation can even be worse when you use MOR tables because while reading, the query engine will have to read many small files and the extra work it has to do to combine the records from the Delete file.

I mentioned balancing here because if you need both Write and Read performance, mitigating degradation in Read Performance requires frequent Compaction runs.

Does this mean we should never use None Distribution Mode?

You can still use None Distribution Mode, if you are:

  • Writing data only in one partition, like in this case year=2025.
  • Running Compaction frequently – To mitigate Read Performance Degradation, frequent Compaction runs is required.
  • Write Performance is more important.
Main idea for this much details is to know what you are doing when you are making this decision.

Note: I am not covering how to mitigate the small file problem in this post as that is entirely a separate problem in itself.

Enabling Storage Partitioned Join

Storage Partitioned Join (SPJ) is a Join Optimization Technique that helps you avoid the expensive Shuffles while performing the joins.

You can read all about it 👇

Shuffle-less Join, a.k.a Storage Partition Join in Apache Spark - Why, How and Where?
A Deep Dive into Shuffle-less joins (Storage Partitioned Joins) in Apache Spark to improve Join performance when using V2 Data Sources.

Deep Dive into Storage Partitioned Join

To enable SPJ, there are 2 main things needs to be done:

Setting up Configurations for SPJ
# Enabling SPJ
spark.conf.set('spark.sql.sources.v2.bucketing.enabled','true')
spark.conf.set('spark.sql.sources.v2.bucketing.pushPartValues.enabled','true')
spark.conf.set('spark.sql.iceberg.planning.preserve-data-grouping','true')
spark.conf.set('spark.sql.requireAllClusterKeysForCoPartition','false')
spark.conf.set('spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled','true')

Setting Spark Configuration for SPJ

Including Partition Columns in the ON clause of MERGE Statement

On a very high level, SPJ works on by understanding how the data is physically laid out in the table.

All of these details are utilized during the join planning phase, and based on the joining keys and table partition compatibility, Spark decides to skip or continue with the Shuffle.

So, if you can add one or more partition columns in the joining keys, this can help skip the entire Exchange itself. In our case, we can utilize the year column in the join, too, even if we are filtering based on year = 2025.

MERGE INTO TARGET_TABLE tgt
USING STAGE_TABLE stg
ON tgt.id = stg.id 
  AND tgt.year = 2025
  AND tgt.year = src.year -- joining on partition columns to enable SPJ
WHEN MATCHED ...
WHEN NOT MATCHED ...

Partition Column added in ON clause to enable SPJ

MERGE Execution Plan after StoragePartitioned Join is enabled.

Let's implement all the configurations and techniques we discussed above, and rewrite the merge statements.

Optimized Merge

Let's assume in the example above that the organization introduced a new business_vertical for Data Engineers, a few of the employees moved to this vertical, and the organization hired new Data Engineers.

# Updating some employees departments after setting some configs for optimization
updated_records = spark.range(28000, 35001)\
                .withColumn("year", lit(2025))\
                .withColumn("business_vertical", lit("DataEngineering"))
                
updated_records.coalesce(1).writeTo(STG_TBL).overwritePartitions()

Updated data for business_vertical

Setting all the required configurations

# To avoid sort due to Sort Merge Join by prefering Hash Join if possible.
spark.conf.set('spark.sql.join.preferSortMergeJoin', 'false')

# To avoid Shuffle before writing into table.
spark.conf.set("spark.sql.iceberg.distribution-mode", "None")

# Enabling SPJ
spark.conf.set('spark.sql.sources.v2.bucketing.enabled','true')
spark.conf.set('spark.sql.sources.v2.bucketing.pushPartValues.enabled','true')
spark.conf.set('spark.sql.iceberg.planning.preserve-data-grouping','true')
spark.conf.set('spark.sql.requireAllClusterKeysForCoPartition','false')
spark.conf.set('spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled','true')

Setting configurations

# To avoid Pre-Sorting before writing into table
spark.sql(f"""ALTER TABLE {TGT_TBL} SET TBLPROPERTIES (
    'write.spark.fanout.enabled'='true'
)""")

Enabling fanout writers for the TARGET TABLE

Rewriting Merge Statement

Let's rewrite the MERGE statement to include:

  • filter to be pushed down year = 2025
  • adding partition column year in join for SPJ
spark.sql(f"""
MERGE INTO {TGT_TBL} as tgt
USING (SELECT *, False as is_updated from {STG_TBL}) as src
ON tgt.id = src.id AND tgt.year = src.year AND tgt.year = 2025
WHEN MATCHED AND 
    tgt.business_vertical <> src.business_vertical 
    THEN
    UPDATE SET tgt.is_updated = True, tgt.business_vertical = src.business_vertical
WHEN NOT MATCHED THEN
    INSERT *
""")

Updated MERGE Statement to include filter and partition column in Join.

Changes highlighted compared to the previous MERGE statement
💡
The SparkUI SQL Screenshot can be seen here.

Final Results

Let's look into the Spark Physical Plan for the Optimized MERGE Statement.

Optimized MERGE statement Physical Execution Plan
  • All the Expensive operations like Exchange and Sort are not present in the plan anymore!
  • Sort Merge Join is converted to Shuffle Hash Join without Shuffle (thanks to SPJ).

On taking a look at the Scan Filters:

  • Filters added in ON clause are Pushed down too.
Filters present in ON clause are Pushed down
From Tenor

If you are wondering, what happens Shuffle Hash Map Join is not possible, here's how the MERGE Execution Plan will look like:

MERGE Execution Plan when Shuffle Hash Join is not possible

Sort Merge Join will be there with Sort but without any Exchange.


That's it for this one! 🚀
If you have followed until here, I hope you now know:

  • Expensive parts in MERGE Execution Plan.
  • How to write filter conditions in MERGE Statement for it to be pushed down.
  • How to enable SPJ to get rid of Shuffles while joining.
  • Most importantly, you know how to optimize different parts of MERGE Execution Plan without any guess work.

Got any questions? Put it in the comments.