This blog covers how to implement the Write-Audit-Publish (WAP) data quality pattern on AWS using Apache Spark on EMR, with AWS Glue Catalog as the metastore for the Apache Iceberg table and AWS Athena for querying the table.
This blog uses Apache Iceberg version < 1.2.1 as this version doesn't have a branching feature. To see the implementation via branching, check here.
This blog uses EMR 6.11, which comes with Apache Spark 3.3 and supports Apache Iceberg version 1.2.0.
What is Write-Audit-Publish?
Write-Audit-Publish (WAP) is a data quality pattern commonly used in the data engineering workflow that helps us validate datasets by allowing us to write data to a staged snapshot/branch of the production table and fix any issues before finally committing the data to the production tables.
Things to know before implementation
On setting write.wap.enabled=true in the Iceberg table property, all the data written in the table is in stage mode only i.e. written data won't be reflected in the table until published.
Iceberg tables write data into snapshots.
Snapshots in an Iceberg table can be seen by querying the snapshot metadata table.
Snapshot metadata table can be queried via <table-name>.snapshots within spark.sql or SELECT * FROM "dbname"."tablename$snapshots" via Athena.
Once write.wap.enabled=true is set on the table, Athena doesn't show the table DDL until it's UNSET again but you can still query the data as usual. More details on this are covered here.
When you are creating an EMR cluster, make sure you go to Edit Software Settings > Enter Configuration
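As a minimal sketch, assuming the goal of this step is to enable Iceberg on the cluster, the snippet below builds the `iceberg-defaults` classification that the EMR documentation describes for this. The helper name `emr_configuration_json` is made up for illustration.

```python
import json

# Assumption: this step enables Iceberg on the cluster through the
# "iceberg-defaults" classification described in the EMR documentation.
EMR_ICEBERG_CONFIGURATION = [
    {
        "Classification": "iceberg-defaults",
        "Properties": {"iceberg.enabled": "true"},
    }
]


def emr_configuration_json(configuration=EMR_ICEBERG_CONFIGURATION):
    """Return the JSON text to paste into the Enter Configuration box."""
    return json.dumps(configuration, indent=2)
```

Running `print(emr_configuration_json())` gives the JSON that can be pasted into the configuration box.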
Preparing Table and Data for WAP
Let's start with setting up all the configurations that we need in SparkSession for this implementation to work.
In a Spark script, you can set these configurations while creating your SparkSession using SparkSession.builder.config("config-key", "config-value") or afterwards using spark.conf.set("config-key", "config-value").
spark.jars.packages and spark.jars.excludes are not required for the WAP implementation with Iceberg. I have added these jars only because they are used to demonstrate auditing with PyDeequ.
To understand all the Iceberg related configurations, check here
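A rough sketch of what such a configuration set can look like for an Iceberg catalog named glue backed by AWS Glue. The config keys and class names are the standard Iceberg ones. The warehouse path and the helper names `iceberg_glue_conf` and `apply_conf` are placeholders I made up, so swap in your own values.

```python
def iceberg_glue_conf(catalog="glue", warehouse="s3://<your-bucket>/warehouse/"):
    """Spark configs for an Iceberg catalog that uses AWS Glue as metastore.

    The warehouse value is a placeholder (assumption), not a real location.
    """
    prefix = f"spark.sql.catalog.{catalog}"
    return {
        # Enables Iceberg SQL extensions such as CALL <catalog>.system.<procedure>
        "spark.sql.extensions":
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        prefix: "org.apache.iceberg.spark.SparkCatalog",
        f"{prefix}.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
        f"{prefix}.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
        f"{prefix}.warehouse": warehouse,
    }


def apply_conf(builder, conf):
    """Apply every key/value pair to a SparkSession.builder-like object."""
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder
```

With PySpark available, this would be used as `spark = apply_conf(SparkSession.builder.appName("wap"), iceberg_glue_conf()).getOrCreate()`.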
Let's create an Iceberg Table that we will consider as a main Production Table on which we will implement WAP to write some new data from the Source Table.
Athena only supports Iceberg v2 tables. This is something to keep in mind if you are using Athena as the query engine.
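A minimal sketch of a DDL that creates the table as an Iceberg v2 table. The column list is an assumption: it only contains the three columns used later in the audit, with guessed types, so replace it with the real green taxi schema.

```python
def create_table_ddl(table="glue.nyc_tlc.green_taxi_trips"):
    """Build a CREATE TABLE statement for an Iceberg v2 table.

    The columns and their types below are illustrative assumptions.
    """
    return f"""
    CREATE TABLE IF NOT EXISTS {table} (
        VendorID     BIGINT,
        payment_type DOUBLE,
        total_amount DOUBLE
    )
    USING iceberg
    TBLPROPERTIES ('format-version' = '2')
    """
```

The statement can then be executed with `spark.sql(create_table_ddl())`.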
Let's prepare data that will be written using WAP in green_taxi_trips table.
Alrighty everything is set to start with the WAP implementation now i.e.
Production Table: glue.nyc_tlc.green_taxi_trips
Source Data: data present in source_df
WAP Implementation
Further steps show how to implement different stages in WAP i.e. Write - Audit - Publish with a bit of explanation.
Write
This stage ensures that we can safely write data into a staged snapshot of the production table, so that it stays isolated from the current production workloads.
Set write.wap.enabled=true in the table properties so that the Iceberg table handles writes using the WAP pattern
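A small sketch of this step, assuming `spark` is the SparkSession configured earlier. The function name `enable_wap` is hypothetical.

```python
def enable_wap(spark, table="glue.nyc_tlc.green_taxi_trips"):
    """Turn on WAP for the table so that new writes are staged."""
    spark.sql(
        f"ALTER TABLE {table} SET TBLPROPERTIES ('write.wap.enabled' = 'true')"
    )
```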
Set the wap.id in the SparkSession (via the spark.wap.id config); it will be used to identify the staged snapshot
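One way to do this, sketched under the assumption that a random UUID is a good enough wap.id. The function name `start_wap_session` is made up. Iceberg reads the id from the `spark.wap.id` session config.

```python
import uuid


def start_wap_session(spark):
    """Generate a wap.id, set it on the session, and return it."""
    wap_id = uuid.uuid4().hex  # 32-character hex string
    spark.conf.set("spark.wap.id", wap_id)
    return wap_id
```

Keep the returned `wap_id` around, as it is needed later to find the staged snapshot.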
Writing Source Data into Production Table
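A minimal sketch of the write, assuming an append through the DataFrameWriterV2 API is what you want here. `write_staged` is a hypothetical helper name, and `source_df` is the DataFrame prepared earlier.

```python
def write_staged(source_df, table="glue.nyc_tlc.green_taxi_trips"):
    """Append source_df to the table.

    With write.wap.enabled=true and spark.wap.id set, Iceberg stages the
    resulting snapshot instead of making it the current table state.
    """
    source_df.writeTo(table).append()
```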
Let's verify the data in the Production Table
As mentioned before, the data that is written with write.wap.enabled=true is in staging mode. Let's verify the data in the snapshot written in the previous write.
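A hedged sketch of how both checks could be done. It looks up the staged snapshots through the `wap.id` entry in the snapshot summary, and it assumes that Spark 3.3's `VERSION AS OF` syntax can be pointed at a staged snapshot id. The helper names are hypothetical.

```python
TABLE = "glue.nyc_tlc.green_taxi_trips"


def staged_snapshot_ids(spark, wap_id, table=TABLE):
    """Return the snapshot ids staged under wap_id, oldest first."""
    rows = spark.sql(
        f"SELECT snapshot_id FROM {table}.snapshots "
        f"WHERE summary['wap.id'] = '{wap_id}' "
        f"ORDER BY committed_at"
    ).collect()
    return [row["snapshot_id"] for row in rows]


def staged_count_sql(snapshot_id, table=TABLE):
    """Build a row count query against one specific snapshot."""
    return f"SELECT COUNT(*) AS cnt FROM {table} VERSION AS OF {snapshot_id}"
```

A plain `SELECT COUNT(*) FROM glue.nyc_tlc.green_taxi_trips` should still return the old count, while `spark.sql(staged_count_sql(snapshot_id))` should include the staged rows.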
Audit
This stage enables us to understand the quality of our newly inserted data. During this stage, we can run various data quality checks such as inspecting NULL values, identifying duplicated data, etc.
You can define any of the DQ processes here via some code, a third-party provider, or any other service. A few such options are:
I will be using the first option here via PyDeequ. To learn more about PyDeequ and how it works, you can refer to this blog post. Let's say our DQ requirement is:
- Completeness criteria for VendorID should be 1.0 i.e. there are no `null` values.
- total_amount shouldn't be negative.
- payment_type should be discrete numbers between 1 and 6.
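PyDeequ needs its jars on the cluster, so as a lighter stand-in (this is not the PyDeequ approach itself), the same three requirements can be sketched as one Spark SQL aggregate over the staged snapshot. The function names and result column names are made up for illustration, and the query assumes `VERSION AS OF` works for the staged snapshot id.

```python
def audit_sql(snapshot_id, table="glue.nyc_tlc.green_taxi_trips"):
    """Build one aggregate query that counts violations of each DQ rule."""
    return f"""
    SELECT
        COUNT(*) AS total_rows,
        SUM(CASE WHEN VendorID IS NULL THEN 1 ELSE 0 END) AS null_vendor_ids,
        SUM(CASE WHEN total_amount < 0 THEN 1 ELSE 0 END) AS negative_total_amounts,
        SUM(CASE WHEN payment_type NOT IN (1, 2, 3, 4, 5, 6) THEN 1 ELSE 0 END)
            AS invalid_payment_types
    FROM {table} VERSION AS OF {snapshot_id}
    """


def evaluate_audit(row):
    """Map each DQ rule to True (passed) or False (failed).

    `row` is the single result row of audit_sql. NULL payment_type values
    are not counted as invalid by the query above. `or 0` covers the SUMs
    being NULL when the snapshot has no rows.
    """
    return {
        "VendorID is complete": (row["null_vendor_ids"] or 0) == 0,
        "total_amount is non-negative": (row["negative_total_amounts"] or 0) == 0,
        "payment_type is between 1 and 6": (row["invalid_payment_types"] or 0) == 0,
    }
```

Usage would look like `evaluate_audit(spark.sql(audit_sql(snapshot_id)).collect()[0])`, and any `False` in the result means that rule failed.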
As we can see, one of the DQ Validations failed as total_amount column has some negative values. Based on this result and discussion with stakeholders, we can decide whether we want to Publish the data to the Production table or not.
Scenario 1: Let's say as per discussion with stakeholders, we don't want to publish the data in Production because DQ Checks have failed.
Scenario 2: Let's say as per discussion with stakeholders, we can clean the data by filtering out all such records that have negative total_amount and write the data with a negative total_amount to some bad data location.
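A minimal sketch of the split for Scenario 2, assuming `df` is a DataFrame read from the staged snapshot. The function name is hypothetical, and keeping rows with a NULL total_amount on the clean side is my assumption, since only negative values were flagged.

```python
def split_by_total_amount(df):
    """Split df into (clean_df, bad_df) based on total_amount.

    Rows with a negative total_amount go to bad_df. Rows with a NULL
    total_amount stay in clean_df (assumption, see the note above).
    """
    clean_df = df.filter("total_amount IS NULL OR total_amount >= 0")
    bad_df = df.filter("total_amount < 0")
    return clean_df, bad_df
```

`bad_df` can then be written to the bad data location of your choice, for example with `bad_df.write.mode("append").parquet(...)`.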
Once we have cleaned the data, we can rewrite this into the table.
Now the important thing to notice here is that, as the data is re-written on top of the staged snapshot, a new snapshot_id is generated for this write. This new snapshot_id holds the actual data that needs to be published in the Production Table.
Let's look into the snapshot metadata table to understand what I mean.
Things to observe in the above output:
Now we have 2 snapshot_ids (7270839791374367070 and 8254249659682978316) corresponding to the wap.id i.e. fcd54f05ee4b4480a21c191cd5c17f39, written at different times i.e. committed_at
Data that needs to be published in the Production Table should be the cleaned data i.e. of snapshot_id=8254249659682978316
Publish
This stage allows data to be made available to the production tables for consumption by the downstream analytical applications.
Once the auditing is done and the DQ is as expected or fixed, we can publish the final snapshot in the Production table by calling an Iceberg stored procedure called system.cherrypick_snapshot.
To publish the snapshot from the above scenario, we need to get the latest snapshot_id based on the committed_at timestamp for the wap.id that we used to perform the write.
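The two steps can be sketched as below, assuming the catalog is named glue and `wap_id` is the id that was set before the write. The helper names `latest_staged_snapshot_id` and `publish` are hypothetical.

```python
def latest_staged_snapshot_id(spark, wap_id, table="glue.nyc_tlc.green_taxi_trips"):
    """Return the most recently committed snapshot id for wap_id."""
    rows = spark.sql(
        f"SELECT snapshot_id FROM {table}.snapshots "
        f"WHERE summary['wap.id'] = '{wap_id}' "
        f"ORDER BY committed_at DESC LIMIT 1"
    ).collect()
    if not rows:
        raise ValueError(f"no staged snapshot found for wap.id {wap_id}")
    return rows[0]["snapshot_id"]


def publish(spark, snapshot_id, catalog="glue", table="nyc_tlc.green_taxi_trips"):
    """Cherry-pick the staged snapshot into the current table state."""
    spark.sql(
        f"CALL {catalog}.system.cherrypick_snapshot("
        f"table => '{table}', snapshot_id => {snapshot_id})"
    )
```

Usage: `publish(spark, latest_staged_snapshot_id(spark, wap_id))`.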
Let's verify the records in the Production Table now from AWS Athena
Finally, to make the DDL queryable again from Athena, UNSET the table property write.wap.enabled. The reason for doing this is explained in detail in this blog post.
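A small sketch of the cleanup, again assuming `spark` is the configured SparkSession. `disable_wap` is a hypothetical helper name.

```python
def disable_wap(spark, table="glue.nyc_tlc.green_taxi_trips"):
    """Remove the WAP table property so Athena can show the DDL again."""
    spark.sql(f"ALTER TABLE {table} UNSET TBLPROPERTIES ('write.wap.enabled')")
```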
That's it for this one folks..!! Now you know how to implement WAP Pattern on AWS for Iceberg version < 1.2.1
If you have read until here, please leave a comment below, and any feedback is highly appreciated. See you in the next post..!!! 😊
If it has added any value to you and want to read more content like this, subscribe to the newsletter, it's free of cost and I will make sure every post is worth your time.