You use the LIMIT clause to quickly browse and review data samples, so you expect such queries to complete in less than a second. But let’s look at how Spark handles LIMIT on very large data sets and what performance issues you may run into.
Sometimes source data arrives from a streaming application as a large set of small Parquet files that you need to compact so that analytic applications can read them more efficiently.
You can observe that by default the number of tasks to read such Parquet files is larger than expected. Let’s see why.
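To see why the task count can exceed expectations, here is a minimal Python sketch of the file-packing logic in open-source Spark SQL, modeled on FilePartition.maxSplitBytes and getFilePartitions. It makes several assumptions. It uses the default spark.sql.files.maxPartitionBytes (128 MB) and spark.sql.files.openCostInBytes (4 MB). It treats every file as splittable. It ignores any EMR-specific behavior. The function names are hypothetical.

```python
MB = 1024 * 1024

def max_split_bytes(file_sizes, parallelism,
                    max_partition_bytes=128 * MB, open_cost=4 * MB):
    """Target number of bytes per read task."""
    # Every file is charged an extra "open cost" on top of its size.
    total = sum(size + open_cost for size in file_sizes)
    bytes_per_core = total // parallelism
    return min(max_partition_bytes, max(open_cost, bytes_per_core))

def pack_files(file_sizes, parallelism,
               max_partition_bytes=128 * MB, open_cost=4 * MB):
    """Return a list of read tasks, each a list of chunk sizes in bytes."""
    limit = max_split_bytes(file_sizes, parallelism, max_partition_bytes, open_cost)
    # Split each (splittable) file into chunks of at most `limit` bytes.
    chunks = [min(limit, size - offset)
              for size in file_sizes
              for offset in range(0, size, limit)]
    chunks.sort(reverse=True)  # the largest chunks are placed first
    tasks, current, current_size = [], [], 0
    for chunk in chunks:
        # Close the current task when the next chunk would overflow it.
        if current and current_size + chunk > limit:
            tasks.append(current)
            current, current_size = [], 0
        current.append(chunk)
        current_size += chunk + open_cost
    if current:
        tasks.append(current)
    return tasks

print(len(pack_files([1 * MB] * 1000, parallelism=8)))  # 39
```

Take 1,000 files of 1 MB each with a parallelism of 8. Each file is also charged the 4 MB open cost, so each task receives only 26 files and the sketch produces 39 tasks. With open_cost=0, the same files would be packed into 8 tasks.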
It is quite typical to write the resulting data into a partitioned table, but performance can be very slow compared with writing the same data volume into a non-partitioned table.
Let’s investigate why it is slow, and why the sorting operation happens.
It is highly recommended to distribute the work evenly among multiple tasks so that every task produces a single output file and the job completes in parallel.
But sometimes it may still be useful for a task to generate multiple output files, each with a limited number of records, by using
By default, Spark EMR clusters have
true, meaning that the cluster dynamically allocates resources to scale the executors up and down whenever required.
But what is the initial number of executors when you start your Spark job?
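With dynamic allocation enabled, open-source Spark starts with the largest of three values:

* spark.dynamicAllocation.initialExecutors, which defaults to minExecutors
* spark.dynamicAllocation.minExecutors
* spark.executor.instances

Below is a minimal sketch of that rule. It assumes a plain dict of property values and a hypothetical function name. EMR may apply its own defaults on top of it.

```python
def initial_executors(conf: dict) -> int:
    """Initial executor count with dynamic allocation (open-source Spark rule)."""
    min_executors = int(conf.get("spark.dynamicAllocation.minExecutors", "0"))
    # initialExecutors falls back to minExecutors when it is not set.
    initial = int(conf.get("spark.dynamicAllocation.initialExecutors", min_executors))
    # spark.executor.instances counts as 0 when it is not set.
    instances = int(conf.get("spark.executor.instances", "0"))
    return max(min_executors, initial, instances)

print(initial_executors({}))                                   # 0
print(initial_executors({"spark.executor.instances": "10"}))   # 10
```

Setting spark.executor.instances therefore raises the starting point for dynamic allocation rather than fixing the executor count.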
Starting from EMR 5.32 and EMR 6.2, you may notice that Spark can launch much larger executors than you request in your job settings. For example, EMR created my cluster with the following default settings (it depends on the instance type and
spark.executor.memory                      18971M
spark.executor.cores                       4
spark.yarn.executor.memoryOverheadFactor   0.1875
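The YARN container requested for each executor also includes the memory overhead. Here is a minimal sketch of the arithmetic. It assumes the overhead is computed as in open-source Spark, that is max(factor × executor memory, 384 MB). It ignores off-heap and PySpark memory settings and YARN's rounding to yarn.scheduler.minimum-allocation-mb. The function name is hypothetical.

```python
MEMORY_OVERHEAD_MIN_MB = 384  # minimum overhead in open-source Spark

def executor_container_mb(executor_memory_mb: int, overhead_factor: float) -> int:
    """Executor memory plus overhead as requested from YARN (before rounding)."""
    overhead_mb = max(int(executor_memory_mb * overhead_factor), MEMORY_OVERHEAD_MIN_MB)
    return executor_memory_mb + overhead_mb

print(executor_container_mb(18971, 0.1875))  # 22528
```

For 18971M and a factor of 0.1875, the overhead is 3557 MB. The container size is therefore 22528 MB, exactly 22 GB.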
But when I start a Spark session (pyspark command), I see the following:
I have a partitioned Hive table created by an open-source version of Hadoop that uses the S3A scheme in the location of every partition. The table has more than 10,000 partitions, and every partition has about 8,000 Parquet files:
$ hive -e "show partitions events";
...
dateint=20220419/hour=11
dateint=20220419/hour=12
dateint=20220419/hour=13

$ hive -e "describe formatted events partition (dateint=20220419, hour='11')" | grep Location
Location: s3a://cloudsqale/events/dateint=20220419/hour=11

$ hive -e "describe formatted events partition (dateint=20220419, hour='12')" | grep Location
Location: s3a://cloudsqale/events/dateint=20220419/hour=12

$ hive -e "describe formatted events partition (dateint=20220419, hour='13')" | grep Location
Location: s3a://cloudsqale/events/dateint=20220419/hour=13

S3A:// is specified for every partition in this table.
Reading a Partition in Amazon EMR Spark
When I attempted to read data from a single partition using Spark SQL:
$ spark-sql --master yarn -e "select count(*) from events where dateint=20220419 and hour='11'"
The Spark driver failed with:
# java.lang.OutOfMemoryError: GC overhead limit exceeded
# -XX:OnOutOfMemoryError="kill -9 %p"
#   Executing /bin/sh -c "kill -9 4847"...
A row group is the unit of work for reading Parquet data and cannot be split into smaller parts. You therefore expect Spark to create no more tasks than the total number of row groups in your Parquet data source.
But Spark can still create many more tasks than there are row groups. Let’s see how this is possible.
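Spark splits a large file into byte ranges, and each range becomes a task. The Parquet reader (parquet-mr) assigns each row group to the split that contains the row group's midpoint. Splits that contain no midpoint still become tasks, but they read no rows.

The sketch below counts the row groups each split would read. Its offsets, sizes and function name are hypothetical.

```python
def row_groups_per_split(row_groups, file_length, split_size):
    """Return, for each byte-range split, how many row groups it reads.

    row_groups: list of (start_offset, total_byte_size) tuples.
    """
    counts = []
    for split_start in range(0, file_length, split_size):
        split_end = min(split_start + split_size, file_length)
        # A row group belongs to the split that contains its midpoint.
        count = sum(1 for rg_start, rg_size in row_groups
                    if split_start <= rg_start + rg_size // 2 < split_end)
        counts.append(count)
    return counts

# Hypothetical file: 4 row groups of 100 bytes after the 4-byte PAR1 header.
row_groups = [(4, 100), (104, 100), (204, 100), (304, 100)]
counts = row_groups_per_split(row_groups, file_length=420, split_size=50)
print(len(counts), sum(counts))  # 9 4
```

With 4 row groups of 100 bytes and 50-byte splits, 9 tasks are created, but only 4 of them read any data.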
A Parquet file contains MIN/MAX statistics for every column in every row group. These statistics allow Spark applications to skip reading unnecessary data chunks, depending on the query predicate. Let’s see how this works with a LIKE pattern-matching filter.
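Consider a prefix pattern such as LIKE 'abc%'. A reader can skip a row group when no string between MIN and MAX can start with the prefix. Truncating strings preserves lexicographic order, so it is enough to compare the prefix with MIN and MAX truncated to the prefix length.

Here is a minimal sketch. It uses a hypothetical function name and hypothetical statistics, and Python string comparison stands in for Parquet's byte-wise comparison.

```python
def may_contain_prefix(col_min: str, col_max: str, prefix: str) -> bool:
    """True if some string in [col_min, col_max] could start with prefix."""
    k = len(prefix)
    # s >= col_min implies s[:k] >= col_min[:k], and likewise for col_max.
    return col_min[:k] <= prefix <= col_max[:k]

# Hypothetical MIN/MAX statistics for 3 row groups
stats = [("apple", "azure"), ("banana", "cherry"), ("date", "fig")]
to_read = [i for i, (lo, hi) in enumerate(stats) if may_contain_prefix(lo, hi, "b")]
print(to_read)  # [1]
```

A pattern with a leading wildcard, such as LIKE '%abc', cannot be checked this way, so such a filter cannot use MIN/MAX statistics to skip row groups.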
For my tests I will use a Parquet file with 4 row groups and the following MIN/MAX statistics for
Every Parquet file has a footer that contains metadata: the schema, row groups and column statistics. The footer is located at the end of the file.
A Parquet file starts and ends with the 4-byte PAR1 “magic” string. Right before the ending PAR1, there is a 4-byte footer length (little-endian encoding):
The position of the footer can be calculated as:
File_length - Footer_length - 8
where 8 is the size of the 4-byte footer length field plus the trailing 4-byte PAR1.
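Here is a minimal Python sketch that locates the footer using only the standard library. It checks both PAR1 markers and decodes the little-endian footer length. It then computes the offset by subtracting the footer length and the 8 trailing bytes (length field plus PAR1) from the file length. It returns the raw footer bytes, a Thrift-encoded FileMetaData structure, without decoding them. The function name is hypothetical.

```python
import io
import os
import struct

MAGIC = b"PAR1"

def read_footer(f):
    """Return (footer_offset, footer_bytes) for a seekable binary file object."""
    f.seek(0, os.SEEK_END)
    file_length = f.tell()
    f.seek(0)
    if f.read(4) != MAGIC:
        raise ValueError("missing leading PAR1 magic")
    # The last 8 bytes are the footer length (little-endian uint32) and PAR1.
    f.seek(file_length - 8)
    tail = f.read(8)
    if tail[4:] != MAGIC:
        raise ValueError("missing trailing PAR1 magic")
    (footer_length,) = struct.unpack("<I", tail[:4])
    footer_offset = file_length - footer_length - 8
    f.seek(footer_offset)
    return footer_offset, f.read(footer_length)

if __name__ == "__main__":
    # Synthetic file: PAR1 + 10 data bytes + 6-byte "footer" + length + PAR1
    fake = MAGIC + b"x" * 10 + b"FOOTER" + struct.pack("<I", 6) + MAGIC
    print(read_footer(io.BytesIO(fake)))  # (14, b'FOOTER')
```

For a real file, open it in binary mode and pass the file object, for example with open("part-00000.parquet", "rb") as f: read_footer(f). The file name here is hypothetical.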