This blog post covers the different components of PyDeequ and shows, in depth, how to use them to test data quality.
💡 All the code in this post is available on my GitHub here.
⚠️ Currently (Dec '23), PyDeequ isn't compatible with Spark versions above 3.3, but the community is working on it. All the details can be seen here.
What is PyDeequ?
PyDeequ is an open-source Python wrapper around Deequ (an open-source tool developed and used at Amazon). Deequ lets you calculate data quality metrics on your dataset, define and verify data quality constraints, and be informed about changes in the data distribution. It's built on top of Apache Spark, so it can scale to large datasets (billions of rows).
Why is testing Data Quality important?
The quality of data within a system can make or break your application. Incorrect, missing, or malformed data can have a large impact on production systems. Poor data quality can lead to production failures, unexpected output from ML models, wrong business decisions, and much more.
Environment and Data Preparation:
You can also run this on your local machine if you have Apache Spark set up on your workstation. I will be using EMR 6.11, which comes with Spark 3.3.
The data I am using for this is the NYC Yellow Taxi Trip record data. It can be accessed here, along with the Data Dictionary if you want to understand the data better.
Installing PyDeequ on EMR
Setting up SparkSession Configuration in Notebook:
Before importing pydeequ, we need to set the SPARK_VERSION environment variable, because pydeequ reads the Spark version from it.
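A minimal sketch of that step, assuming you start from a Spark version string: `set_spark_version_env` is a hypothetical helper (not part of PyDeequ) that keeps only the major.minor part, which is the form used in this post (3.3 on EMR 6.11). The `import pydeequ` line is commented out so the snippet runs without PyDeequ installed.

```python
import os
import re


def set_spark_version_env(full_version: str) -> str:
    """Hypothetical helper: store the major.minor Spark version in SPARK_VERSION.

    pydeequ looks up SPARK_VERSION, so call this *before* `import pydeequ`.
    """
    match = re.match(r"(\d+)\.(\d+)", full_version)
    if match is None:
        raise ValueError(f"Unrecognised Spark version: {full_version!r}")
    major_minor = f"{match.group(1)}.{match.group(2)}"
    os.environ["SPARK_VERSION"] = major_minor
    return major_minor


# EMR 6.11 comes with Spark 3.3, as used in this post.
set_spark_version_env("3.3")
# import pydeequ  # only after SPARK_VERSION has been set
```

Stripping the patch level is a defensive choice: "3.3" is the value this post relies on, so normalising longer strings to it avoids surprises.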
Reading data from S3:
PyDeequ components:
Let's walk through the various components of PyDeequ with code examples to understand which component to use and when.
Metrics Computation
Deequ computes data quality metrics, that is, statistics such as completeness, maximum, or correlation.
These metrics are mainly useful for data analysis, and Deequ provides Profilers and Analyzers to compute them.
Profilers
Let's say you don't know much about the data yet: how complete each column is, which columns contain nulls, whether values are non-negative, how unique or distinct the values are, and so on. In this case, we can simply run a profiler on the entire dataset, and Deequ will compute all of these statistics for us.
If you are only interested in a few columns, say VendorID, trip_distance and total_amount, the ColumnProfilerRunner can be restricted to just those columns.
Based on the profiler output, you can decide which columns are of interest to you and whether they actually need any cleansing or transformation.
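To make the profiler output more concrete, here is a small stdlib-only sketch of a per-column profile (completeness, distinct count, and min/max plus a non-negativity flag for numeric columns), optionally restricted to the columns of interest. It only illustrates the kind of statistics a profile contains; it is not PyDeequ's ColumnProfilerRunner, which computes these (and more) with Spark. `profile_columns` is a hypothetical helper and the sample rows are made up.

```python
from typing import Any, Dict, Iterable, List, Optional

Row = Dict[str, Any]


def profile_columns(rows: List[Row], columns: Optional[Iterable[str]] = None) -> Dict[str, Dict[str, Any]]:
    """Compute a few per-column statistics, optionally restricted to `columns`."""
    if columns is None:
        # No restriction: profile every column that appears in the data.
        columns = sorted({name for row in rows for name in row})
    profiles: Dict[str, Dict[str, Any]] = {}
    for col in columns:
        non_null = [row[col] for row in rows if row.get(col) is not None]
        profile: Dict[str, Any] = {
            "completeness": len(non_null) / len(rows),
            # Exact here; Deequ's profile reports an approximate distinct count.
            "distinct_values": len(set(non_null)),
        }
        numeric = [v for v in non_null if isinstance(v, (int, float)) and not isinstance(v, bool)]
        if non_null and len(numeric) == len(non_null):
            profile["minimum"] = min(numeric)
            profile["maximum"] = max(numeric)
            profile["is_non_negative"] = min(numeric) >= 0
        profiles[col] = profile
    return profiles


rows = [
    {"VendorID": 1, "trip_distance": 1.2, "total_amount": 14.3, "store_and_fwd_flag": "N"},
    {"VendorID": 2, "trip_distance": 0.0, "total_amount": -5.0, "store_and_fwd_flag": None},
    {"VendorID": 2, "trip_distance": 3.4, "total_amount": None, "store_and_fwd_flag": "N"},
    {"VendorID": 1, "trip_distance": 7.9, "total_amount": 31.0, "store_and_fwd_flag": "Y"},
]

# Restrict the profile to the three columns of interest.
for name, stats in profile_columns(rows, ["VendorID", "trip_distance", "total_amount"]).items():
    print(name, stats)
```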
Analyzers
Analyzers compute specific metrics on the columns of interest. PyDeequ supports a rich set of metrics, and AnalysisRunner is used to compute the ones we are interested in.
%%display
analysisResult_df
An important thing to notice is that some Analyzers can also run on multiple columns. For example, Uniqueness(["VendorID", "payment_type"]) measures the uniqueness of the combination of the VendorID and payment_type columns.
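The definitions behind three common analyzers can be sketched in plain Python: Completeness is the fraction of non-null values, Uniqueness is the fraction of rows whose value (or value combination) occurs exactly once, and Distinctness is the number of distinct values over the number of rows. This is an illustration of those definitions, not PyDeequ code; the helper names are hypothetical and the sketch assumes the grouped columns contain no nulls (Deequ has its own null handling for these metrics).

```python
from collections import Counter
from typing import Any, Dict, List, Sequence

Row = Dict[str, Any]


def completeness(rows: List[Row], column: str) -> float:
    """Fraction of rows in which `column` is not null (None)."""
    return sum(1 for r in rows if r.get(column) is not None) / len(rows)


def _frequencies(rows: List[Row], columns: Sequence[str]) -> Counter:
    """How often each combination of values in `columns` occurs."""
    return Counter(tuple(r.get(c) for c in columns) for r in rows)


def uniqueness(rows: List[Row], columns: Sequence[str]) -> float:
    """Fraction of rows whose value combination occurs exactly once."""
    freqs = _frequencies(rows, columns)
    return sum(1 for n in freqs.values() if n == 1) / len(rows)


def distinctness(rows: List[Row], columns: Sequence[str]) -> float:
    """Number of distinct value combinations divided by the number of rows."""
    return len(_frequencies(rows, columns)) / len(rows)


rows = [
    {"VendorID": 1, "payment_type": 1, "total_amount": 10.0},
    {"VendorID": 1, "payment_type": 1, "total_amount": 12.5},
    {"VendorID": 2, "payment_type": 1, "total_amount": None},
    {"VendorID": 2, "payment_type": 2, "total_amount": 7.0},
]
# Uniqueness of the (VendorID, payment_type) combination.
print(uniqueness(rows, ["VendorID", "payment_type"]))  # prints 0.5
```

Here VendorID alone has uniqueness 0.0 (both vendors repeat), while the pair has 0.5, which is why running the analyzer on a column combination can tell you something a single column cannot.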
A list of all the available Analyzers in PyDeequ can be seen here.
Constraint Suggestion
You can think of constraints as the data quality rules your data must follow to be considered good. By running a profiler on the data, PyDeequ can suggest constraints to apply to the columns. It also provides the code for applying each suggested constraint during verification.
If you only want to know whether a particular constraint applies to the columns, you can add that specific constraint rule instead of DEFAULT().
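As a worked example of how a suggested threshold can be derived, here is a sketch of the idea behind RetainCompletenessRule, whose rule_description appears in the suggestion output later in this post: treat completeness as a binomial proportion, take the lower end of a confidence interval, and round down. The normal-approximation interval, z = 1.96 and rounding down to two decimals are assumptions of this sketch, and `completeness_lower_bound` is a hypothetical helper; Deequ's exact constants may differ.

```python
import math


def completeness_lower_bound(completeness: float, num_rows: int, z: float = 1.96) -> float:
    """Lower end of a normal-approximation confidence interval for a completeness
    ratio, rounded *down* to two decimals so the threshold stays conservative."""
    std_err = math.sqrt(completeness * (1 - completeness) / num_rows)
    lower = completeness - z * std_err
    return math.floor(lower * 100) / 100


# Completeness observed for Airport_fee in the suggestion output; the row count is hypothetical.
print(completeness_lower_bound(0.9560146325467701, 1_000_000))  # prints 0.95
```

With a million rows the interval is very narrow, so the bound lands just below the observed value and rounds to 0.95. With only 100 rows the same completeness gives 0.91, because a small sample yields a wider interval.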
Supported constraint suggestions can be seen here.
Constraint Verification
As a user, you focus on defining a set of data quality constraints to be verified. Deequ derives the set of metrics that must be computed on the data and generates a data quality report containing the result of the constraint verification. This is where you define the data quality test cases that verify specific constraints on columns.
Let's add a column called gen_id to the DataFrame, just to see how the uniqueness rules work.
Let's create some DQ constraints that can be verified by PyDeequ.
DQ constraints for verification are defined in a Check, which is then added to a VerificationSuite.
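To show how a Check groups constraints (each a metric plus an assertion lambda, like the `lambda x: x >= 0.95` used for Airport_fee later in this post) and how the check's level decides the reported status, here is a toy, stdlib-only model. `ToyCheck` and its methods are hypothetical names that loosely mirror PyDeequ's Check.hasCompleteness and Check.isUnique; in PyDeequ the real Check is passed to VerificationSuite(spark).onData(df).addCheck(check).run().

```python
from collections import Counter
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Tuple

Row = Dict[str, Any]
# (constraint name, metric computed on all rows, assertion on the metric value)
Constraint = Tuple[str, Callable[[List[Row]], float], Callable[[float], bool]]


@dataclass
class ToyCheck:
    """Hypothetical stand-in for a Deequ Check: a level, a description and constraints."""

    level: str  # "Error" or "Warning", mirroring CheckLevel
    description: str
    constraints: List[Constraint] = field(default_factory=list)

    def has_completeness(self, column: str, assertion: Callable[[float], bool]) -> "ToyCheck":
        def metric(rows: List[Row]) -> float:
            return sum(1 for r in rows if r.get(column) is not None) / len(rows)

        self.constraints.append((f"Completeness({column})", metric, assertion))
        return self  # returning self allows chaining, as with Check in PyDeequ

    def is_unique(self, column: str) -> "ToyCheck":
        def metric(rows: List[Row]) -> float:
            counts = Counter(r.get(column) for r in rows)
            return sum(1 for n in counts.values() if n == 1) / len(rows)

        self.constraints.append((f"Uniqueness({column})", metric, lambda x: x == 1.0))
        return self

    def run(self, rows: List[Row]) -> Tuple[str, List[Tuple[str, float, str]]]:
        """Evaluate each constraint; the check's status is 'Success' or its level."""
        results = []
        for name, metric, assertion in self.constraints:
            value = metric(rows)
            results.append((name, value, "Success" if assertion(value) else "Failure"))
        passed = all(status == "Success" for _, _, status in results)
        return ("Success" if passed else self.level), results


rows = [
    {"gen_id": 0, "Airport_fee": 1.25},
    {"gen_id": 1, "Airport_fee": None},
    {"gen_id": 2, "Airport_fee": 0.0},
    {"gen_id": 3, "Airport_fee": 0.0},
]
check = (
    ToyCheck("Warning", "NYC yellow taxi checks")
    .has_completeness("Airport_fee", lambda x: x >= 0.95)
    .is_unique("gen_id")
)
status, results = check.run(rows)
print(status)  # Warning: Airport_fee is only 75% complete
```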
%%display
checkResult_df
Notice that for Airport_fee I applied an assertion stating that the completeness of this column should be >= 0.95. I got this constraint from the constraint suggestion we ran previously; its output contains this entry:
{
"constraint_name": "CompletenessConstraint(Completeness(Airport_fee,None))",
"column_name": "Airport_fee",
"current_value": "Completeness: 0.9560146325467701",
"description": "'Airport_fee' has less than 5% missing values",
"suggesting_rule": "RetainCompletenessRule()",
"rule_description": "If a column is incomplete in the sample, we model its completeness as a binomial variable, estimate a confidence interval and use this to define a lower bound for the completeness",
"code_for_constraint": ".hasCompleteness(\"Airport_fee\", lambda x: x >= 0.95, \"It should be above 0.95!\")"
}
The code_for_constraint key contains the code I used in the checks. This is one example of how you can use constraint suggestions during constraint verification.
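If you want to reuse suggestions programmatically instead of copying them by hand, a small sketch can pull the code_for_constraint strings for chosen columns out of the suggestion result. It assumes the result is a dict holding a list of entries shaped like the one above under a constraint_suggestions key, which is how I understand PyDeequ's ConstraintSuggestionRunner result to look; `suggestions_for` is a hypothetical helper and the VendorID entry is made up.

```python
from typing import Any, Dict, List


def suggestions_for(result: Dict[str, Any], columns: List[str]) -> Dict[str, List[str]]:
    """Group the suggested constraint code by column, keeping only `columns`."""
    grouped: Dict[str, List[str]] = {c: [] for c in columns}
    for suggestion in result.get("constraint_suggestions", []):
        column = suggestion.get("column_name")
        if column in grouped:
            grouped[column].append(suggestion["code_for_constraint"])
    return grouped


result = {
    "constraint_suggestions": [
        {
            "column_name": "Airport_fee",
            "suggesting_rule": "RetainCompletenessRule()",
            "code_for_constraint": '.hasCompleteness("Airport_fee", lambda x: x >= 0.95, "It should be above 0.95!")',
        },
        # Hypothetical entry, for illustration only.
        {"column_name": "VendorID", "code_for_constraint": '.isComplete("VendorID")'},
    ]
}
for column, snippets in suggestions_for(result, ["Airport_fee"]).items():
    print(column, snippets)
```

Reviewing the grouped snippets per column before pasting them into a Check keeps a human in the loop, which matters because suggestions are derived from one sample of the data.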
Better Ways to Define Checks for Constraint Verification
Checks can also be separated or grouped based on the DQ logic, so that the verification results are easier to interpret.
Instead of defining all the column checks in a single Check, you can split them into multiple checks.
Each Check can have its own CheckLevel (Error or Warning), depending on how strict the DQ check is.
This is especially useful when you want to easily identify which check failed and, if required, automate a fix. For example, if the fare_amount checks fail, you can filter out all the outliers, as per your business logic, and write them to a different table.
Check the example below for this.
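A stdlib-only sketch of this idea: each check gets its own level and a compliance threshold (the fraction of rows that must satisfy a row-level condition, similar in spirit to Deequ's Compliance metric), and when the fare_amount check fails, the offending rows are routed to a separate list standing in for a different table. The check names, thresholds and the `run_checks`/`quarantine` helpers are hypothetical, not PyDeequ's API.

```python
from typing import Any, Callable, Dict, List, Tuple

Row = Dict[str, Any]
Predicate = Callable[[Row], bool]
# check name -> (level, row-level condition, minimum fraction of rows that must satisfy it)
Checks = Dict[str, Tuple[str, Predicate, float]]


def compliance(rows: List[Row], predicate: Predicate) -> float:
    """Fraction of rows that satisfy `predicate`."""
    return sum(1 for r in rows if predicate(r)) / len(rows)


def run_checks(rows: List[Row], checks: Checks) -> Dict[str, str]:
    """Status per check: 'Success', or the check's own level when it fails."""
    return {
        name: "Success" if compliance(rows, predicate) >= threshold else level
        for name, (level, predicate, threshold) in checks.items()
    }


def quarantine(rows: List[Row], predicate: Predicate) -> Tuple[List[Row], List[Row]]:
    """Split rows into (passing, failing) for one condition."""
    return [r for r in rows if predicate(r)], [r for r in rows if not predicate(r)]


rows = [
    {"fare_amount": 12.0, "passenger_count": 1},
    {"fare_amount": -3.5, "passenger_count": 2},
    {"fare_amount": 8.0, "passenger_count": 9},
]
checks: Checks = {
    "fare_amount": ("Error", lambda r: r["fare_amount"] >= 0, 1.0),
    "passenger_count": ("Warning", lambda r: r["passenger_count"] <= 6, 0.5),
}
statuses = run_checks(rows, checks)

good_rows, outliers = rows, []
if statuses["fare_amount"] != "Success":
    # Hypothetical business rule: keep valid fares, write the rest elsewhere.
    good_rows, outliers = quarantine(rows, checks["fare_amount"][1])
print(statuses, len(outliers))
```

Because each check reports its own status, the fix can target exactly the check that failed; a lenient Warning check (passenger_count here) does not trigger the quarantine step.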
All the available checks in PyDeequ can be seen here.
Metrics Repositories
This is one of my favourite components. PyDeequ can store metrics in, and load them from, a repository. We can create a repository and pass it when running data analysis or constraint verification. The data stored in the repository can then be used to compare the results of different Analyzer or Constraint Verification runs. After all, what's the use of metrics if you can't look at them and compare them later? 😄
Let's create a repository and see how these can be integrated to store the metrics:
Each set of computed metrics must be indexed by a so-called ResultKey, which contains a timestamp and supports arbitrary tags in the form of key-value pairs.
Using tags, you can store the metrics of different runs in the same repository, e.g. verification metrics for different months of data (nyc_yellow_repository can hold the verification metrics for Sep 2023, Oct 2023, and so on).
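To make the ResultKey idea concrete, here is a toy, file-based repository in plain Python: every run appends its metrics under a key made of a millisecond timestamp plus tags, so the Sep 2023 and Oct 2023 runs can live in the same file. The JSON layout and the `result_key`/`save_or_append` helpers are simplified, hypothetical stand-ins, and the metric values are made up. In PyDeequ you would use ResultKey(spark, ResultKey.current_milli_time(), tags) together with a FileSystemMetricsRepository, whose real on-disk format is not reproduced here.

```python
import json
import os
import tempfile
import time
from typing import Any, Dict, List, Optional


def result_key(tags: Dict[str, str], timestamp_ms: Optional[int] = None) -> Dict[str, Any]:
    """Hypothetical ResultKey stand-in: a millisecond timestamp plus key-value tags."""
    if timestamp_ms is None:
        timestamp_ms = int(time.time() * 1000)
    return {"timestamp_ms": timestamp_ms, "tags": dict(tags)}


def save_or_append(path: str, key: Dict[str, Any], metrics: List[Dict[str, Any]]) -> None:
    """Append one run's metrics under `key`, creating the JSON file if needed."""
    entries: List[Dict[str, Any]] = []
    if os.path.exists(path):
        with open(path) as f:
            entries = json.load(f)
    entries.append({"resultKey": key, "metrics": metrics})
    with open(path, "w") as f:
        json.dump(entries, f, indent=2)


path = os.path.join(tempfile.mkdtemp(), "metrics.json")
# One repository file, two runs told apart by their tags.
save_or_append(path, result_key({"month": "2023-09"}),
               [{"entity": "Column", "instance": "Airport_fee", "name": "Completeness", "value": 0.96}])
save_or_append(path, result_key({"month": "2023-10"}),
               [{"entity": "Column", "instance": "Airport_fee", "name": "Completeness", "value": 0.95}])
```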
Using Repositories with AnalysisRunner and Constraint Verification
Reading metrics from Metrics Repository
Filtering Repository Results based on Tag
You can combine multiple filters to query the repository by tag and timestamp. Here, we will filter the results by tag.
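Filtering can be sketched the same way over a list of run entries: keep the runs whose tags contain every requested key-value pair and whose timestamp falls inside an optional window. This mirrors the intent of PyDeequ's repository loader calls such as withTagValues, after and before (followed by getSuccessMetricsAsDataFrame), but `filter_runs` is a hypothetical helper, the entries are made up, and treating both time bounds as inclusive is this sketch's own choice.

```python
from typing import Any, Dict, List, Optional

Entry = Dict[str, Any]


def filter_runs(
    entries: List[Entry],
    tags: Optional[Dict[str, str]] = None,
    after_ms: Optional[int] = None,
    before_ms: Optional[int] = None,
) -> List[Entry]:
    """Keep entries matching all `tags` and lying within [after_ms, before_ms]."""
    selected = []
    for entry in entries:
        key = entry["resultKey"]
        if tags and any(key["tags"].get(k) != v for k, v in tags.items()):
            continue  # at least one requested tag is missing or different
        if after_ms is not None and key["timestamp_ms"] < after_ms:
            continue
        if before_ms is not None and key["timestamp_ms"] > before_ms:
            continue
        selected.append(entry)
    return selected


entries = [
    {"resultKey": {"timestamp_ms": 1000, "tags": {"month": "2023-09", "job": "verification"}}, "metrics": []},
    {"resultKey": {"timestamp_ms": 2000, "tags": {"month": "2023-10", "job": "verification"}}, "metrics": []},
    {"resultKey": {"timestamp_ms": 3000, "tags": {"month": "2023-10", "job": "analysis"}}, "metrics": []},
]
october = filter_runs(entries, tags={"month": "2023-10"})
print([e["resultKey"]["timestamp_ms"] for e in october])  # prints [2000, 3000]
```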
An important thing to know: for constraint verification, the Metrics Repository stores only the metric values along with the column and constraint details, NOT the verification results.
So if you need to keep the verification results, you can write them directly to an S3 location and create an Athena table on top of it. Here, I will create an Athena table to store the verification results.
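Because the repository does not keep verification results, it helps to stamp each result row with the run's tags before writing it out, so results from different runs can be told apart when queried later. Here is a stdlib sketch that writes newline-delimited JSON into a Hive-style `run_date=YYYY-MM-DD` folder, a partition layout Athena tables commonly use. It writes to a local temp directory instead of S3, `write_verification_results` is a hypothetical helper, and the row keys follow what I understand PyDeequ's checkResultsAsDataFrame output to contain (check, check_level, check_status, constraint, constraint_status, constraint_message); the values are made up.

```python
import json
import os
import tempfile
from datetime import date
from typing import Any, Dict, List


def write_verification_results(
    base_dir: str, results: List[Dict[str, Any]], run_date: date, tags: Dict[str, str]
) -> str:
    """Write result rows as JSON Lines under a Hive-style run_date=YYYY-MM-DD folder."""
    part_dir = os.path.join(base_dir, f"run_date={run_date.isoformat()}")
    os.makedirs(part_dir, exist_ok=True)
    path = os.path.join(part_dir, "results.json")
    with open(path, "w") as f:  # a rerun on the same date overwrites this file
        for row in results:
            # Keep the run's tags next to each row so different runs stay distinguishable.
            f.write(json.dumps({**row, "tags": tags}) + "\n")
    return path


# Rows shaped like checkResult_df; the values are made up for illustration.
results = [
    {"check": "fare_amount checks", "check_level": "Error", "check_status": "Error",
     "constraint": "fare_amount non-negative", "constraint_status": "Failure", "constraint_message": "hypothetical"},
    {"check": "id checks", "check_level": "Error", "check_status": "Success",
     "constraint": "gen_id unique", "constraint_status": "Success", "constraint_message": ""},
]
path = write_verification_results(tempfile.mkdtemp(), results, date(2023, 10, 31), {"month": "2023-10"})
```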
Querying in Athena
select * from nyc_tlc.pydeequ_verification_results
Reading metrics file from S3
Since you can store your metrics.json file in S3 for later use, you can also point a FileSystemMetricsRepository at that same file to load the metrics again.
If you have read this far, please leave a comment below; any feedback is highly appreciated. See you in the next post! 😊
If this post added value for you, consider subscribing: it's free, and you will get upcoming posts in your mailbox too.
Sign up for Akashdeep Gupta
Big Data and Serverless Tech implementation and tutorials on cloud