You use the LIMIT clause to quickly browse and review data samples, so you expect such queries to complete in less than a second. But let’s consider Spark’s LIMIT behavior on very large data sets and the performance issues you may run into.
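For context, a minimal sketch of such a browsing query in a pyspark session (the events table name is hypothetical):

# Browse a small sample of rows; you would expect this to come back almost instantly
spark.sql("SELECT * FROM events LIMIT 10").show()

# The same sample using the DataFrame API
spark.table("events").limit(10).show()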
-
Spark – Number of Tasks Reading Large Number of Small Parquet Files
Sometimes source data arrives from a streaming application as a large set of small Parquet files that you need to compact for more efficient reads by analytic applications.
You may observe that, by default, the number of tasks reading such Parquet files is larger than expected. Let’s see why.
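A minimal sketch of checking the planned task count in a pyspark session (the path is hypothetical; spark.sql.files.maxPartitionBytes and spark.sql.files.openCostInBytes are the settings that drive how files are packed into tasks):

# Read a directory containing many small Parquet files
df = spark.read.parquet("s3://my-bucket/events/incoming/")

# Number of input partitions (read tasks) Spark planned for the scan
print(df.rdd.getNumPartitions())

# Settings that affect how the files are grouped into tasks
print(spark.conf.get("spark.sql.files.maxPartitionBytes"))
print(spark.conf.get("spark.sql.files.openCostInBytes"))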
-
Spark 2.4 – Slow Performance on Writing into Partitions – Why Sorting Involved
It is quite typical to write the resulting data into a partitioned table, but performance can be very slow compared with writing the same data volume into a non-partitioned table.
Let’s investigate why it is slow, and why the sorting operation happens.
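For reference, a minimal sketch of such a partitioned write in a pyspark session (the path and the dt partition column are hypothetical):

from pyspark.sql import functions as F

# A toy DataFrame with a partition column
df = spark.range(1000000).withColumn("dt", F.lit("2022-04-19"))

# Writing into partitions; with dynamic partitions Spark 2.4 can add a local sort
# by the partition columns before the files are written
df.write \
  .partitionBy("dt") \
  .mode("overwrite") \
  .parquet("s3://my-bucket/warehouse/events_daily/")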
-
Spark – Create Multiple Output Files per Task using spark.sql.files.maxRecordsPerFile
It is highly recommended that you try to evenly distribute the work among multiple tasks so that every task produces a single output file and the job completes in parallel.
But sometimes it may still be useful for a task to generate multiple output files with a limited number of records in each file, by using the spark.sql.files.maxRecordsPerFile option.
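A minimal sketch of using it in a pyspark session (the output path and the record limit are hypothetical):

# A toy DataFrame to write out
df = spark.range(5000000)

# Each task rolls over to a new output file after 1,000,000 records
df.write \
  .option("maxRecordsPerFile", 1000000) \
  .parquet("s3://my-bucket/output/")

# The same limit can also be set for the whole session
spark.conf.set("spark.sql.files.maxRecordsPerFile", 1000000)
-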
EMR Spark – Initial Number of Executors and spark.dynamicAllocation.enabled
By default, Spark EMR clusters have spark.dynamicAllocation.enabled set to true, meaning that the cluster will dynamically allocate resources to scale the executors up and down whenever required. But what is the initial number of executors when you start your Spark job?
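A minimal sketch of inspecting the settings involved from a pyspark session (in upstream Spark the initial executor count is derived from spark.dynamicAllocation.initialExecutors, spark.dynamicAllocation.minExecutors and spark.executor.instances; EMR supplies some of these defaults itself):

# Settings that influence the initial number of executors under dynamic allocation
for key in ["spark.dynamicAllocation.enabled",
            "spark.dynamicAllocation.initialExecutors",
            "spark.dynamicAllocation.minExecutors",
            "spark.executor.instances"]:
    print(key, spark.conf.get(key, "not set"))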
-
EMR Spark – Much Larger Executors are Created than Requested
Starting from EMR 5.32 and EMR 6.2 you can notice that Spark launches much larger executors than you request in your job settings. For example, EMR created my cluster with the following default settings (they depend on the instance type and the maximizeResourceAllocation classification option):

spark.executor.memory                       18971M
spark.executor.cores                        4
spark.yarn.executor.memoryOverheadFactor    0.1875
But when I start a Spark session (the pyspark command) I see the following:
-
Amazon EMR Spark – Ignoring Partition Filter and Listing All Partitions When Reading from S3A
I have a partitioned Hive table created by an open-source version of Hadoop that uses the S3A scheme as the location for every partition. The table has more than 10,000 partitions and every partition contains about 8,000 Parquet files:
$ hive -e "show partitions events"; ... dateint=20220419/hour=11 dateint=20220419/hour=12 dateint=20220419/hour=13 $ hive -e "describe formatted events partition (dateint=20220419, hour='11')" | grep Location Location: s3a://cloudsqale/events/dateint=20220419/hour=11 $ hive -e "describe formatted events partition (dateint=20220419, hour='12')" | grep Location Location: s3a://cloudsqale/events/dateint=20220419/hour=12 $ hive -e "describe formatted events partition (dateint=20220419, hour='13')" | grep Location Location: s3a://cloudsqale/events/dateint=20220419/hour=13
S3A:// is specified for every partition in this table.

Reading a Partition in Amazon EMR Spark
When I made an attempt to read data from a single partition using Spark SQL:
$ spark-sql --master yarn -e "select count(*) from events where dateint=20220419 and hour='11'"
The Spark driver failed with:
# java.lang.OutOfMemoryError: GC overhead limit exceeded
# -XX:OnOutOfMemoryError="kill -9 %p"
#   Executing /bin/sh -c "kill -9 4847"...
-
Spark – Reading Parquet – Why the Number of Tasks can be Much Larger than the Number of Row Groups
A row group is a unit of work for reading from Parquet that cannot be split into smaller parts, and you expect that the number of tasks created by Spark is no more than the total number of row groups in your Parquet data source.
But Spark can still create many more tasks than the number of row groups. Let’s see how this is possible.
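A minimal sketch of comparing the two numbers for a single file (the path is hypothetical; pyarrow is used only to read the row group count from the Parquet footer, and spark comes from a pyspark session):

import pyarrow.parquet as pq

path = "/data/events/part-00000.parquet"

# Number of row groups recorded in the Parquet footer
print(pq.ParquetFile(path).metadata.num_row_groups)

# Number of tasks (input partitions) Spark plans for scanning the same file
df = spark.read.parquet(path)
print(df.rdd.getNumPartitions())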
-
Spark – Reading Parquet – Predicate Pushdown for LIKE Operator – EqualTo, StartsWith and Contains Pushed Filters
A Parquet file contains MIN/MAX statistics for every column for every row group that allows Spark applications to skip reading unnecessary data chunks depending on the query predicate. Let’s see how this works with LIKE pattern matching filter.
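Before going to the test file, a minimal sketch of how the pushed filters can be seen in the physical plan (the sales table and the patterns are hypothetical; the filters appear in the PushedFilters section of the explain output):

# A prefix pattern is pushed down as a StringStartsWith filter, which can use
# the MIN/MAX statistics of each row group to skip data
spark.sql("SELECT * FROM sales WHERE product LIKE 'Orange%'").explain()

# A substring pattern is pushed down as a StringContains filter, but MIN/MAX
# bounds generally cannot rule out a substring match
spark.sql("SELECT * FROM sales WHERE product LIKE '%Juice%'").explain()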
For my tests I will use a Parquet file with 4 row groups and the following MIN/MAX statistics for the product column:
-
Spark – Slow Load Into Partitioned Hive Table on S3 – Direct Writes, Output Committer Algorithms
I have a Spark job that transforms incoming data from compressed text files into Parquet format and loads it into a daily partition of a Hive table. This is a typical job in a data lake; it is quite simple, but in my case it was very slow.
Initially it took about 4 hours to convert ~2,100 input .gz files (~1.9 TB of data) into Parquet, while the actual Spark job took just 38 minutes to run and the remaining time was spent on loading data into a Hive partition.
Let’s see the reason for this behavior and how we can improve the performance.
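A minimal sketch of this kind of job with a committer-related setting applied (paths, table layout and the dateint value are hypothetical):

from pyspark.sql import SparkSession, functions as F

spark = (SparkSession.builder
         # Algorithm version 2 lets each task commit its files directly,
         # avoiding the slow sequential rename of all output files at job commit
         .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
         .enableHiveSupport()
         .getOrCreate())

# Convert the compressed text input into Parquet in a daily partition
df = (spark.read.text("s3://my-bucket/incoming/20220419/")
        .withColumn("dateint", F.lit(20220419)))

(df.write
   .mode("overwrite")
   .partitionBy("dateint")
   .parquet("s3://my-bucket/warehouse/events_daily/"))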