EMRFS S3 Optimized Committer and Committer Protocol for Improving Spark Write Performance - Why and How?
What are the EMRFS S3-optimized committer and the EMRFS S3-optimized commit protocol, how do you use them, and how can you tell whether they are working for your Spark jobs to improve write performance?
Just writing a Spark job is not sufficient; knowing what it does under the hood is the secret sauce for making it run faster.
Knowing how your Spark job writes files behind the scenes can help you optimize the overall time the job takes to complete. OutputCommitters are the classes responsible for writing your data files into storage. Which implementation of this class is used during a write determines whether data files are committed by renaming directories.
By the end of this blog post, you will be fully equipped with the skills to know if your Spark job is writing efficiently, how to optimize it by enabling EMRFS S3 Optimized Committers, and most importantly, how you can identify if these committers are working after enabling them.
This blog post will cover:
What do commit protocol and committing mean?
What are the EMRFS S3-optimized committer and the EMRFS S3-optimized commit protocol?
Why do we need EMRFS S3-optimized committer and committer protocol?
How to enable these S3-optimized committers and when to use which one?
How to check if these are working for your job after enabling them?
Let's start by demystifying some buzzwords in this space, like commit protocol and committing.
What is "the commit protocol"?
It’s the requirement on workers as to when their data is made visible, where, for a filesystem, “visible” means “can be seen in the destination directory for the query.” Also, it ensures that jobs and tasks either succeed or fail cleanly.
What does commit mean?
Simply, it just means moving the task output from a temp directory to the specified output directory. When working with distributed filesystems, it’s achieved by renaming the directory.
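As a rough illustration of commit-by-rename, here is a minimal sketch on a local filesystem using only the Python standard library. The directory layout and the names commit_task and attempt_0 are illustrative assumptions, not Spark's or Hadoop's actual code.

```python
import os
import tempfile


def commit_task(attempt_dir, output_dir):
    """Move every file from a task attempt directory into the output directory."""
    os.makedirs(output_dir, exist_ok=True)
    committed = []
    for name in sorted(os.listdir(attempt_dir)):
        # On a local or HDFS-like filesystem this rename is a metadata-only move.
        os.rename(os.path.join(attempt_dir, name), os.path.join(output_dir, name))
        committed.append(name)
    return committed


# Hypothetical layout: the task first writes under a temporary attempt directory.
base = tempfile.mkdtemp()
output_dir = os.path.join(base, "output")
attempt_dir = os.path.join(output_dir, "_temporary", "attempt_0")
os.makedirs(attempt_dir)
with open(os.path.join(attempt_dir, "part-00000.parquet"), "w") as f:
    f.write("rows")

# Before the commit, no data file is visible directly under the output directory.
visible_before = [n for n in os.listdir(output_dir) if not n.startswith("_")]
committed = commit_task(attempt_dir, output_dir)
visible_after = [n for n in os.listdir(output_dir) if not n.startswith("_")]
```

The data file only shows up in the output directory after commit_task runs, which is the "made visible" step that the commit protocol is about.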
What is EMRFS?
The EMR File System (EMRFS) is an implementation of HDFS that all Amazon EMR clusters use for reading and writing regular files from Amazon EMR directly to Amazon S3. EMRFS provides the convenience of storing persistent data in Amazon S3 for use with Hadoop while also providing features like data encryption.
Why do we need additional optimized committers and commit protocols for Object Storage?
In object storage like S3, committing data files using rename is very slow, because a rename requires copying the entire data file under a new name and then deleting the original. In file storage like HDFS, a rename is just a metadata operation, which is far faster than copying the entire data file.
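To make the cost difference concrete, here is a toy in-memory model of a "directory rename" on an object store. ToyObjectStore and its methods are hypothetical names for illustration only; this is not how EMRFS or S3 is implemented, it just counts how many bytes a copy-then-delete rename has to move.

```python
class ToyObjectStore:
    """In-memory stand-in for an object store: keys map directly to bytes."""

    def __init__(self):
        self.objects = {}
        self.bytes_copied = 0

    def put(self, key, data):
        self.objects[key] = data

    def rename_prefix(self, src_prefix, dst_prefix):
        """'Rename' a directory by copying each object and deleting the original."""
        for key in [k for k in self.objects if k.startswith(src_prefix)]:
            data = self.objects[key]
            # Full copy of the data under the new key ...
            self.objects[dst_prefix + key[len(src_prefix):]] = data
            self.bytes_copied += len(data)
            # ... followed by a delete of the source key.
            del self.objects[key]


store = ToyObjectStore()
for i in range(3):
    store.put(f"table/_temporary/part-{i}", b"x" * 1000)
store.rename_prefix("table/_temporary/", "table/")
```

In this model the work grows with the total size of the files being renamed, whereas a filesystem rename would touch no file data at all.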
Now, as EMR uses EMRFS to read from and write to Amazon S3, it provides the EMRFS S3-optimized committer and the EMRFS S3-optimized commit protocol, which improve write performance by avoiding the rename operations done in Amazon S3 during the job and task commit phases.
Before diving deeper into how the EMRFS S3-optimized committer and commit protocol improve write performance, we need to know what happens when a write is issued and what steps are taken before the job is finally committed.
Here's a very high-level overview of the same:
Now the above flow gives a very high-level overview of all the steps involved until the files are finally visible in the destination location. These steps are performed by multiple classes in the backend. We will build on top of it and see how things change within these steps once we use the EMRFS S3-optimized committer and commit protocol.
Internally, commit protocols delegate their work to the committer that is set up during the setupTask stage. Which committer is used depends on the protocol class implementation and the configuration passed to that class.
Introducing EMRFS S3-Optimized Committer and Commit Protocol
EMRFS S3-Optimized Committer and Commit Protocol are optimized for improving the write performance of files to Amazon S3 when using EMRFS by avoiding list and rename operations done in S3 during job and task commit phases.
Both of these use the transaction-like characteristics of S3 multipart uploads to avoid any rename operation and ensure files written by task attempts only appear in the job's output location upon the task commit.
The transaction-like characteristic of S3 multipart uploads is mainly that a file becomes readable at its S3 location only once all of its parts have been uploaded.
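The following toy model sketches that behaviour in plain Python. ToyMultipartStore is a hypothetical in-memory class; its method names mirror the S3 operation names (CreateMultipartUpload, UploadPart, CompleteMultipartUpload, AbortMultipartUpload), but nothing here talks to S3.

```python
class ToyMultipartStore:
    """Parts stay invisible until the upload is explicitly completed."""

    def __init__(self):
        self.visible = {}    # key -> bytes that readers can see
        self._pending = {}   # upload_id -> (key, {part_number: bytes})
        self._next_id = 0

    def create_multipart_upload(self, key):
        self._next_id += 1
        upload_id = f"upload-{self._next_id}"
        self._pending[upload_id] = (key, {})
        return upload_id

    def upload_part(self, upload_id, part_number, data):
        self._pending[upload_id][1][part_number] = data

    def complete_multipart_upload(self, upload_id):
        key, parts = self._pending.pop(upload_id)
        # Only now is the object assembled and made visible under its final key.
        self.visible[key] = b"".join(parts[n] for n in sorted(parts))

    def abort_multipart_upload(self, upload_id):
        # Dropping the pending parts leaves nothing visible at the destination.
        self._pending.pop(upload_id)


store = ToyMultipartStore()
key = "table/part-00000.parquet"
upload_id = store.create_multipart_upload(key)
store.upload_part(upload_id, 1, b"abc")
store.upload_part(upload_id, 2, b"def")
seen_before_complete = key in store.visible
store.complete_multipart_upload(upload_id)
```

Because completing or aborting the upload decides whether the file ever appears, a task can upload straight to its final key without a rename step.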
By using multipart uploads to the job output location on task commit, task commit performance is improved over the default FileOutputCommitter V2 algorithm. In the FileOutputCommitter V2 algorithm, files are renamed from the staging directory to the job output location during the task commit phase; this is no longer required because the multipart uploads go directly to the job output location.
File listing operations are avoided because, instead of listing the files in the job output or staging S3 location, it uses HTTPS API calls to the S3 URI to check file status and key counts. We will look into this later in this blog post.
Alright, now that we know what improves write performance when using the EMRFS S3-optimized committer and commit protocol, let's dive into each of them and see how we can check whether they are working for Spark jobs.
EMRFS S3-Optimized Committer
It's an alternative OutputCommitter implementation. An OutputCommitter is an abstract class and ensures that a commit protocol is being used. This S3-Optimized Committer is available starting from EMR 5.19.0 and is enabled by default with EMR 5.20.0 and later.
We will discuss the requirements for Parquet tables here; the requirements for other table formats can be found in the docs.
The Spark job uses Spark SQL, DataFrames, or Datasets to write files.
Spark's built-in file format support is used, i.e. spark.sql.hive.convertMetastoreParquet is set to true. This is the default.
The target table is created with the USING parquet clause, i.e. it's a Hive Parquet table or uses the Hive Parquet SerDe when reading from and writing to the table.
Spark job operations write to a default partition location, for example ${table_location}/k1=v1/k2=v2/
Multipart uploads must be enabled in Amazon EMR.
Spark must have configs:
spark.sql.parquet.fs.optimized.committer.optimization-enabled set to true
spark.sql.hive.convertMetastoreParquet set to true
spark.sql.parquet.output.committer.class set to com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter
spark.sql.sources.commitProtocolClass set to org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol or org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol.
When overwriting a Parquet dataset, the partitionOverwriteMode write option must be static and spark.sql.sources.partitionOverwriteMode must be set to static.
Most of the Spark configurations listed in the requirements above are enabled by default. If you are not sure, you can check them with spark.conf.get("conf-key-name") from a Jupyter notebook running a Spark kernel or from spark-shell on EMR.
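Checking the keys one by one gets tedious, so here is a small sketch of a helper that checks them all at once. The helper name find_mismatched_confs and the dictionary REQUIRED_COMMITTER_CONFS are hypothetical; the keys and expected values are the ones listed in the requirements above. It takes any getter with a (key, default) signature, so a plain dictionary stands in for a live session in the example.

```python
# Expected values taken from the requirements listed above.
REQUIRED_COMMITTER_CONFS = {
    "spark.sql.parquet.fs.optimized.committer.optimization-enabled": {"true"},
    "spark.sql.hive.convertMetastoreParquet": {"true"},
    "spark.sql.parquet.output.committer.class": {
        "com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter"
    },
    "spark.sql.sources.commitProtocolClass": {
        "org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol",
        "org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol",
    },
    "spark.sql.sources.partitionOverwriteMode": {"static"},
}


def find_mismatched_confs(get_conf, required=REQUIRED_COMMITTER_CONFS):
    """Return {key: actual_value} for every config that is unset or not allowed."""
    mismatches = {}
    for key, allowed in required.items():
        actual = get_conf(key, None)
        # Compare case-insensitively so that STATIC and static both match.
        allowed_lower = {value.lower() for value in allowed}
        if actual is None or str(actual).lower() not in allowed_lower:
            mismatches[key] = actual
    return mismatches


# A plain dict stands in for the session config in this example.
sample_conf = {
    "spark.sql.parquet.fs.optimized.committer.optimization-enabled": "true",
    "spark.sql.hive.convertMetastoreParquet": "true",
    "spark.sql.parquet.output.committer.class":
        "com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter",
    "spark.sql.sources.commitProtocolClass":
        "org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol",
    "spark.sql.sources.partitionOverwriteMode": "dynamic",
}
mismatches = find_mismatched_confs(sample_conf.get)
```

In a live PySpark session the same call would presumably be find_mismatched_confs(spark.conf.get), assuming spark.conf.get accepts a default as its second argument. An empty result means every listed setting matches.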
One of the most important things to note here is that this committer works only when partitionOverwriteMode is static while writing into an S3 location or, if you are loading a table, only for non-partitioned Hive metastore Parquet tables.
Now, to understand how this write is optimized, let's first look at the case where the optimized committer doesn't work. I will be using EMR 6.15 for the demo, but you can use other versions starting from EMR 5.19.0.
Optimization is disabled – S3-Optimized Committer DOESN'T work
Default configuration for partitionOverwriteMode is set to STATIC.
Let's look into the log to see what happens under the hood when EMRFS S3-Optimized Committer is not working.
Here's a quick overview of what's happening under the hood when the optimization is disabled:
_temporary/0/_temporary/attempt_202402101142404460199108560845936_0002_m_000000_2 path is created inside the job output location i.e. table path s3://aws-blog-post-bucket/blogs_db/emr_committer_test_non_ptn
Data is written into EMR local and then multipart uploaded into this temporary location.
Sequential renaming of files happens from temp location to job output location which is an expensive operation in an Object Storage like AWS S3.
A _SUCCESS file is created in the job output location when the job is finally committed.
FileOutputCommitter is used when optimization is disabled.
Using EMRFS S3-Optimized Committer, i.e. optimization enabled, with SQLHadoopMapReduceCommitProtocol and partitionOverwriteMode set to static for a non-partitioned table – Committer Works
Default configuration for spark.sql.parquet.fs.optimized.committer.optimization-enabled is true starting from EMR 5.20.0
Let's take a peek inside the logs in the case when EMRFS S3-Optimized Committer is working
On looking into the logs we can observe:
There is no _temporary/0.../ directory created inside the job output location, i.e. the table's S3 location s3://aws-blog-post-bucket/blogs_db/emr_committer_test_non_ptn
Only a staging directory is created i.e. .emrfs_staging_0_attempt_20240207144637830028175290150178_0001_m_000000_1
The data after being written into the EMR local /mnt/s3/emrfs-7576990015875678729/ is directly being uploaded to the table S3 location and _SUCCESS file is created within the same.
There is NO renaming of files at all and that's where the actual difference in write performance can be observed compared to the previous case where the committer was not working.
When the optimization is enabled, the OutputCommitter that is being used is FileSystemOptimizedCommitter instead of FileOutputCommitter .
So far we have seen how write performance can be improved when writing into non-partitioned tables using the EMRFS S3-optimized committer. But what if we want to improve performance when overwriting a partitioned table? That's where the EMRFS S3-optimized commit protocol comes into the picture.
EMRFS S3-Optimized Commit Protocol
The EMRFS S3-optimized commit protocol is an alternative FileCommitProtocol implementation that is optimized for writing files with Spark dynamic partition overwrite to Amazon S3 when using EMRFS. The protocol improves application performance by avoiding rename operations in Amazon S3 during the Spark dynamic partition overwrite job commit phase.
FileCommitProtocol defines how a single Spark job commits its output. Call sequence order is basically:
Driver calls setupJob
As part of each task's execution, the executor calls setupTask and then commitTask (or abortTask if a task fails).
When all tasks are completed successfully, the driver calls commitJob (or abortJob if too many tasks fail).
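The call sequence above can be sketched with a toy protocol that only records the calls it receives. RecordingCommitProtocol and run_job are hypothetical names and the code is plain Python, not Spark's Scala FileCommitProtocol. As a simplification, tasks run sequentially and the job is aborted on the first failed task, whereas Spark retries tasks before giving up.

```python
class RecordingCommitProtocol:
    """Toy stand-in for a commit protocol that just logs the call order."""

    def __init__(self):
        self.calls = []

    def setup_job(self):
        self.calls.append("setupJob")

    def setup_task(self, task_id):
        self.calls.append(f"setupTask({task_id})")

    def commit_task(self, task_id):
        self.calls.append(f"commitTask({task_id})")

    def abort_task(self, task_id):
        self.calls.append(f"abortTask({task_id})")

    def commit_job(self):
        self.calls.append("commitJob")

    def abort_job(self):
        self.calls.append("abortJob")


def run_job(protocol, tasks):
    """Drive the protocol; `tasks` maps a task id to a callable doing the write."""
    protocol.setup_job()                    # driver side
    failed = False
    for task_id, write in tasks.items():    # executor side
        protocol.setup_task(task_id)
        try:
            write()
            protocol.commit_task(task_id)
        except Exception:
            protocol.abort_task(task_id)
            failed = True
            break
    if failed:
        protocol.abort_job()                # driver side, failure path
    else:
        protocol.commit_job()               # driver side, success path
    return protocol.calls


calls = run_job(RecordingCommitProtocol(), {0: lambda: None, 1: lambda: None})
```

Different protocol implementations keep this call order but do different work inside each step, which is why the choice of protocol class matters for write performance.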
Spark job uses Spark SQL, DataFrames, or Dataset to write files.
The Spark job has partitionOverwriteMode set to dynamic.
Multipart uploads are enabled in Amazon EMR. This is the default.
The filesystem cache for EMRFS is enabled, i.e. fs.s3.impl.disable.cache is set to false. This is the default.
Spark jobs write to the default partition location.
Spark must have configs:
spark.sql.sources.commitProtocolClass must be set to org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol .
The partitionOverwriteMode write option or spark.sql.sources.partitionOverwriteMode must be set to dynamic.
When writing to an S3 location, mode must be set to overwrite; when inserting data into a table, overwrite must be set to True.
If Spark jobs overwrite the Hive metastore Parquet table, spark.sql.hive.convertMetastoreParquet, spark.sql.hive.convertInsertingPartitionedTable, and spark.sql.hive.convertMetastore.partitionOverwriteMode must be set to true.
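As a sketch of how these settings could be passed to a job, the snippet below turns them into spark-submit --conf arguments. The helper to_spark_submit_args and the dictionary name are hypothetical; the keys and values come from the list above, including the three Hive metastore settings, which only matter when overwriting a Hive metastore Parquet table.

```python
# Settings from the requirements above for dynamic partition overwrite.
DYNAMIC_OVERWRITE_CONFS = {
    "spark.sql.sources.commitProtocolClass":
        "org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol",
    "spark.sql.sources.partitionOverwriteMode": "dynamic",
    "spark.sql.hive.convertMetastoreParquet": "true",
    "spark.sql.hive.convertInsertingPartitionedTable": "true",
    "spark.sql.hive.convertMetastore.partitionOverwriteMode": "true",
}


def to_spark_submit_args(confs):
    """Flatten a {key: value} mapping into ['--conf', 'key=value', ...]."""
    args = []
    for key, value in confs.items():
        args += ["--conf", f"{key}={value}"]
    return args


args = to_spark_submit_args(DYNAMIC_OVERWRITE_CONFS)
# Join the pieces to preview the extra spark-submit options.
preview = " ".join(args)
```

The same key and value pairs could also be set on a session builder with config(key, value) instead of on the command line.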
How does Dynamic Partition Overwrite work when enabled?
Based on Spark 3.x documentation that can be seen here :
The dynamicPartitionOverwrite flag is true when partitionOverwriteMode is set to dynamic in the Spark configuration, and false when it's set to static, which is the default.
To see how SQLEmrOptimizedCommitProtocol improves the performance, let's see what happens under the hood when SQLHadoopMapReduceCommitProtocol is used with partitionOverwriteMode as dynamic and write mode as overwrite.
Using SQLHadoopMapReduceCommitProtocol with partitionOverwriteMode set to dynamic on a partitioned Hive table
Alright as we can see other configurations are present by default so nothing specific to be set for this. Let's create a partitioned table on which we will be writing our data.
Alright, let's look into the logs for this to see exactly how it's writing data.
Here's a quick overview of what is exactly happening in the different phases of the write process.
Now, if we look into the logs, we will observe the following:
.spark-staging-fda31ca9-c6cd-4a5b-975f-53404934c6fa directory is created within the output/destination location i.e. s3://aws-blog-post-bucket/blogs_db/emr_committer_test
Inside this .spark-staging-${jobId} directory another staging directory is created for writing the data i.e. .emrfs_staging_0_attempt_202402051930535732531040944174447_0001_m_000000_1 which is an attempt directory.
Data is first written into an EMR Local location /mnt/s3/emrfs-6613074794378474867/ and then from there, it's uploaded to .spark-staging-fda31ca9-c6cd-4a5b-975f-53404934c6fa directory.
A success file _SUCCESS is written into the .spark-staging-${jobId} directory, which indicates that the task write completed successfully.
Finally, a rename happens for .spark-staging-${jobId} directory to the final destination directory i.e. s3://aws-blog-post-bucket/blogs_db/emr_committer_test/dt=2024-02-04 using algorithm version 1 which means a sequential rename.
This rename can be super expensive based on the number of files that need to be renamed and the file size as we mentioned earlier in the issues with the rename in Object Storages.
OutputCommitter used is FileSystemOptimizedCommitter .
Because this involves renaming files and the data is not written directly into the destination, the writes are expensive and your Spark job will take more time to perform them.
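The directory markers seen in the logs so far can be turned into a quick check. This is a minimal sketch with a hypothetical helper name, classify_write_path; it only matches the three markers observed in this post (_temporary/, .spark-staging- and .emrfs_staging_) and does not reflect any official EMR tooling.

```python
def classify_write_path(path):
    """Guess from a logged path whether a rename is expected later."""
    # _temporary/ (FileOutputCommitter) and .spark-staging- (job-level staging)
    # both imply that files are renamed into place afterwards.
    if "_temporary/" in path or ".spark-staging-" in path:
        return "rename"
    # An EMRFS staging marker on its own goes with a direct multipart upload.
    if ".emrfs_staging_" in path:
        return "direct"
    return "unknown"


examples = [
    "s3://aws-blog-post-bucket/blogs_db/emr_committer_test_non_ptn/"
    "_temporary/0/_temporary/attempt_202402101142404460199108560845936_0002_m_000000_2",
    "s3://aws-blog-post-bucket/blogs_db/emr_committer_test/"
    ".spark-staging-fda31ca9-c6cd-4a5b-975f-53404934c6fa/",
    "s3://aws-blog-post-bucket/blogs_db/emr_committer_test_non_ptn/"
    ".emrfs_staging_0_attempt_20240207144637830028175290150178_0001_m_000000_1",
]
results = [classify_write_path(p) for p in examples]
```

The rename markers are checked first on purpose, since an .emrfs_staging_ directory nested inside .spark-staging- still ends in a rename.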
Let's look into what happens when we use SQLEmrOptimizedCommitProtocol with partitionOverwriteMode set to dynamic.
Using SQLEmrOptimizedCommitProtocol with partitionOverwriteMode set to dynamic on a partitioned Hive table
Let's take a look at the logs to see the differences compared to the previous case
24/02/05 19:23:00 INFO FileSystemOptimizedCommitter: Nothing to setup as successful task attempt outputs are written directly
24/02/05 19:23:07 INFO SQLConfCommitterProvider: Getting user defined output committer class com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter
24/02/05 19:23:07 INFO EmrOptimizedParquetOutputCommitter: EMR Optimized Committer: ENABLED
24/02/05 19:23:07 INFO EmrOptimizedParquetOutputCommitter: Using output committer class org.apache.hadoop.mapreduce.lib.output.FileSystemOptimizedCommitter
24/02/05 19:23:07 INFO FileOutputCommitter: File Output Committer Algorithm version is 2
24/02/05 19:23:07 INFO FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: true
24/02/05 19:23:08 INFO StagingUploadPlanner: Creating a new staged file 's3://aws-blog-post-bucket/blogs_db/emr_committer_test/dt=2024-02-05/.emrfs_staging_9f31fb50-1a77-47dd-8cf5-779e36a61b12-0-1/part-00000-9f31fb50-1a77-47dd-8cf5-779e36a61b12.c000.snappy.parquet' with destination key 'blogs_db/emr_committer_test/dt=2024-02-05/part-00000-9f31fb50-1a77-47dd-8cf5-779e36a61b12.c000.snappy.parquet'
24/02/05 19:23:08 INFO CodecPool: Got brand-new compressor [.snappy]
24/02/05 19:23:09 INFO MultipartUploadOutputStream: close closed:false s3://aws-blog-post-bucket/blogs_db/emr_committer_test/dt=2024-02-05/part-00000-9f31fb50-1a77-47dd-8cf5-779e36a61b12.c000.snappy.parquet
24/02/05 19:23:10 INFO MultipartUploadOutputStream: uploadPart: partNum 1 of 's3://aws-blog-post-bucket/blogs_db/emr_committer_test/dt=2024-02-05/part-00000-9f31fb50-1a77-47dd-8cf5-779e36a61b12.c000.snappy.parquet' from local file '/mnt/s3/emrfs-4722872899796052640/0000000000', 2458 bytes in 36 ms, md5: oDnjr2ghLg12wB9dyZPJMg== md5hex: a039e3af68212e0d76c01f5dc993c932
24/02/05 19:23:10 INFO SparkHadoopMapRedUtil: No need to commit output of task because needsTaskCommit=false: attempt_20240205192300177985718977635494_0001_m_000000_1
24/02/05 19:23:10 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 4072 bytes result sent to driver
24/02/05 19:23:10 INFO FileFormatWriter: Start to commit write Job 7fe83e0a-bc20-44ac-a6c2-bae513934abc.
24/02/05 19:23:10 INFO MultipartUploadOutputStream: close closed:false s3://aws-blog-post-bucket/blogs_db/emr_committer_test/_SUCCESS
24/02/05 19:23:10 INFO FileFormatWriter: Write Job 7fe83e0a-bc20-44ac-a6c2-bae513934abc committed. Elapsed time: 469 ms.
24/02/05 19:23:10 INFO FileFormatWriter: Finished processing stats for write job 7fe83e0a-bc20-44ac-a6c2-bae513934abc.
The main differences are:
There is no creation of any .spark-staging-${jobId} path within the target table location, only an .emrfs_staging_${jobId} is created.
There is NO renaming of files happening as the data is being directly multipart uploaded into the target table location.
No task output commit happens, as the data is written directly into the target table location.
A _SUCCESS file is written directly into the table location while the job is being committed.
The OutputCommitter used is FileSystemOptimizedCommitter.
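These observations can also be checked programmatically. The sketch below scans INFO log text for the messages shown in the log excerpt above; summarize_commit_log is a hypothetical helper, and it assumes the message wording stays the same as in that excerpt, which may differ across EMR releases.

```python
import re


def summarize_commit_log(log_text):
    """Pull the committer-related signals out of a Spark INFO log."""
    committer = re.search(r"Using output committer class (\S+)", log_text)
    return {
        "optimized_committer_enabled": "EMR Optimized Committer: ENABLED" in log_text,
        # Keep only the simple class name, e.g. FileSystemOptimizedCommitter.
        "output_committer": committer.group(1).rsplit(".", 1)[-1] if committer else None,
        "task_commit_skipped": "needsTaskCommit=false" in log_text,
        "spark_staging_used": ".spark-staging-" in log_text,
    }


# Three lines copied from the log excerpt above.
sample_log = "\n".join([
    "24/02/05 19:23:07 INFO EmrOptimizedParquetOutputCommitter: EMR Optimized Committer: ENABLED",
    "24/02/05 19:23:07 INFO EmrOptimizedParquetOutputCommitter: Using output committer class "
    "org.apache.hadoop.mapreduce.lib.output.FileSystemOptimizedCommitter",
    "24/02/05 19:23:10 INFO SparkHadoopMapRedUtil: No need to commit output of task because "
    "needsTaskCommit=false: attempt_20240205192300177985718977635494_0001_m_000000_1",
])
summary = summarize_commit_log(sample_log)
```

A skipped task commit together with no .spark-staging- path is the combination seen when SQLEmrOptimizedCommitProtocol is in effect.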
Now, you might be wondering:
Why is there such a difference between the two cases when the OutputCommitter used in both is the same, i.e. FileSystemOptimizedCommitter?
The reason is the commit protocol class used in each case. Both SQLEmrOptimizedCommitProtocol and SQLHadoopMapReduceCommitProtocol (which extends HadoopMapReduceCommitProtocol) are implementations of FileCommitProtocol, but they handle the different stages (setupTask, commitTask, commitJob, etc.) quite differently, hence the difference between the two cases.
One thing we can't see in the INFO logs is the deletion of already existing data files when overwrite is set to true.
How are listing operations avoided?
Listing operations are mainly used to check whether the output location already exists and, when overwrite = True, to identify any files that already exist in the location so that a DELETE can be issued to remove them before the task outputs are committed.
To avoid listing the destination location, i.e. the table's S3 location, multiple HTTPS requests are sent to the S3 URI instead. All of these requests can be seen in the DEBUG logs for the job. Four kinds of requests are mainly made:
HEAD requests to check if a S3 location is existing or not.
GET requests to check if there are any files (i.e. keys in AWS S3 terms) present in an S3 output location.
DELETE requests to delete the staging keys/folders (like .spark-staging-${jobId}) and, when overwrite mode is given, the files/keys present in the destination location (i.e. mode("overwrite") for S3 writes, or .insertInto("table-name", overwrite=True) for table writes).
PUT requests are made to create _SUCCESS file in case the taskCommit or jobCommit is successful.
Let's look at a few of these requests in the debug logs. To see them, run your Spark jobs in DEBUG mode.
We looked into how the EMRFS S3-optimized committer and commit protocol improve write performance by avoiding the rename operation during task and job commits. Also, to develop a deeper understanding of these, we looked into the logs to analyze what happens under the hood when the writes are optimized and when they are not.
Here's a table for quick reference:
We also looked into how listing operations are done using HTTPS calls to the S3 URI in the backend for various scenarios.
I hope you now know how to look into your job logs and identify whether the EMRFS S3-optimized committer and commit protocol are working for your Spark job. I would highly suggest reading the documentation for the cases where they don't work and for some considerations when using multipart uploads.
As for the performance improvement over FileOutputCommitter V2, it improves write performance by 1.6x.
You can check the entire blog post here that details the performance improvement for EMRFS S3-Optimized Committer.
That's it for this one, folks! 😊 See you in the next one. 🚀
If this post added value for you and you want to read more content like this, subscribe to Akashdeep Gupta's newsletter. It's free of cost, and I will make sure every post is worth your time and makes you a better Data Engineer.