Amazon,  AWS,  EMR,  S3,  Spark

Amazon EMR Spark – Ignoring Partition Filter and Listing All Partitions When Reading from S3A

I have a partitioned Hive table created by an open-source Hadoop distribution that uses the S3A scheme as the location for every partition. The table has more than 10,000 partitions, and every partition contains about 8,000 Parquet files:

$ hive -e "show partitions events";
...
dateint=20220419/hour=11
dateint=20220419/hour=12
dateint=20220419/hour=13

$ hive -e "describe formatted events partition (dateint=20220419, hour='11')" | grep Location
Location:   s3a://cloudsqale/events/dateint=20220419/hour=11

$ hive -e "describe formatted events partition (dateint=20220419, hour='12')" | grep Location
Location:   s3a://cloudsqale/events/dateint=20220419/hour=12

$ hive -e "describe formatted events partition (dateint=20220419, hour='13')" | grep Location
Location:   s3a://cloudsqale/events/dateint=20220419/hour=13

The s3a:// scheme is specified in the location of every partition in this table.

Reading a Partition in Amazon EMR Spark

When I attempted to read data from a single partition using Spark SQL:

$ spark-sql --master yarn -e "select count(*) from events where dateint=20220419 and hour='11'"

The Spark driver failed with:

# java.lang.OutOfMemoryError: GC overhead limit exceeded
# -XX:OnOutOfMemoryError="kill -9 %p"
#   Executing /bin/sh -c "kill -9 4847"...

Looking at the driver log, I noticed that Spark skipped the fast partition discovery because the partition locations use S3A, and instead started listing all 10,255 partitions:

INFO PrunedInMemoryFileIndex: Fast s3 partition discovery was skipped 
 (reason=listing is unsupported for given partition paths)
INFO InMemoryFileIndex: Listing leaf files and directories in parallel under 10255 paths. 
  The first several paths are: 
     s3a://cloudsqale/events/dateint=20210216/hour=08,
     s3a://cloudsqale/events/dateint=20210216/hour=09,
     s3a://cloudsqale/events/dateint=20210216/hour=10,...
INFO DAGScheduler: Got job 0 (processCmd at CliDriver.java:376) with 10000 output partitions
INFO DAGScheduler: Submitting 10000 missing tasks from ResultStage 0 
INFO YarnScheduler: Adding task set 0.0 with 10000 tasks
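The 10,000 task count is not accidental: when the number of paths to list exceeds spark.sql.sources.parallelPartitionDiscovery.threshold (32 by default), Spark runs the listing as a distributed job whose task count is capped by spark.sql.sources.parallelPartitionDiscovery.parallelism, which defaults to 10,000. These settings can be tuned per query; the snippet below is only a mitigation sketch, not a fix, since Spark still lists every partition:

```shell
# Mitigation sketch only: a bigger driver heap and lower listing parallelism
# may let the job survive, but Spark still lists all 10,255 partitions.
spark-sql --master yarn \
  --driver-memory 8g \
  --conf spark.sql.sources.parallelPartitionDiscovery.parallelism=1000 \
  -e "select count(*) from events where dateint=20220419 and hour='11'"
```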

To list files in 10,255 partitions, the Spark driver launched 10,000 tasks:

INFO TaskSetManager: Starting task 0.0 in stage 0.0
INFO TaskSetManager: Starting task 1.0 in stage 0.0
INFO TaskSetManager: Starting task 2.0 in stage 0.0
...
INFO TaskSetManager: Starting task 1486.0 in stage 0.0
INFO TaskSetManager: Starting task 1487.0 in stage 0.0
INFO TaskSetManager: Starting task 1488.0 in stage 0.0
#
# java.lang.OutOfMemoryError: GC overhead limit exceeded
# -XX:OnOutOfMemoryError="kill -9 %p"
#   Executing /bin/sh -c "kill -9 4847"... 

The Spark job eventually failed: the table contains more than 8 million files, and the JVM did not have enough memory to hold the full file listing.
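A rough back-of-envelope calculation shows why the driver heap is overwhelmed. The per-entry footprint is my assumption, not a measured value: a FileStatus entry with its path string and block locations plausibly costs on the order of 1 KB on the JVM heap.

```python
# Rough estimate of the driver heap needed to hold the full file listing.
total_files = 8_000_000          # from the table stats above
bytes_per_entry = 1_000          # assumed heap footprint per FileStatus entry

heap_bytes = total_files * bytes_per_entry
print(f"~{heap_bytes / 1e9:.0f} GB of heap just for the listing")  # ~8 GB
```

Even with a generous driver heap this approach does not scale; pruning the listing down to the queried partition is the real fix.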

Adding TBLPROPERTIES

If you cannot modify the partition locations to use s3 instead of s3a, you can still help Spark prune partitions by adding a few properties to TBLPROPERTIES in Hive, as shown below.

Initially, my table was defined as follows (some columns are omitted for brevity):

$ hive -e "show create table events"

CREATE EXTERNAL TABLE events(
  account string,
  event_name string,
  event_timestamp string,
  json_map map<string,string>,
  app_id string,
  app_version string,
  ...
 )
PARTITIONED BY (
  dateint int,
  hour string)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3a://cloudsqale/events'
TBLPROPERTIES (
  'transient_lastDdlTime'='1613462289')

Then I added the following TBLPROPERTIES to the table to help Spark locate the partitions:

alter table events set TBLPROPERTIES (
'spark.sql.create.version'='2.2 or prior',
'spark.sql.sources.schema.numPartCols'='2',
'spark.sql.sources.schema.numParts'='1',
'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"account","type":"string","nullable":true,"metadata":{}},{"name":"event_name","type":"string","nullable":true,"metadata":{}},{"name":"event_timestamp","type":"string","nullable":true,"metadata":{}},{"name":"json_map","type":{"type":"map","keyType":"string","valueType":"string","valueContainsNull":true},"nullable":true,"metadata":{}},{"name":"app_id","type":"string","nullable":true,"metadata":{}},{"name":"app_version","type":"string","nullable":true,"metadata":{}},...]}',
'spark.sql.sources.schema.partCol.0'='dateint',
'spark.sql.sources.schema.partCol.1'='hour',
'transient_lastDdlTime'='1650449482');
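These properties are simply Spark's own serialization of the table schema: spark.sql.sources.schema.part.0 holds the StructType as JSON, and for very wide schemas Spark splits the string into several part.N chunks (the split threshold is about 4,000 characters by default, governed by spark.sql.sources.schemaStringLengthThreshold). Below is a minimal sketch of how such properties can be generated under those assumptions; the column lists are illustrative, and as far as I can tell Spark's own writer also appends the partition columns to the serialized struct, which this sketch skips for brevity.

```python
import json

def schema_table_properties(fields, part_cols, threshold=4000):
    """Build the TBLPROPERTIES Spark uses to recover a table schema.

    fields    -- list of (name, type) pairs for the data columns
    part_cols -- list of partition column names
    threshold -- max property value length before the JSON is split
                 (4000 mirrors Spark's default; an assumption here)
    """
    schema = {
        "type": "struct",
        "fields": [
            {"name": n, "type": t, "nullable": True, "metadata": {}}
            for n, t in fields
        ],
    }
    s = json.dumps(schema, separators=(",", ":"))
    parts = [s[i:i + threshold] for i in range(0, len(s), threshold)]

    props = {
        "spark.sql.sources.schema.numParts": str(len(parts)),
        "spark.sql.sources.schema.numPartCols": str(len(part_cols)),
    }
    for i, p in enumerate(parts):
        props[f"spark.sql.sources.schema.part.{i}"] = p
    for i, c in enumerate(part_cols):
        props[f"spark.sql.sources.schema.partCol.{i}"] = c
    return props

props = schema_table_properties(
    [("account", "string"), ("event_name", "string")],
    ["dateint", "hour"],
)
print(props["spark.sql.sources.schema.numPartCols"])  # 2
```

The generated dictionary maps directly onto the ALTER TABLE ... SET TBLPROPERTIES statement shown above.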

Now if I re-run the same query:

$ spark-sql --master yarn -e "select count(*) from events where dateint=20220419 and hour='11'"

Spark does not list all partitions anymore:

INFO PrunedInMemoryFileIndex: Fast s3 partition discovery was skipped
 (reason=listing is unsupported for given partition paths)
INFO PrunedInMemoryFileIndex: It took 3354 ms to list leaf files for 1 paths.
INFO FileSourceStrategy: Pruning directories with: isnotnull(dateint#38),isnotnull(hour#39),
  (dateint#38 = 20220419),(hour#39 = 11)

INFO PrunedInMemoryFileIndex: Selected 1 partitions out of 1, pruned 0.0% partitions.
INFO YarnScheduler: Adding task set 0.0 with 6279 tasks

Now Spark selected just 1 partition immediately and launched 6,279 tasks to read the data from that partition.

Note that if your query does not fail while listing all partitions, Spark sets the required TBLPROPERTIES itself after the first successful run, so subsequent queries no longer list all partitions. In that case you do not need to execute the ALTER TABLE ... SET TBLPROPERTIES statement; Spark does it for you.

I had to run it manually because, with such a large number of partitions and files, the first query could never complete successfully.