The general idea and the main takeaway for you from this newsletter issue is NOT how I built a logger, but to give you insight into:
Need for such a solution,
Design decisions made, and
A working logger you can modify easily as needed, or even better, build your own logger.
The code we are building can be found on GitHub here.
In addition, to make this more than just a logger-dev newsletter issue, we will build a simple project to test it using a Spark Standalone Cluster, MinIO Object Storage, and DuckDB.
Let's get right into it!!
What's the need for such a Logger?
Currently, the major issues when debugging Spark Applications, especially in production, are:
Delays during the log aggregation
Multiple-step process to fetch the logs
Reviewing many log lines for minor issues like missing tables or columns, or a missing variable causing syntax issues in your SQL; you get the idea.
What if you could get the errors every time an application fails just by querying a table or a logging location?
This would save time, get your production jobs back up and running faster after failures, and spare you from digging through logs unless the logged Exception isn't clear enough. In addition, it would enable Spark Application audits by identifying failures, performance issues (like job time increasing after a code release), etc.
With that in mind, the logger should be:
Easy to Integrate - easy initialization, so that integrating it into existing jobs isn't painful and doesn't require regression testing.
No Impact on Job Timings - to make sure there is no performance degradation, it shouldn't do any heavy lifting or extra operations, e.g. taking a count of the output table at the same time. Its main function should be just writing the logs.
Lightweight - no additional package or dependency that needs to be built for it to be deployed or to work.
Provided that most of your jobs are written in a functional style, creating a decorator class @SparkExceptionLogger(...) makes the most sense, as it fulfills the 'Easy to Integrate' point: it requires just a one-line addition. Well, two if you include the import statement.
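To make that concrete, here is a hypothetical usage sketch; the parameter names (process_name, sub_process, log_path) are illustrative and may differ from the final code:

```python
# Hypothetical usage sketch - parameter names are assumptions for illustration.
from SparkExceptionLogger import SparkExceptionLogger

@SparkExceptionLogger(process_name="sales", sub_process="daily_agg",
                      log_path="s3a://warehouse/logs/")
def run_daily_agg():
    ...  # your existing job code stays untouched
```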
What details should it Log?
Based on what we have discussed so far, the most important things it should log are:
The Exception and its stacktrace - columns: error and error_desc
Start and End time - columns: start_ts and end_ts
Execution Date - column: execution_dt
In addition to this, while we are logging all these things, we can add:
Time taken by each job - column: time_taken
A Process or Sub-Process name to get an idea of which process is impacted - columns: process_name and sub_process
Spark Cluster ID, in case that changes daily for you - column: cluster_id
Alright, let's build it now.
Building the Logger
If you are new to Python Decorators, you can give it a read here.
Decorator Class Initialization __init__
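Here is a minimal sketch of what the initialization could look like, with the imports the later pieces will need. The parameter names are assumptions; match them to the final code on GitHub:

```python
# Minimal sketch of the decorator's __init__ (parameter names are assumptions).
import functools
import traceback
from datetime import datetime


class SparkExceptionLogger:
    def __init__(self, process_name, sub_process, log_path):
        self.process_name = process_name  # high-level process this job belongs to
        self.sub_process = sub_process    # specific job / step within that process
        self.log_path = log_path          # log location, e.g. an s3a:// path on the warehouse bucket
```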
Defining the Decorator Call method __call__
__call__ is the method that is invoked when the decorated function is called; it is the method that wraps the actual function call.
This is where we will fetch all the Exception details if the wrapped function fails, and on a successful run simply populate columns like end_ts, calculate time_taken, etc.
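A sketch of how __call__ could look, continuing the class (and imports) from the __init__ sketch above. On success it fills end_ts and time_taken; on failure it also captures the Exception into error and the stacktrace into error_desc, then re-raises so the job still fails visibly. The _write_log helper comes in the next section:

```python
    # Continuing the SparkExceptionLogger class sketched above.
    def __call__(self, func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            start = datetime.now()
            record = {
                "process_name": self.process_name,
                "sub_process": self.sub_process,
                "execution_dt": start.strftime("%Y-%m-%d"),
                "start_ts": start.isoformat(),
                "end_ts": None,
                "time_taken": None,
                "error": None,
                "error_desc": None,
            }
            try:
                # Run the actual job function untouched
                return func(*args, **kwargs)
            except Exception as exc:
                # Capture the Exception and full stacktrace, then let the job fail as usual
                record["error"] = f"{type(exc).__name__}: {exc}"
                record["error_desc"] = traceback.format_exc()
                raise
            finally:
                end = datetime.now()
                record["end_ts"] = end.isoformat()
                record["time_taken"] = (end - start).total_seconds()
                self._write_log(record)  # helper defined below
        return wrapper
```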
Helper methods
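One helper is enough for this sketch: _write_log, which turns the record into a single-row DataFrame and appends it to the log location. I am assuming the log location is a parquet dataset on the MinIO warehouse bucket, and I am using the Spark application ID as a stand-in for cluster_id; adjust both to your setup:

```python
    # Continuing the SparkExceptionLogger class: write one log record per run.
    def _write_log(self, record):
        from pyspark.sql import SparkSession
        from pyspark.sql.types import StructType, StructField, StringType, DoubleType

        spark = SparkSession.builder.getOrCreate()
        # Using the application ID as a stand-in for cluster_id (illustrative only).
        record["cluster_id"] = spark.sparkContext.applicationId

        schema = StructType([
            StructField("process_name", StringType()),
            StructField("sub_process", StringType()),
            StructField("execution_dt", StringType()),
            StructField("start_ts", StringType()),
            StructField("end_ts", StringType()),
            StructField("time_taken", DoubleType()),
            StructField("error", StringType()),
            StructField("error_desc", StringType()),
            StructField("cluster_id", StringType()),
        ])
        # Append as parquet so DuckDB can read it straight from the bucket later.
        spark.createDataFrame([record], schema=schema) \
             .write.mode("append").parquet(self.log_path)
```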
Combining all of these together creates our SparkExceptionLogger Decorator Class. You can find the final code here.
Let's build the project I mentioned at the beginning.
A Fun Baby Project
The general idea of this project is to have 3 components: Compute, Storage, and a Query Engine.
In AWS, this could be: EMR/Glue, S3, and Athena.
For local, I chose: Spark, MinIO, and DuckDB.
In this project, as shown in the architecture above, we will be using:
Standalone Spark Cluster running on Docker - the compute engine for our jobs.
MinIO running on Docker - MinIO is an open-source object storage with AWS S3-compatible APIs.
DuckDB - an in-process SQL OLAP database. I chose DuckDB here for 2 reasons:
Its ability to query files directly from S3 buckets, and how easy it is to get started without the hassle of additional management.
It's one of the most popular databases right now, and I want you to try it yourself once; you will see why.
I won't be going into the details of building the Docker images for Spark and MinIO here (maybe another blog if you are interested, let me know!!).
To make it easier for you, I have already created a GitHub repo that you can simply clone and then follow the instructions in README.md, or the ones I am going to walk through below.
DuckDB can be installed with just 1-2 lines. You can get it from here.
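For example, either of these works on macOS/Linux (check the DuckDB installation page for your platform; the install-script URL below is the one documented there at the time of writing):

```bash
# macOS via Homebrew
brew install duckdb

# or the install script from the DuckDB docs
curl https://install.duckdb.org | sh
```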
Starting everything up and running
Once you have everything in place, let's start by building and running all the Docker containers; an illustrative command sketch follows the steps below:
Navigate to the cloned repo directory and build the Docker images
Downloading all the required jars to interact with MinIO
Running the Docker containers
Validating everything is up and running
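Here is an illustrative run-through of those four steps. The exact commands, script names, and jar versions live in the repo's README.md, so treat this as a sketch; in particular, the compose commands and jar versions below are assumptions:

```bash
# 1. Navigate to the cloned repo and build the Docker images
cd spark-minio-project
docker compose build        # use `docker-compose build` with the older binary

# 2. Download the jars needed to talk to MinIO over s3a://
#    (the README has the exact script/versions; match hadoop-aws to the
#    Hadoop version baked into the Spark image)
curl -O https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar
curl -O https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.262/aws-java-sdk-bundle-1.12.262.jar

# 3. Run the containers in the background
docker compose up -d

# 4. Validate that everything is up and running
docker compose ps           # all services should show as "running"
```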
Once everything is up and running, you will be able to see:
Spark master running at localhost:9090
MinIO UI running at localhost:9001. Log in using Username: admin and Password: password. Make sure you have the warehouse bucket available with Read/Write access; if not, you can create it or another bucket.
Place SparkExceptionLogger.py in the spark-minio-project/spark_apps folder.
Creating a PySpark File with Logger integrated
Create a test script in the spark-minio-project/spark_apps folder.
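Something like the sketch below would do. The file name, process/sub-process names, and log path are illustrative, and I am assuming the cluster's Spark configuration already points s3a:// at MinIO (which the repo's setup takes care of):

```python
# spark_apps/test_logger.py (illustrative) - intentionally fails with
# [TABLE_OR_VIEW_NOT_FOUND] so the decorator has an Exception to log.
from pyspark.sql import SparkSession
from SparkExceptionLogger import SparkExceptionLogger


@SparkExceptionLogger(process_name="demo", sub_process="missing_table_job",
                      log_path="s3a://warehouse/logs/")
def run():
    spark = SparkSession.builder.appName("logger-test").getOrCreate()
    # This table does not exist, so Spark raises [TABLE_OR_VIEW_NOT_FOUND]
    spark.sql("SELECT * FROM some_table_that_does_not_exist").show()


if __name__ == "__main__":
    run()
```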
Submitting the Spark Job
To submit the Spark job on the running Spark Standalone cluster, let's run spark-submit on the spark-master container:
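For example (the container name and the path where spark_apps is mounted inside the container are assumptions; check the repo's compose file):

```bash
docker exec -it spark-master \
  spark-submit --master spark://spark-master:7077 \
  /opt/spark/spark_apps/test_logger.py
```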
If you check the logs, you will see a [TABLE_OR_VIEW_NOT_FOUND] Exception; that's intentional, just make sure nothing else is breaking.
Querying the Logs from DuckDB
Time to start the quacking!!
Start DuckDB from the command line in a terminal:
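Running duckdb with no arguments starts an in-memory session, which is all we need here:

```bash
duckdb   # no argument = in-memory database; pass a file name if you want persistence
```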
Creating a SECRET to read from the MinIO bucket (similar to reading from an S3 bucket)
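Here is a sketch of that secret, using the MinIO credentials from earlier and assuming the MinIO S3 API is exposed on port 9000 (9001 above is the console UI):

```sql
INSTALL httpfs;
LOAD httpfs;

CREATE SECRET minio_secret (
    TYPE S3,
    KEY_ID 'admin',
    SECRET 'password',
    ENDPOINT 'localhost:9000',
    URL_STYLE 'path',
    USE_SSL false
);
```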
Querying Logs from the Log Location in the Bucket
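Assuming the logger wrote parquet files under a logs/ prefix in the warehouse bucket (adjust the glob to wherever your log_path points), DuckDB's FROM-first syntax makes this a one-liner:

```sql
FROM read_parquet('s3://warehouse/logs/*.parquet');
```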
If you noticed, we didn't use SELECT * while querying; that's because in DuckDB it isn't mandatory when you want to see all the columns (a bare FROM works). Cool, right?!
To see just the error or error_desc columns, we can rewrite it like:
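Reusing the same (assumed) path as above:

```sql
SELECT error, error_desc
FROM read_parquet('s3://warehouse/logs/*.parquet');
```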
If you look at the output, you can see the exact Exception in the error column, while the stacktrace is stored in the error_desc column.
A bit more on DuckDB
You can also change the way data is displayed in the DuckDB CLI by setting .mode before querying the data, and you can write the query output to a file by using .output or .once.
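For instance, to dump the next query's result to a JSON file (the file name is just an example):

```sql
.mode json
.once logs.json
SELECT error, error_desc
FROM read_parquet('s3://warehouse/logs/*.parquet');
```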
This will create a JSON file containing a JSON array, with each record as a JSON object.
Shutting Down All the Docker Containers
To shut down all the containers:
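From the repo directory (assuming the project uses Docker Compose; use docker-compose with the older standalone binary):

```bash
docker compose down
```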
If you have followed along until here, congratulations!!! You have:
Figured out how to create your own logger.
Deployed Spark and MinIO Docker containers that you can use as a playground for testing out various other concepts.
Learned how to connect DuckDB to S3 files and query them directly, without creating any table.
That's it for this one! If you liked it, leave a ⭐ on the repo.