Write-Audit-Publish Pattern with Apache Iceberg on AWS using Branches
A detailed implementation of the Write-Audit-Publish (WAP) data quality pattern on AWS using Apache Iceberg branches, i.e. for Apache Iceberg 1.2.0 and later. It also covers the gotchas of using this pattern with Athena as the query engine.
This blog covers how to implement the Write-Audit-Publish (WAP) data quality pattern on AWS using Apache Spark on EMR, with AWS Glue Catalog as the metastore for the Apache Iceberg table and AWS Athena for querying the table.
Branching is an Iceberg table feature that was introduced in version 1.2.0.
This blog uses EMR 6.15, which comes with Apache Spark 3.4 and supports Apache Iceberg 1.4.0.
What is Write-Audit-Publish?
Write-Audit-Publish (WAP) is a data quality pattern commonly used in data engineering workflows. It helps us validate datasets by writing data to a staged snapshot/branch of the production table and fixing any issues before finally committing the data to the production table.
When you are creating an EMR cluster, make sure you enable Iceberg under Edit Software Settings > Enter Configuration.
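The configuration itself is not shown here. As a minimal sketch, assuming the setting meant is EMR's standard `iceberg-defaults` classification, the JSON to paste into that field could be generated like this:

```python
import json

# Assumption: Iceberg is enabled on the cluster through the
# "iceberg-defaults" classification with iceberg.enabled=true.
emr_configurations = [
    {
        "Classification": "iceberg-defaults",
        "Properties": {"iceberg.enabled": "true"},
    }
]

# Print the JSON to paste into "Enter Configuration".
print(json.dumps(emr_configurations, indent=2))
```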
Preparing Table and Data for WAP
Let's start with setting up all the configurations that we need in SparkSession for this implementation to work.
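In the case of a Spark script, you can set these configurations while creating your SparkSession using SparkSession.builder.config("config-key", "config-value"). Most of them can also be set later with spark.conf.set("config-key", "config-value"), but spark.sql.extensions is a static configuration and has to be set before the session is created.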
Understanding Spark Configurations for Iceberg
spark.sql.extensions - Loads Iceberg's SQL Extensions module. This module lets you execute various Spark procedures and additional ALTER statements for Add/Drop/Replace Partitions, Set Write Order, Write Distribution and a few other things that are not possible without these extensions. We will be using this to call a Spark procedure in WAP.
spark.sql.catalog.glue - Defines a catalog name. In this case glue is the catalog name, and the value org.apache.iceberg.spark.SparkCatalog specifies Iceberg's catalog implementation for Spark.
spark.sql.catalog.glue.catalog-impl - As a catalog in Iceberg is an interface with multiple implementations, this property defines which implementation is used. In our case, it's GlueCatalog.
spark.sql.catalog.glue.io-impl - The read/write implementation for the catalog. As AWS Glue Catalog tables point to AWS S3, we will be using S3FileIO for reading from and writing to S3.
spark.sql.catalog.glue.warehouse - The base S3 path where all the data and metadata are stored by default. It is usually overridden at runtime when the AWS Glue Catalog database has a pre-defined S3 location or the table has a pre-defined location. If you write a new Iceberg table without specifying a location, the table data is written to the location defined in this property.
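Put together, the properties above could be applied as in the following sketch. The warehouse bucket and the `build_session` helper are hypothetical names used only for illustration; the class names are the standard Iceberg ones for the Glue catalog and S3.

```python
# Hypothetical warehouse path; replace it with a bucket you own.
WAREHOUSE = "s3://my-example-bucket/iceberg-warehouse/"

ICEBERG_CONF = {
    "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    "spark.sql.catalog.glue": "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.glue.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
    "spark.sql.catalog.glue.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
    "spark.sql.catalog.glue.warehouse": WAREHOUSE,
}


def build_session(app_name="wap-demo"):
    """Create a SparkSession with the Iceberg settings applied (hypothetical helper)."""
    # Imported lazily so the dictionary above can be inspected without Spark installed.
    from pyspark.sql import SparkSession

    builder = SparkSession.builder.appName(app_name)
    for key, value in ICEBERG_CONF.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()
```

On the cluster, `spark = build_session()` would then give a session in which tables are addressed as glue.<database>.<table>.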
Let's create an Iceberg Table that we will consider as a main Production Table on which we will implement WAP to write some new data from the Source Table.
Athena only supports Iceberg v2 tables. This is something to keep in mind if you are using Athena as the query engine.
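A rough sketch of creating such a v2 table from Spark is shown below. The column list is an assumption for illustration (only total_amount, year and month are referenced in this post), and `create_prod_table` is a hypothetical helper that expects an Iceberg-enabled SparkSession.

```python
TABLE = "glue.nyc_tlc.yellow_taxi_trips"

# Assumption: a reduced, illustrative schema; the real table has more columns.
CREATE_TABLE_SQL = f"""
CREATE TABLE IF NOT EXISTS {TABLE} (
    vendor_id     INT,
    total_amount  DOUBLE,
    year          INT,
    month         INT
)
USING iceberg
TBLPROPERTIES ('format-version' = '2')
"""


def create_prod_table(spark):
    """Create the production table as an Iceberg v2 table so Athena can read it."""
    spark.sql(CREATE_TABLE_SQL)
```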
This is where we can see the last point from the configuration section in action: the warehouse path set in the SparkSession is overridden by the location of the already existing Glue Catalog database, in this case nyc_tlc.
Let's prepare the data that will be written using WAP into the yellow_taxi_trips table.
Alrighty, everything is now set to start with the WAP implementation, i.e.
Production Table: glue.nyc_tlc.yellow_taxi_trips
Source Data: data present in source_df
WAP Implementation
The following steps show how to implement the different stages of WAP, i.e. Write, Audit and Publish, with a bit of explanation.
The Write stage ensures that we can safely write data into a branch created from the prod table, so it stays isolated from the current production workloads.
Create a branch (audit_branch) from the table and set write.wap.enabled=true in the table properties so that the Iceberg table handles writes using the WAP pattern. Then set spark.wap.branch to audit_branch in the SparkSession so that all writes from this session go to the branch.
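The Write stage could look roughly like the sketch below. `write_to_audit_branch` is a hypothetical helper; it assumes an Iceberg-enabled SparkSession and the source_df DataFrame prepared earlier.

```python
TABLE = "glue.nyc_tlc.yellow_taxi_trips"
BRANCH = "audit_branch"

WRITE_STAGE_SQL = [
    # Create the branch from the current state of the table.
    f"ALTER TABLE {TABLE} CREATE BRANCH {BRANCH}",
    # Let the table honour the session-level WAP branch on writes.
    f"ALTER TABLE {TABLE} SET TBLPROPERTIES ('write.wap.enabled' = 'true')",
]


def write_to_audit_branch(spark, source_df):
    """Stage source_df on the audit branch instead of main (hypothetical helper)."""
    for statement in WRITE_STAGE_SQL:
        spark.sql(statement)
    # From here on, reads and writes in this session target the audit branch.
    spark.conf.set("spark.wap.branch", BRANCH)
    source_df.writeTo(TABLE).append()
```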
Once the write.wap.enabled property is added to the table, Athena no longer lets you see the DDL statement and throws an error saying the table has an unsupported property, but you can still query the data.
Let's verify this property is added in table properties via Athena Console by running show create table nyc_tlc.yellow_taxi_trips
Whoops..!!! Unable to see the table DDL from Athena. Let's see if we can query the data in this table or that's broken too..!!
It's important to UNSET the property from the production table to make the table DDL Queryable again.
Let's verify the data in the prod table from Athena:
select "year", "month", count(*) from nyc_tlc.yellow_taxi_trips group by 1,2
Looks like the Prod table data is intact. Let's check the same from spark.
Yikes..!!!! the data in Spark seems different for the same query than in Athena. What the heck is happening? 🤯
The reason for this is that spark.wap.branch is set to audit_branch, so the branch actually being queried from Spark is audit_branch and not the main branch.
To query the main branch:
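One way to do this, sketched here with a hypothetical `count_by_month_sql` helper, is to name the branch explicitly with VERSION AS OF, which Iceberg supports for branch names in Spark SQL:

```python
TABLE = "glue.nyc_tlc.yellow_taxi_trips"


def count_by_month_sql(branch):
    """Build the year/month count query against an explicitly named branch."""
    return (
        f"SELECT year, month, count(*) AS trips "
        f"FROM {TABLE} VERSION AS OF '{branch}' "
        f"GROUP BY year, month"
    )


# Query text for the production data, regardless of spark.wap.branch.
main_query = count_by_month_sql("main")
```

Running `spark.sql(main_query).show()` should then return the same counts that Athena shows.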
This verifies all the writes happening from the current SparkSession are in isolation from the Production Data in yellow_taxi_trips table.
The Audit stage enables us to understand the quality of our newly inserted data. During this stage, we can run various data quality checks such as inspecting NULL values, identifying duplicated data, etc.
You can define any of the DQ Processes here via some code, third-party provider, or any other service. A few of such options are:
I will be using the third option here via PySpark for easy understanding. Let's assume that, as per our application's data quality standard, prod data shouldn't contain any records with a negative total_amount.
Now, based on whether your DQ checks pass or fail, you can discuss with stakeholders and decide what needs to be done, for example:
- Do not publish the data, as it doesn't meet the DQ expectations.
- DELETE the rows not meeting the DQ standards and preserve those records somewhere else.
- Fix the data via some logic, such as populating null and missing values.
Running DQ Checks on audit_branch data
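A minimal sketch of the negative total_amount check is shown below. It reads the branch through the `branch_audit_branch` table identifier; `audit_passed` is a hypothetical helper that expects an Iceberg-enabled SparkSession.

```python
AUDIT_TABLE = "glue.nyc_tlc.yellow_taxi_trips.branch_audit_branch"

# Count the records on the audit branch that violate the DQ rule.
NEGATIVE_AMOUNT_SQL = (
    f"SELECT count(*) AS bad_rows FROM {AUDIT_TABLE} WHERE total_amount < 0"
)


def audit_passed(spark):
    """Return True when no record on the audit branch has a negative total_amount."""
    bad_rows = spark.sql(NEGATIVE_AMOUNT_SQL).collect()[0]["bad_rows"]
    return bad_rows == 0
```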
Based on the DQ result, decide what needs to be done. If the data should not be published at all, just drop the branch.
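Discarding the staged data could be sketched as follows. `discard_audit_branch` is a hypothetical helper; clearing spark.wap.branch afterwards is an addition of this sketch so the session does not keep pointing at a branch that no longer exists.

```python
TABLE = "glue.nyc_tlc.yellow_taxi_trips"
DROP_BRANCH_SQL = f"ALTER TABLE {TABLE} DROP BRANCH audit_branch"


def discard_audit_branch(spark):
    """Throw away the staged data by dropping the audit branch."""
    spark.sql(DROP_BRANCH_SQL)
    # Assumption of this sketch: also clear the session-level WAP branch.
    spark.conf.unset("spark.wap.branch")
```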
Alternatively, if we can simply discard the offending records, preserve them somewhere to be looked at later and delete them from the branch.
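As a sketch of that option, the two statements below first copy the offending rows to a separate table and then delete them from the audit branch. The yellow_taxi_trips_rejected table name and the `quarantine_bad_rows` helper are hypothetical.

```python
TABLE = "glue.nyc_tlc.yellow_taxi_trips"
AUDIT_TABLE = f"{TABLE}.branch_audit_branch"
# Hypothetical table used to keep the rejected records for later inspection.
REJECTED_TABLE = "glue.nyc_tlc.yellow_taxi_trips_rejected"
BAD_ROWS_FILTER = "total_amount < 0"

QUARANTINE_SQL = [
    # 1. Preserve the failing records outside the production table.
    f"CREATE TABLE {REJECTED_TABLE} USING iceberg AS "
    f"SELECT * FROM {AUDIT_TABLE} WHERE {BAD_ROWS_FILTER}",
    # 2. Remove them from the audit branch only; main is untouched.
    f"DELETE FROM {AUDIT_TABLE} WHERE {BAD_ROWS_FILTER}",
]


def quarantine_bad_rows(spark):
    """Run the preserve-then-delete statements in order."""
    for statement in QUARANTINE_SQL:
        spark.sql(statement)
```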
The Publish stage makes the data available in the production table for consumption by downstream analytical applications.
Once the auditing is done and the DQ is as expected or has been fixed, we can publish these final records to the production main branch by calling an Iceberg stored procedure called glue.system.fast_forward.
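A short sketch of the call, using the procedure's named arguments (table, branch, to); the `publish_sql` helper is a hypothetical wrapper that only builds the statement:

```python
def publish_sql(table="nyc_tlc.yellow_taxi_trips", target="main", source="audit_branch"):
    """Build the CALL that fast-forwards `target` to the head of `source`."""
    return (
        "CALL glue.system.fast_forward("
        f"table => '{table}', branch => '{target}', to => '{source}')"
    )


PUBLISH_SQL = publish_sql()
```

Executing `spark.sql(PUBLISH_SQL)` moves main to the latest snapshot of audit_branch without rewriting any data.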
Let's verify if the data is present for querying in the table via Athena
Unsetting Configuration/Property for WAP
Unsetting spark.wap.branch lets the SparkSession query the main branch again, and unsetting write.wap.enabled on the table makes Athena show the DDL for the table again.
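Both clean-up steps could be sketched as below; `cleanup_wap` is a hypothetical helper that expects an Iceberg-enabled SparkSession.

```python
TABLE = "glue.nyc_tlc.yellow_taxi_trips"
UNSET_WAP_SQL = f"ALTER TABLE {TABLE} UNSET TBLPROPERTIES ('write.wap.enabled')"


def cleanup_wap(spark):
    """Point the session back at main and remove the WAP table property."""
    spark.conf.unset("spark.wap.branch")
    spark.sql(UNSET_WAP_SQL)
```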
Now we can also drop audit_branch, as the data is published. If you look into the refs metadata table for the yellow_taxi_trips table, you will see main and audit_branch pointing to the same snapshot_id.
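The refs check can be sketched as follows. `branches_in_sync` is a hypothetical helper that works on the rows returned by the refs query (anything indexable by column name, such as Spark Row objects).

```python
TABLE = "glue.nyc_tlc.yellow_taxi_trips"
REFS_SQL = f"SELECT name, type, snapshot_id FROM {TABLE}.refs"


def branches_in_sync(refs_rows, left="main", right="audit_branch"):
    """Return True when both references exist and point to the same snapshot."""
    snapshot_by_ref = {row["name"]: row["snapshot_id"] for row in refs_rows}
    left_id = snapshot_by_ref.get(left)
    return left_id is not None and left_id == snapshot_by_ref.get(right)
```

With a live session, `branches_in_sync(spark.sql(REFS_SQL).collect())` returning True is a reasonable signal that dropping audit_branch loses nothing.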
That's it for this one folks..!!!
If you have read until here, please leave a comment below, and any feedback is highly appreciated. See you in the next post..!!! 😊