I have a Spark job that transforms incoming data from compressed text files into Parquet format and loads it into a daily partition of a Hive table. This is a typical job in a data lake: it is quite simple, but in my case it was very slow.
Initially it took about 4 hours to convert ~2,100 input .gz files (~1.9 TB of data) into Parquet: the Spark job itself took just 38 minutes, and the remaining time was spent on loading the data into a Hive partition.
Let's see what causes this behavior and how we can improve the performance.
Loading Into Static Partition – Initial Version
Initially, the PySpark script to load data was as follows:
from pyspark.sql import SparkSession, HiveContext

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

hiveContext = HiveContext(spark.sparkContext)
hiveContext.setConf("hive.exec.compress.output", "true")
hiveContext.setConf("parquet.compression", "SNAPPY")

spark.sql("""
  INSERT OVERWRITE TABLE events PARTITION (event_dt = '2019-12-26')
  SELECT user_id, session_id, event_timestamp, city, country, ...
  FROM events_raw
  """)
From the Spark driver log you can see the Spark job progress:
19/12/26 09:05:16 INFO SparkContext: Running Spark version 2.1.1
...
19/12/26 09:05:35 INFO DirectFileOutputCommitter: Nothing to setup since the outputs are written directly.
19/12/26 09:05:35 INFO FileInputFormat: Total input paths to process : 2126
...
19/12/26 09:05:35 INFO YarnScheduler: Adding task set 0.0 with 2126 tasks
19/12/26 09:05:40 INFO TaskSetManager: Starting task 0.0 in stage 0.0
...
19/12/26 09:15:54 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (executor 2) (201/2126)
19/12/26 09:37:22 INFO TaskSetManager: Finished task 2125.0 in stage 0.0 (executor 339) (1992/2126)
19/12/26 09:43:53 INFO TaskSetManager: Finished task 2059.0 in stage 0.0 (executor 322) (2126/2126)
...
19/12/26 09:43:53 INFO DAGScheduler: Job 0 finished: sql at NativeMethodAccessorImpl.java:0, took 2297.230480 s
You can see that it took Spark just 38 minutes to complete all 2,126 tasks. After that the Spark driver started renaming the files:
19/12/26 09:43:53 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
19/12/26 09:43:54 INFO S3NativeFileSystem: rename s3://cloudsqale/events/.hive-staging_hive_2019-12-26_5896227393700076788-1/-ext-10000/part-00000 s3://cloudsqale/events/event_dt=2019-12-26/part-00000
19/12/26 09:43:59 INFO S3NativeFileSystem: rename s3://cloudsqale/events/.hive-staging_hive_2019-12-26_5896227393700076788-1/-ext-10000/part-00001 s3://cloudsqale/events/event_dt=2019-12-26/part-00001
...
19/12/26 12:52:00 INFO S3NativeFileSystem: rename s3://cloudsqale/events/.hive-staging_hive_2019-12-26_5896227393700076788-1/-ext-10000/part-02125 s3://cloudsqale/events/event_dt=2019-12-26/part-02125
Output files generated by the Spark tasks are moved from the staging directory into the final destination.
It is a sequential process: the Spark driver renames the files one by one. A rename is quite an expensive operation in S3 since it requires copying and then deleting the object, so its duration is proportional to the file size.
In my case the rename of 2,126 files (~ 2 TB) took 3 hours 9 minutes (5.3 seconds per file, or 182.4 MB/sec on average).
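To see why the rename time grows with the file size, recall that S3 has no native rename: the object has to be copied to the new key and the original deleted. Below is a minimal sketch of this copy-and-delete pattern using boto3; the bucket and key names are hypothetical and not part of the job above.

# Sketch only: emulates what a "rename" costs on S3 (copy + delete).
# Bucket and key names are hypothetical.
import boto3

s3 = boto3.client("s3")

def s3_rename(bucket, src_key, dst_key):
    # Server-side copy; its duration still grows with the object size
    # (objects over 5 GB would need a multipart copy instead)
    s3.copy_object(Bucket=bucket,
                   CopySource={"Bucket": bucket, "Key": src_key},
                   Key=dst_key)
    # Remove the source object only after the copy has succeeded
    s3.delete_object(Bucket=bucket, Key=src_key)

s3_rename("my-bucket", "staging/part-00000", "event_dt=2019-12-26/part-00000")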
Let's consider two phases of job execution, task commit and job commit, to understand how they affect the performance of a Spark job.
Task Commit
Task commit is the process in which a task makes its result visible to the job driver. In the log above you can notice:
19/12/26 09:05:35 INFO DirectFileOutputCommitter: Nothing to setup since the outputs are written directly.
When loading into a Hive table from Spark, this option is taken from the /etc/hadoop/conf/mapred-site.xml configuration file:
<property>
  <name>mapred.output.committer.class</name>
  <value>org.apache.hadoop.mapred.DirectFileOutputCommitter</value>
</property>
and means that all tasks write their outputs directly to the task output directory. Looking at an executor log you can see:
19/12/26 09:05:40 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
19/12/26 09:05:43 INFO S3NativeFileSystem: Opening 's3://cloudsqale/events_raw/file-00077.gz' for reading
19/12/26 09:05:44 INFO ParquetRecordWriterWrapper: creating real writer to write at s3://cloudsqale/events/.hive-staging_hive_2019-12-26_5896227393700076788-1/-ext-10000/part-00000
...
19/12/26 09:15:54 INFO SparkHadoopMapRedUtil: No need to commit output of task because needsTaskCommit=false: attempt_20191226090533_0000_m_000000_0
19/12/26 09:15:54 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0).
The directory .hive-staging_hive_.../-ext-10000 is the task output directory, but it is not the final job directory yet.
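Before moving on to the job commit, note that you can check which committer settings your job actually runs with from PySpark itself. A small sketch, using the property names shown above and the same _jsc accessor that appears later in this post:

# Sketch: inspect the effective committer settings of the current Spark application
from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()

print(hadoop_conf.get("mapred.output.committer.class"))
print(hadoop_conf.get("mapreduce.fileoutputcommitter.algorithm.version"))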
Job Commit
After all tasks have completed, a process called job commit starts. This process makes the job results visible.
From the log above you can see:
19/12/26 09:43:53 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
Algorithm version 1 means that the task directories are renamed (moved, since the parent directory also changes) into the final destination during the job commit, but the rename operation in S3 is slow, which is why it took more than 3 hours to commit the job.
File Output Committer Algorithm version 2
The version 2 algorithm commits task output directly into the destination directory, so the job commit does not need to move the files.
Unfortunately this does not work when loading into Hive partitions (the Spark driver still moves the files):
19/12/26 15:19:09 INFO FileOutputCommitter: File Output Committer Algorithm version is 2
19/12/26 15:19:10 INFO S3NativeFileSystem: rename s3://cloudsqale/events/.hive-staging_hive_2019-12-26_3715398370541372470-1/-ext-10000/part-00000 s3://cloudsqale/events/event_dt=2019-12-26/part-00000
19/12/26 15:19:13 INFO S3NativeFileSystem: rename s3://cloudsqale/events/.hive-staging_hive_2019-12-26_3715398370541372470-1/-ext-10000/part-00001 s3://cloudsqale/events/event_dt=2019-12-26/part-00001
...
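The log above comes from a run where version 2 was enabled; one way to do that (and presumably how it was done for this test) is to set the standard Hadoop property on the Spark context, for example:

# Sketch: switch the File Output Committer to algorithm version 2.
# As the log above shows, this alone does not remove the driver-side renames
# when inserting into a Hive partition.
from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()
spark.sparkContext._jsc.hadoopConfiguration() \
    .set("mapreduce.fileoutputcommitter.algorithm.version", "2")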
Some cloud vendors offer specific options for direct loads into partitions, but let's consider a generic approach that should work in any Spark setup.
Parquet Output
Instead of loading data into a Hive partition, we will save the results in Parquet format at the specified location, and then just add a partition on top of this location:
from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

spark.sparkContext._jsc.hadoopConfiguration() \
  .set("mapreduce.fileoutputcommitter.algorithm.version", "2")

location = "s3://cloudsqale/events/event_dt=2019-12-26"

df = spark.sql("""
  SELECT user_id, session_id, event_timestamp, city, country, ...
  FROM events_raw
  """)

df.write.option("compression", "snappy").mode("overwrite").parquet(location)

spark.sql("ALTER TABLE events ADD PARTITION (event_dt = '2019-12-26')")
Now the job is very fast:
19/12/30 10:10:13 INFO SparkContext: Running Spark version 2.1.1
...
19/12/30 10:10:29 INFO ParquetFileFormat: Using default output committer for Parquet: org.apache.parquet.hadoop.ParquetOutputCommitter
...
19/12/30 10:10:29 INFO FileOutputCommitter: File Output Committer Algorithm version is 2
19/12/30 10:10:29 INFO SQLHadoopMapReduceCommitProtocol: Using output committer class org.apache.parquet.hadoop.ParquetOutputCommitter
...
19/12/30 10:44:36 INFO DAGScheduler: Job 0 finished: parquet at NativeMethodAccessorImpl.java:0, took 2044.186653 s
19/12/30 10:44:36 INFO SparkSqlParser: Parsing command: ALTER TABLE events ADD PARTITION (event_dt = '2019-12-26')
There are no rename commands anymore at the job commit level.
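A side note on the final ALTER TABLE step: since the data was written to the standard partition path under the table location, ADD PARTITION is enough. If you write to a non-standard location, or want the statement to be safe to rerun, you can specify the location explicitly and add IF NOT EXISTS; a sketch, keeping the same table and path as above:

# Sketch: register the partition with an explicit location and make the call idempotent
spark.sql("""
  ALTER TABLE events ADD IF NOT EXISTS PARTITION (event_dt = '2019-12-26')
  LOCATION 's3://cloudsqale/events/event_dt=2019-12-26'
""")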
ParquetOutputCommitter
Now if you look at the task log you can notice the following:
19/12/30 10:10:53 INFO FileOutputCommitter: File Output Committer Algorithm version is 2
...
19/12/30 10:10:54 INFO SQLHadoopMapReduceCommitProtocol: Using user defined output committer class org.apache.parquet.hadoop.ParquetOutputCommitter
...
19/12/30 10:13:46 INFO S3NativeFileSystem: rename s3://cloudsqale/events/event_dt=2019-12-26/_temporary/0/_temporary/attempt_20191230101053_0000_m_000038_0/part-00038.snappy.parquet s3://cloudsqale/events/event_dt=2019-12-26/part-00038.snappy.parquet
19/12/30 10:13:49 INFO FileOutputCommitter: Saved output of task 'attempt_20191230101053_0000_m_000038_0' to s3://cloudsqale/events/event_dt=2019-12-26
19/12/30 10:13:49 INFO SparkHadoopMapRedUtil: attempt_20191230101053_0000_m_000038_0: Committed
Tasks now write to a temporary directory and then move the result into the final destination during task commit. This is a distributed process, as all tasks can perform the rename concurrently.
DirectParquetOutputCommitter
Although DirectParquetOutputCommitter was removed in Spark 2.0, some cloud vendors still provide various options to enable direct task writes.
Note that when using direct writes, speculative execution has to be turned off (it is off by default in Spark).
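If you do enable a direct committer, it does not hurt to make this explicit; a sketch of keeping speculation off and verifying the effective value:

# Sketch: keep speculative execution disabled when direct task writes are used
# (speculative copies of a task could otherwise write to the same final files)
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config("spark.speculation", "false") \
    .enableHiveSupport() \
    .getOrCreate()

print(spark.conf.get("spark.speculation", "false"))   # expect "false"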