I/O, Parquet, Spark

Spark – Reading Parquet – Why the Number of Tasks can be Much Larger than the Number of Row Groups

A row group is a unit of work for reading from Parquet that cannot be split into smaller parts, so you might expect that the number of tasks created by Spark is no more than the total number of row groups in your Parquet data source.

But Spark can still create many more tasks than there are row groups. Let’s see how this is possible.

Task Planning

Ideally, to determine the number of tasks you would have to read the footer of every Parquet file in your data source and count the total number of row groups. But this operation can be very expensive, especially on cloud storage. Consider, for example, opening and reading the footers of 30,000 – 50,000 Parquet files in S3 before launching the job. It could take tens of minutes before Spark even starts the job.

Instead, to speed up job launch, Spark estimates the number of tasks without reading the Parquet file footers.

First, Spark lists the files in the data source and computes their total size. File listing is not particularly fast in cloud storage either, but it is at least much faster than opening and reading every file:

  totalBytes = sum(fileSize + conf("spark.sql.files.openCostInBytes"))

Note that Spark adds the value of the spark.sql.files.openCostInBytes configuration parameter (4 MB by default) to the size of every file.

Then Spark calculates how many bytes can be processed concurrently, taking into account the default parallelism:

  bytesPerCore = totalBytes / conf("spark.default.parallelism")

And finally, Spark calculates the data size for each task as the minimum of spark.sql.files.maxPartitionBytes and bytesPerCore:

  maxSplitBytes = min(conf("spark.sql.files.maxPartitionBytes"), bytesPerCore)

The Spark driver logs the calculated values; you can check for the following message:

  logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
    s"open cost is considered as scanning $openCostInBytes bytes.")

After Spark gets maxSplitBytes, it iterates over all files and assigns the tasks.

For example, I have 3 Parquet files (with only 4 row groups in total):

  file1.parquet   368,565,268  (with 2 row groups)
  file2.parquet    46,584,043  (with 1 row group)
  file3.parquet   107,315,179  (with 1 row group)

Then

  totalBytes = 368,565,268 + 46,584,043 + 107,315,179 + 3 * 4,194,304 = 535,047,402
 
  -- 2 executor instances with 4 cores means default parallelism is 8 in my test
  bytesPerCore = totalBytes / 8 = 66,880,925

  maxSplitBytes = min(134,217,728; 66,880,925) = 66,880,925  
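The arithmetic above can be reproduced with a short Python sketch (this is a simplified model of Spark’s estimation, not its actual code; file sizes and configuration defaults are taken from this example):

```python
# Simplified sketch of Spark's task-size estimation for a file-based scan
file_sizes = [368_565_268, 46_584_043, 107_315_179]

open_cost_in_bytes = 4 * 1024 * 1024      # spark.sql.files.openCostInBytes default (4 MB)
max_partition_bytes = 128 * 1024 * 1024   # spark.sql.files.maxPartitionBytes default (128 MB)
default_parallelism = 8                   # 2 executor instances x 4 cores

# Every file contributes its size plus the open cost
total_bytes = sum(size + open_cost_in_bytes for size in file_sizes)

# Integer (Long) division, as in Spark
bytes_per_core = total_bytes // default_parallelism

max_split_bytes = min(max_partition_bytes, bytes_per_core)

print(total_bytes)      # 535,047,402
print(max_split_bytes)  # 66,880,925
```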

I can observe and confirm this from the Spark driver log:

21/03/07 17:03:58 main INFO FileSourceScanExec: Planning scan with bin packing, 
max size: 66880925 bytes, open cost is considered as scanning 4194304 bytes.

And 9 tasks will be assigned:

21/03/07 17:03:58 dag-scheduler-event-loop INFO DAGScheduler: Got job 0 
(performDistributedWrite at SqlWrapper.scala:70) with 9 output partitions

21/03/07 17:03:58 dag-scheduler-event-loop INFO YarnScheduler: 
Adding task set 0.0 with 9 tasks

Why 9 tasks? Spark iterates over the files and checks how many chunks of maxSplitBytes can be allocated per file:

  file1.parquet   368,565,268 - 6 tasks
  file2.parquet    46,584,043 - 1 task
  file3.parquet   107,315,179 - 2 tasks
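This split assignment can be sketched as follows. The packing logic is a simplified approximation of Spark’s FilePartition bin packing (my reconstruction, not the actual Spark code): each file is cut into chunks of at most maxSplitBytes, and the chunks, largest first, are packed into partitions that are closed once adding another chunk would exceed maxSplitBytes:

```python
# Approximation of how Spark splits files and bin-packs the splits into tasks
max_split_bytes = 66_880_925
open_cost = 4_194_304

files = {
    "file1.parquet": 368_565_268,
    "file2.parquet": 46_584_043,
    "file3.parquet": 107_315_179,
}

# 1. Cut every file into splits of at most maxSplitBytes
splits = []
for name, size in files.items():
    offset = 0
    while offset < size:
        length = min(max_split_bytes, size - offset)
        splits.append((name, offset, length))
        offset += length

# 2. Pack the splits (largest first) into partitions/tasks;
#    each split also adds the open cost to the partition size
partitions, current, current_size = [], [], 0
for name, offset, length in sorted(splits, key=lambda s: -s[2]):
    if current and current_size + length > max_split_bytes:
        partitions.append(current)
        current, current_size = [], 0
    current.append((name, offset, length))
    current_size += length + open_cost
if current:
    partitions.append(current)

print(len(splits))      # 9 splits
print(len(partitions))  # 9 tasks
```

For these file sizes every split ends up in its own partition, so the task count equals the split count: 6 + 1 + 2 = 9.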

You can see that the number of tasks was defined by Spark without reading Parquet footers and without knowing the actual number of row groups in the data source.

Default Parallelism

You can notice that the number of tasks depends not only on the total size of the source data but on the default parallelism as well.

In a YARN deployment, if not set explicitly (--conf spark.default.parallelism), the default parallelism is defined by the total number of executors and their cores:

  default_parallelism = conf("spark.executor.instances") * conf("spark.executor.cores")

For example, when I explicitly set --conf spark.default.parallelism=30, I get 30 tasks for the same 3 source Parquet files:

21/03/08 15:31:52 main INFO FileSourceScanExec: Planning scan with bin packing, 
max size: 17834913 bytes, open cost is considered as scanning 4194304 bytes.

21/03/08 15:31:52 dag-scheduler-event-loop INFO YarnScheduler: 
Adding task set 0.0 with 30 tasks

Note that maxSplitBytes becomes 17,834,913 bytes now.
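You can reproduce both numbers with the same simplified sketch (again, an approximation of Spark’s logic, with integer division assumed). Note that a naive per-file count gives 21 + 3 + 7 = 31 splits; the bin packing merges two small tail splits into one partition, which is how Spark arrives at 30 tasks:

```python
open_cost = 4_194_304
file_sizes = [368_565_268, 46_584_043, 107_315_179]

total_bytes = sum(s + open_cost for s in file_sizes)
max_split_bytes = min(128 * 1024 * 1024, total_bytes // 30)  # parallelism = 30
print(max_split_bytes)  # 17,834,913

# Cut the files into splits of at most maxSplitBytes
splits = []
for size in file_sizes:
    offset = 0
    while offset < size:
        length = min(max_split_bytes, size - offset)
        splits.append(length)
        offset += length

# Bin-pack the splits (largest first) into partitions/tasks
partitions, current_size, n = [], 0, 0
for length in sorted(splits, reverse=True):
    if n and current_size + length > max_split_bytes:
        partitions.append(n)
        current_size, n = 0, 0
    n += 1
    current_size += length + open_cost
if n:
    partitions.append(n)

print(len(splits), len(partitions))  # 31 splits packed into 30 tasks
```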

Row Group Assignment and Empty Tasks

OK, no matter how many tasks are created, the row group is still a unit of work for Parquet, and multiple tasks cannot process the same row group.

Moreover, each task needs to know the actual Parquet row group boundaries (offset and length) in the file, not the abstract byte ranges calculated as described above.

As I noted before, there are 9 tasks launched to read the 3 Parquet files in my test:

21/03/19 05:23:49 INFO YarnScheduler: Adding task set 0.0 with 9 tasks
21/03/19 05:23:49 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (executor 1, partition 0, ...)
21/03/19 05:23:49 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (executor 2, partition 1, ...)
21/03/19 05:23:49 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (executor 1, partition 2, ...)
21/03/19 05:23:49 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (executor 2, partition 3, ...)
21/03/19 05:23:49 INFO TaskSetManager: Starting task 4.0 in stage 0.0 (executor 1, partition 4, ...)
21/03/19 05:23:49 INFO TaskSetManager: Starting task 5.0 in stage 0.0 (executor 2, partition 5, ...)
21/03/19 05:23:49 INFO TaskSetManager: Starting task 6.0 in stage 0.0 (executor 1, partition 6, ...)
21/03/19 05:23:49 INFO TaskSetManager: Starting task 7.0 in stage 0.0 (executor 2, partition 7, ...)
21/03/19 05:23:55 INFO TaskSetManager: Starting task 8.0 in stage 0.0 (executor 1, partition 8, ...)

And looking at the Spark Executor logs you can see file range assignments:

Executor 1:

21/03/19 05:23:53 Executor task launch worker for task 2 INFO FileScanRDD: 
  Reading File path: s3://cloudsqale/data/file1.parquet, range: 133761850-200642775
21/03/19 05:23:53 Executor task launch worker for task 4 INFO FileScanRDD: 
  Reading File path: s3://cloudsqale/data/file1.parquet, range: 267523700-334404625
21/03/19 05:23:53 Executor task launch worker for task 6 INFO FileScanRDD: 
  Reading File path: s3://cloudsqale/data/file2.parquet, range: 0-46584043
21/03/19 05:23:53 Executor task launch worker for task 0 INFO FileScanRDD: 
  Reading File path: s3://cloudsqale/data/file1.parquet, range: 0-66880925
21/03/19 05:23:55 Executor task launch worker for task 8 INFO FileScanRDD: 
  Reading File path: s3://cloudsqale/data/file1.parquet, range: 334404625-368565268

Executor 2:

21/03/19 05:23:53 Executor task launch worker for task 1 INFO FileScanRDD: 
  Reading File path: s3://cloudsqale/data/file1.parquet, range: 66880925-133761850
21/03/19 05:23:53 Executor task launch worker for task 7 INFO FileScanRDD:
  Reading File path: s3://cloudsqale/data/file3.parquet, range: 66880925-107315179
21/03/19 05:23:53 Executor task launch worker for task 3 INFO FileScanRDD: 
  Reading File path: s3://cloudsqale/data/file1.parquet, range: 200642775-267523700
21/03/19 05:23:53 Executor task launch worker for task 5 INFO FileScanRDD: 
  Reading File path: s3://cloudsqale/data/file3.parquet, range: 0-66880925

But looking at the Spark UI you can see that only 4 tasks actually processed data (there are only 4 row groups in my 3 test Parquet files), while the others read 0 records.

The reason is that every task reads the footer of its assigned Parquet file, gets the list of row groups, and checks whether the task’s byte range contains the midpoint of each row group.

In the example above, 6 tasks were assigned to read a Parquet file with 2 row groups, but only Task 2 and Task 4 actually read the data, as their ranges contain the midpoints of the row groups.
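The midpoint check itself is simple to sketch. The row group boundaries for file1.parquet below are hypothetical (the real offsets depend on how the file was written and are not shown in the logs); they are chosen only to illustrate how the task ranges above can yield exactly 2 readers:

```python
def owns_row_group(task_start, task_end, rg_offset, rg_length):
    # A task processes a row group only if its byte range
    # contains the row group's midpoint
    midpoint = rg_offset + rg_length // 2
    return task_start <= midpoint < task_end

# Hypothetical row group layout for file1.parquet (368,565,268 bytes)
row_groups = [(4, 280_000_000), (280_000_004, 88_000_000)]

# Byte ranges of the 6 tasks assigned to file1.parquet
# (from the executor logs above)
tasks = [(0, 66_880_925), (66_880_925, 133_761_850),
         (133_761_850, 200_642_775), (200_642_775, 267_523_700),
         (267_523_700, 334_404_625), (334_404_625, 368_565_268)]

for i, (start, end) in enumerate(tasks):
    owned = [rg for rg in row_groups if owns_row_group(start, end, *rg)]
    print(f"task range {i}: owns {len(owned)} row group(s)")
```

With this layout only the third and fifth ranges (Task 2 and Task 4 in the logs) own a row group midpoint; the other four tasks find nothing to read.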

For more details, you can look at the code of ParquetFileFormat, ParquetInputSplit, and the filterFileMetaDataByMidpoint method of org.apache.parquet.format.converter.ParquetMetadataConverter.

So although the number of tasks can be greater than the number of row groups, these “redundant” tasks do not actually read any data. They still read the corresponding Parquet file footers, however, which increases the number of I/O operations. So ideally the number of tasks should be no more than the number of row groups.