Sometimes source data arrives from a streaming application as a large set of small Parquet files that you need to compact for more efficient reads by analytic applications. You may notice that, by default, the number of tasks Spark uses to read such Parquet files is larger than expected. Let's see why.
In my case I got 1,440 files of ~1.6 MB each, with a total size of ~2.3 GB:
```
2023-07-19 00:35:01    1,639,785 events1.parquet
2023-07-19 00:40:06    1,623,445 events2.parquet
2023-07-19 00:45:05    1,645,374 events3.parquet
2023-07-19 00:50:06    1,643,287 events4.parquet
2023-07-19 00:55:06    1,646,472 events5.parquet
2023-07-19 01:00:01    1,672,244 events6.parquet
...

Total Objects: 1,440
   Total Size: 2,389,976,023
```
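You can see how many read partitions Spark plans for such a directory directly from a DataFrame. A minimal check (the S3 path here is illustrative, not from the original job):

```scala
// Read the directory of small Parquet files and print the number
// of partitions (= read tasks) Spark plans for the scan.
val df = spark.read.parquet("s3://bucket/events/")
println(df.rdd.getNumPartitions)
```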
To determine the number of tasks, Spark first calculates maxSplitBytes, i.e. how many bytes to send to each task, as follows (see the maxSplitBytes method in FilePartition.scala for more details):
```
foreach(file) {
  totalBytes += file.length + "spark.sql.files.openCostInBytes"
}

bytesPerCore = totalBytes / ("spark.sql.files.minPartitionNum" or "spark.default.parallelism")

maxSplitBytes = min("spark.sql.files.maxPartitionBytes",
                    max("spark.sql.files.openCostInBytes", bytesPerCore))
```
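To make this concrete, here is a back-of-the-envelope version of that calculation for my 1,440 files. The parallelism value of 8 is an assumption for illustration; the actual value depends on your cluster:

```scala
// Sketch of the maxSplitBytes calculation for this data set.
// Assumption: spark.default.parallelism = 8 (cluster-dependent).
val openCostInBytes   = 4L * 1024 * 1024    // 4,194,304 (default)
val maxPartitionBytes = 128L * 1024 * 1024  // 134,217,728 (default)
val parallelism       = 8

val totalBytes    = 2389976023L + 1440 * openCostInBytes  // ~8.4 GB after adding open cost
val bytesPerCore  = totalBytes / parallelism              // ~1 GB per core
val maxSplitBytes = math.min(maxPartitionBytes,
                             math.max(openCostInBytes, bytesPerCore))
// bytesPerCore far exceeds 128 MB, so maxSplitBytes = 134,217,728
```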
spark.sql.files.openCostInBytes is 4 MB by default, and since it is added to every file (see the totalBytes calculation above), when the number of files is large, maxSplitBytes usually ends up equal to spark.sql.files.maxPartitionBytes, whose default value is 128 MB.
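You can check the effective values of these two settings in your own session; the values shown in the comments assume nothing was overridden:

```scala
// Both settings fall back to their defaults unless overridden
// in the cluster or session configuration.
println(spark.conf.get("spark.sql.files.openCostInBytes"))    // 4194304
println(spark.conf.get("spark.sql.files.maxPartitionBytes"))  // 134217728
```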
Indeed, in the driver log I can see the max size per task:
```
INFO FileSourceScanExec: Planning scan with bin packing, max size: 134217728 bytes, open cost is considered as scanning 4194304 bytes, number of split files: 1440
```
So you might expect each task to process up to max size: 134217728 bytes, and the number of tasks to be 2.3 GB / 128 MB = 18. But when I launch a job to read these files, I get 65 tasks (EMR Spark 3.3, Databricks Spark 3.3):
```
INFO DAGScheduler: Got map stage job 0 (processLine at CliDriver.java:336) with 65 output partitions
```
So every task processes 2.3 GB / 65 = ~37 MB of input data, not the 128 MB calculated above. The reason is that when Spark adds a file to a partition, it defines its size as the file size plus spark.sql.files.openCostInBytes (see the getFilePartitions method in FilePartition.scala for more details):
```
foreach(file) {
  if (currentSize + file.length > maxSplitBytes) {
    closePartition()
  }
  // Add the given file to the current partition.
  currentSize += file.length + openCostInBytes
  currentFiles += file
}
```
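To see how this produces roughly 65 tasks, here is a quick simulation of the bin packing above, assuming all 1,440 files have the average size. In reality the sizes vary slightly, which is why the real job got 65 partitions rather than the 63 this uniform estimate gives:

```scala
// Uniform-size simulation of getFilePartitions; real file sizes vary a bit.
val openCostInBytes = 4194304L
val maxSplitBytes   = 134217728L
val fileSize        = 2389976023L / 1440  // ~1,659,705 bytes per file

var partitions  = 1
var currentSize = 0L
for (_ <- 1 to 1440) {
  if (currentSize + fileSize > maxSplitBytes) {  // close the current partition
    partitions += 1
    currentSize = 0L
  }
  currentSize += fileSize + openCostInBytes      // open cost inflates the size
}
println(partitions)  // 63 with uniform sizes; the real job produced 65
```

Each file effectively occupies ~1.66 MB + 4 MB ≈ 5.85 MB of the 128 MB budget, so only ~23 files (~37 MB of real data) fit into one partition.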
So spark.sql.files.openCostInBytes becomes notable when you process a large number of small files. As a result, every task actually processes less data than defined by maxSplitBytes, and more tasks are created.
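Coming back to the original goal of compaction, one simple approach is to read the small files and rewrite them as a handful of ~128 MB files. A sketch, where the paths are illustrative and the partition count follows from 2.3 GB / 128 MB ≈ 18:

```scala
// Read the 1,440 small files and rewrite them as ~18 larger Parquet files.
// Paths are illustrative, not from the original job.
spark.read.parquet("s3://bucket/events/")
  .repartition(18)
  .write
  .mode("overwrite")
  .parquet("s3://bucket/events-compacted/")
```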