How to Implement Write-Audit-Publish Pattern with Apache Iceberg on AWS using Branches
Detailed implementation of the Write-Audit-Publish (WAP) data quality pattern on AWS using Apache Iceberg branches, i.e. for Apache Iceberg > 1.2.0. It also covers the gotchas of using this pattern with Athena as a query engine.
This blog will cover how to implement the Write-Audit-Publish (WAP) data quality pattern on AWS using Apache Spark on EMR, with the AWS Glue Catalog as the metastore for the Apache Iceberg table and AWS Athena for querying the table.
This blog uses EMR 6.15, which comes with Apache Spark 3.4 and supports Apache Iceberg version 1.4.0.
What is Write-Audit-Publish?
Write-Audit-Publish (WAP) is a data quality pattern commonly used in the data engineering workflow that helps us validate datasets by allowing us to write data to a staged snapshot/branch of the production table and fix any issues before finally committing the data to the production tables.
Pre-requisites for following along
- An EMR cluster running with Iceberg enabled, along with Spark 3.4 and an Iceberg version > 1.2
- The data that I will be using in this blog post is NYC Yellow Taxi Trip Data
- When you are creating the EMR cluster, make sure you go to Edit Software Settings > Enter Configuration and enable Iceberg via the iceberg-defaults classification, i.e. set iceberg.enabled to true (a programmatic sketch follows this list)
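If you prefer creating the cluster programmatically instead of through the console, the same classification can be supplied via boto3. This is only a sketch under that assumption; the region is a placeholder.

```python
import boto3

# Hypothetical example: supplying the Iceberg classification when creating
# an EMR cluster with boto3 instead of pasting JSON into the console.
emr = boto3.client("emr", region_name="us-east-1")  # region is a placeholder

iceberg_configuration = [
    {
        "Classification": "iceberg-defaults",
        "Properties": {"iceberg.enabled": "true"},
    }
]

# Pass Configurations=iceberg_configuration to emr.run_job_flow(...) along
# with your usual cluster settings (release label emr-6.15.0, instance
# groups, applications, etc.).
```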
Preparing Table and Data for WAP
Let's start with setting up all the configurations that we need in the SparkSession for this implementation to work.
In the case of a Spark script, you can set these configurations while creating your SparkSession using SparkSession.builder.config("config-key", "config-value") or spark.conf.set("config-key", "config-value").
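A minimal sketch of such a SparkSession, assuming a catalog named glue and a placeholder warehouse bucket (adjust both to your setup); each property is explained in the next section:

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("iceberg-wap-demo")
    # Load Iceberg's SQL extensions (branches, procedures, extra ALTER statements)
    .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    # Register a catalog named "glue" backed by the AWS Glue Catalog
    .config("spark.sql.catalog.glue", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.glue.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
    .config("spark.sql.catalog.glue.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    # Default S3 location for data/metadata; placeholder bucket
    .config("spark.sql.catalog.glue.warehouse", "s3://your-bucket/iceberg-warehouse/")
    .getOrCreate()
)
```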
Understanding Spark Configurations for Iceberg
- spark.sql.extensions - For loading Iceberg's SQL extensions module. This module lets you execute various Spark Procedures and additional ALTER statements for Add/Drop/Replace Partitions, Set Write Order, Write Distribution, and a few other things that are not possible without these extensions. We will be using this to call a Spark Procedure in WAP.
- spark.sql.catalog.glue - Defines a catalog name. In this case, glue is the catalog-name and the value org.apache.iceberg.spark.SparkCatalog marks it as an Iceberg catalog for Spark.
- spark.sql.catalog.glue.catalog-impl - As catalog in Iceberg is an interface and has multiple implementations, this property defines which implementation needs to be used. In our case, it's GlueCatalog.
- spark.sql.catalog.glue.io-impl - Read/Write implementation for the catalog; as the AWS Glue Catalog refers to AWS S3, we will be using S3FileIO for reading from and writing to S3.
- spark.sql.catalog.glue.warehouse - This is the base path in S3 where all the data and metadata will be stored by default. This is mainly overwritten during runtime in case you have an AWS Glue Catalog database that has a pre-defined S3 location or a pre-defined table Location present in it. In case you are writing a new Iceberg table without mentioning the Location, the table data will be written into the location defined in this property.
Let's create an Iceberg table that we will consider as the main production table, on which we will implement WAP to write some new data from the source table.
Athena only supports Iceberg v2 tables. This is something to keep in mind if you are using Athena as the query engine.
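A hedged sketch of the table DDL; the column list below is trimmed to a few representative NYC Yellow Taxi fields, and the year/month partitioning is an assumption based on the queries later in the post:

```python
# Production table; this variable is reused in the snippets below
prod_table = "glue.nyc_tlc.yellow_taxi_trips"

spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {prod_table} (
        vendorid              BIGINT,
        tpep_pickup_datetime  TIMESTAMP,
        tpep_dropoff_datetime TIMESTAMP,
        passenger_count       DOUBLE,
        trip_distance         DOUBLE,
        fare_amount           DOUBLE,
        total_amount          DOUBLE,
        year                  INT,
        month                 INT
    )
    USING iceberg
    PARTITIONED BY (year, month)
    TBLPROPERTIES ('format-version' = '2')   -- v2 so Athena can query it
""")
```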
This is where we can see the last point that I mentioned in the configuration section: the warehouse path set in the SparkSession gets overwritten by the location of the already-present Glue Catalog database, in this case nyc_tlc.
Let's prepare the data that will be written using WAP into the yellow_taxi_trips table.
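A sketch of loading the source data, assuming the NYC Yellow Taxi parquet files have been copied to an S3 path of your own; the path and the chosen month are placeholders:

```python
from pyspark.sql import functions as F

# Read one month of raw trips, keep the columns present in the prod table,
# and tag the rows with the partition columns
source_df = (
    spark.read.parquet("s3://your-bucket/raw/yellow_tripdata_2023-01.parquet")
    .select(
        F.col("VendorID").alias("vendorid"),
        "tpep_pickup_datetime",
        "tpep_dropoff_datetime",
        "passenger_count",
        "trip_distance",
        "fare_amount",
        "total_amount",
    )
    .withColumn("year", F.lit(2023))
    .withColumn("month", F.lit(1))
)

print(f"Source rows to write: {source_df.count()}")
```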
Alrighty, everything is set to start with the WAP implementation now, i.e.
- Production Table: glue.nyc_tlc.yellow_taxi_trips
- Source Data: the data present in source_df
WAP Implementation
The further steps show how to implement the different stages of WAP, i.e. Write - Audit - Publish, with a bit of explanation.
Write
This stage ensures that we can safely write data into a branch created from the prod table so it's in isolation from the current production workloads.
- Create a branch from the table and set write.wap.enabled=true in the table properties to let the Iceberg table handle writes using the WAP pattern, as shown in the sketch below
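A minimal sketch of this step, assuming the branch is named audit_branch (the name used throughout the rest of this post); prod_table is the variable defined in the table DDL sketch above:

```python
# Enable WAP on the production table, then branch it for staged writes
spark.sql(f"""
    ALTER TABLE {prod_table}
    SET TBLPROPERTIES ('write.wap.enabled' = 'true')
""")

spark.sql(f"ALTER TABLE {prod_table} CREATE BRANCH audit_branch")
```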
Once the write.wap.enabled property is added to the table, Athena doesn't let you see the DDL statement and throws an error saying table has unsupported property, but you can still query the data.
Let's verify that this property is added to the table properties via the Athena console by running show create table nyc_tlc.yellow_taxi_trips
Whoops..!!! Unable to see the table DDL from Athena. Let's see if we can query the data in this table or if that's broken too..!!
It's important to UNSET the property from the production table to make the table DDL queryable again.
Let's verify table DDL via Spark directly
# Check table DDL
spark.sql(f"show create table {prod_table}").collect()[0]['createtab_stmt']
- Setting spark.wap.branch in the Spark session
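With write.wap.enabled set on the table, pointing the session at the branch is a one-liner:

```python
# All subsequent reads/writes in this session target the audit branch
spark.conf.set("spark.wap.branch", "audit_branch")
```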
- Writing Source Data into Production Table
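Because spark.wap.branch is set, a plain append against the table name lands on audit_branch, not main. A sketch using the DataFrameWriterV2 API:

```python
# Append the prepared source data; the write is staged on audit_branch
source_df.writeTo("glue.nyc_tlc.yellow_taxi_trips").append()
```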
Let's verify the data in the prod table from Athena:
select "year", "month", count(*) from nyc_tlc.yellow_taxi_trips group by 1,2
Looks like the Prod table data is intact. Let's check the same from spark.
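Roughly the same aggregation from Spark, which reads through the session's WAP branch:

```python
spark.sql("""
    SELECT year, month, COUNT(*) AS trips
    FROM glue.nyc_tlc.yellow_taxi_trips
    GROUP BY 1, 2
    ORDER BY 1, 2
""").show()
```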
Yikes..!!!! The data in Spark seems different for the same query than in Athena. What the heck is happening? 🤯
The reason for this is: spark.wap.branch is set to audit_branch. The actual branch that is being queried is audit_branch and not the main branch.
To query the main branch:
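One way to do this (an explicit branch reference overrides the session's WAP branch; Iceberg also exposes the branch via a branch_main reference):

```python
# Read main explicitly even though spark.wap.branch is set
spark.sql("""
    SELECT year, month, COUNT(*) AS trips
    FROM glue.nyc_tlc.yellow_taxi_trips VERSION AS OF 'main'
    GROUP BY 1, 2
    ORDER BY 1, 2
""").show()
```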
This verifies that all the writes happening from the current SparkSession are isolated from the production data in the yellow_taxi_trips table.
Audit
This stage enables us to understand the quality of our newly inserted data. During this stage, we can run various data quality checks such as inspecting NULL values, identifying duplicated data, etc.
You can define any DQ process here via some code, a third-party provider, or any other service. A few such options are:
- via Pydeequ
- via AWS Glue Data Quality
- via Pyspark/Pandas etc
I will be using the third option here, via PySpark, for ease of understanding.
Let's assume that, as per our application's data quality standard, prod data shouldn't have any records with a negative total_amount.
Now, based on whether your DQ checks succeed or fail, you can discuss with stakeholders and decide what needs to be done, for example:
- Do not publish the data as the DQ doesn't meet the expectation.
- DELETE the rows not meeting DQ standards and preserve such records somewhere else.
- Fix the data via some logic, like populating null and missing values.
Running DQ checks on the audit_branch data:
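A simple PySpark check for the rule above; since spark.wap.branch is still set, the plain table reference reads audit_branch:

```python
# Count rows that violate the "no negative total_amount" rule
negative_count = spark.sql("""
    SELECT COUNT(*) AS bad_records
    FROM glue.nyc_tlc.yellow_taxi_trips
    WHERE total_amount < 0
""").collect()[0]["bad_records"]

dq_passed = negative_count == 0
print(f"Records with negative total_amount: {negative_count}, DQ passed: {dq_passed}")
```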
Based on the DQ result, decide what needs to be done. In case the data doesn't need to be published anymore, just drop the branch.
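Dropping the branch discards the staged writes while leaving main untouched:

```python
# Abort the WAP cycle: throw away the staged data on audit_branch
spark.sql("ALTER TABLE glue.nyc_tlc.yellow_taxi_trips DROP BRANCH audit_branch")
```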
Or we can just discard the offending records and preserve them somewhere to be looked at later, as sketched below.
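A sketch of that option, quarantining the bad rows before deleting them from the audit branch; the quarantine S3 path is a placeholder:

```python
# Preserve the violating rows outside the table for later inspection
bad_records = spark.sql("""
    SELECT * FROM glue.nyc_tlc.yellow_taxi_trips WHERE total_amount < 0
""")
bad_records.write.mode("append").parquet("s3://your-bucket/quarantine/yellow_taxi_trips/")

# Remove them from audit_branch (main is still untouched at this point)
spark.sql("DELETE FROM glue.nyc_tlc.yellow_taxi_trips WHERE total_amount < 0")
```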
Publish
This stage allows data to be made available to the production tables for consumption by the downstream analytical applications.
Once the auditing is done and the DQ is as expected or fixed, we can publish these final records into the production main branch by calling an Iceberg Spark procedure called glue.system.fast_forward.
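The call fast-forwards main to the head of audit_branch:

```python
# Publish: move main up to the latest snapshot of audit_branch
spark.sql("""
    CALL glue.system.fast_forward(
        table  => 'nyc_tlc.yellow_taxi_trips',
        branch => 'main',
        to     => 'audit_branch'
    )
""").show()
```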
Let's verify if the data is present for querying in the table via Athena
Unsetting Configuration/Property for WAP
Unsetting spark.wap.branch will let the SparkSession query the main branch again, and unsetting write.wap.enabled on the table will make Athena show the DDL for the table again.
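A sketch of both clean-up steps:

```python
# Session-level: stop routing reads/writes to the WAP branch
spark.conf.unset("spark.wap.branch")

# Table-level: remove the WAP property so Athena can render the DDL again
spark.sql("""
    ALTER TABLE glue.nyc_tlc.yellow_taxi_trips
    UNSET TBLPROPERTIES ('write.wap.enabled')
""")
```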
Now, we can also drop the audit_branch as the data is published. If you look into the refs metadata table for the yellow_taxi_trips table, you will see that main and audit_branch point to the same snapshot_id.
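A quick way to confirm this and then clean up the branch:

```python
# refs lists every branch/tag and the snapshot it points to; after the
# fast_forward, main and audit_branch should show the same snapshot_id
spark.sql("SELECT * FROM glue.nyc_tlc.yellow_taxi_trips.refs").show(truncate=False)

# Drop the audit branch now that the data is published
spark.sql("ALTER TABLE glue.nyc_tlc.yellow_taxi_trips DROP BRANCH audit_branch")
```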
That's it for this one folks..!!!
If you have read until here, please leave a comment below, and any feedback is highly appreciated. See you in the next post..!!! 😊
If it has added any value to you, consider subscribing; it's free of cost and you will get the upcoming posts in your mailbox too.