I/O,  Parquet,  Spark

Spark – Number of Tasks Reading Large Number of Small Parquet Files

Sometimes source data arrives from a streaming application as a large set of small Parquet files that you need to compact for more effective read by analytic applications.

You can observe that by default the number of tasks to read such Parquet files is larger than expected. Let’s see why.

In my case I got 1,440 files with size ~1.6 MB each and total size ~2,3 GB:

  2023-07-19 00:35:01    1,639,785 events1.parquet
  2023-07-19 00:40:06    1,623,445 events2.parquet
  2023-07-19 00:45:05    1,645,374 events3.parquet
  2023-07-19 00:50:06    1,643,287 events4.parquet
  2023-07-19 00:55:06    1,646,472 events5.parquet
  2023-07-19 01:00:01    1,672,244 events6.parquet
  ...
  Total Objects: 1,440
  Total Size:    2,389,976,023

To define the number of tasks Spark firstly calculates maxSplitBytes i.e. how many bytes to send to each task as follows (see maxSplitBytes method in FilePartition.scala for more details):

  foreach(file) {
     totalBytes += file.length + "spark.sql.files.openCostInBytes" 
  }

  bytesPerCore = totalBytes / ("spark.sql.files.minPartitionNum" or "spark.default.parallelism")

  maxSplitBytes = min("spark.sql.files.maxPartitionBytes", 
                      max("spark.sql.files.openCostInBytes", bytesPerCore))

spark.sql.files.openCostInBytes is 4 MB by default and when it is added to every file (see totalBytes calculation above) it means that when the number of files is large then maxSplitBytes is usually equal to spark.sql.files.maxPartitionBytes with default value of 128 MB.

Actually in the driver log I can see max size per task:

  INFO FileSourceScanExec: Planning scan with bin packing, max size: 134217728 bytes, 
  open cost is considered as scanning 4194304 bytes, number of split files: 1440

So you can expect that each task will process max size: 134217728 bytes and the number of tasks would be 2,3 GB / 128 MB = 18. But when I launch a job to read these files I get 65 tasks (EMR Spark 3.3, Databricks Spark 3.3):

  INFO DAGScheduler: Got map stage job 0 (processLine at CliDriver.java:336) with 65 output partitions

So every task processes 2,3 GB / 65 = ~37 MB of input data, not 128 MB as calculated above. The reason is that when Spark adds a file a partition it defines the size as the file size plus spark.sql.files.openCostInBytes (see getFilePartitions method in FilePartition.scala for more details):

  foreach(file) {
    if (currentSize + file.length > maxSplitBytes) {
      closePartition()
    }
    // Add the given file to the current partition.
    currentSize += file.length + openCostInBytes
    currentFiles += file
 }

So spark.sql.files.openCostInBytes becomes notable when you process large number of small files. As the result every task actually processes less data then defined by maxSplitBytes and more tasks are created.