Large-Scale Data Engineering in Cloud

Performance Tuning, Cost Optimization / Internals, Research by Dmitry Tolpeko

  • Spark

    Spark – LIMIT on Large Datasets – CollectLimit, GlobalLimit, LocalLimit, spark.sql.limit.scaleUpFactor

    September 17, 2023

    You use the LIMIT clause to quickly browse and review data samples, so you expect such queries to complete in less than a second. But let’s consider Spark’s LIMIT behaviour on very large datasets and the performance issues you may run into.
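    For a concrete starting point, here is a minimal sketch in PySpark (the table name events is hypothetical) that surfaces the CollectLimit operator in the physical plan and sets the spark.sql.limit.scaleUpFactor option that controls how aggressively Spark widens its scan when the first partitions do not return enough rows:

      from pyspark.sql import SparkSession

      spark = (SparkSession.builder
               .appName("limit-demo")
               # How much to grow the number of scanned partitions on each
               # retry when earlier partitions did not yield enough rows
               # (the default is 4)
               .config("spark.sql.limit.scaleUpFactor", "4")
               .getOrCreate())

      # "events" is a hypothetical large table
      df = spark.sql("SELECT * FROM events LIMIT 10")

      # The physical plan shows CollectLimit (or GlobalLimit/LocalLimit)
      df.explain()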

  • I/O,  Parquet,  Spark

    Spark – Number of Tasks Reading Large Number of Small Parquet Files

    July 19, 2023

    Sometimes source data arrives from a streaming application as a large set of small Parquet files that you need to compact for more efficient reads by analytic applications.

    You may observe that, by default, the number of tasks reading such Parquet files is larger than expected. Let’s see why.
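    For intuition, here is a minimal sketch of the split-size arithmetic Spark applies when packing files into read tasks (it mirrors the maxSplitBytes logic of FilePartition with default values; the file sizes and core count below are made up):

      # Mirrors Spark's split sizing: every file is padded with an "open cost",
      # so many small files inflate the total and yield more read tasks than
      # the raw data volume suggests.
      def max_split_bytes(file_sizes, default_parallelism,
                          max_partition_bytes=128 * 1024 * 1024,  # spark.sql.files.maxPartitionBytes
                          open_cost_in_bytes=4 * 1024 * 1024):    # spark.sql.files.openCostInBytes
          total = sum(size + open_cost_in_bytes for size in file_sizes)
          bytes_per_core = total // default_parallelism
          return min(max_partition_bytes, max(open_cost_in_bytes, bytes_per_core))

      # 10,000 files of 1 MB each on a cluster with 200 cores:
      print(max_split_bytes([1024 * 1024] * 10000, default_parallelism=200))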

  • I/O,  Spark,  Storage

    Spark 2.4 – Slow Performance on Writing into Partitions – Why Sorting Involved

    August 30, 2022

    It is quite typical to write the resulting data into a partitioned table, but the performance can be very slow compared with writing the same data volume into a non-partitioned table.

    Let’s investigate why it is slow and why a sort operation is involved.
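    To reproduce the effect, here is a minimal sketch (the output path is hypothetical): with partitionBy, Spark sorts each task’s rows by the dynamic partition columns before writing, so that a task keeps only one open file writer at a time.

      from pyspark.sql import SparkSession

      spark = SparkSession.builder.appName("partition-write-demo").getOrCreate()

      df = spark.range(0, 1000000).selectExpr("id", "id % 24 AS hour")

      # The implicit sort by the partition column happens inside each task
      df.write.mode("overwrite").partitionBy("hour").parquet("/tmp/events_out")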

  • I/O,  Spark,  Storage

    Spark – Create Multiple Output Files per Task using spark.sql.files.maxRecordsPerFile

    August 30, 2022

    It is highly recommended that you distribute the work evenly among multiple tasks, so that every task produces a single output file and the job completes in parallel.

    But sometimes it can still be useful for a task to generate multiple output files, each with a limited number of records, using the spark.sql.files.maxRecordsPerFile option.
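    A minimal sketch (not the example from the full post; the output path is hypothetical):

      from pyspark.sql import SparkSession

      spark = SparkSession.builder.appName("max-records-demo").getOrCreate()

      # Each output file will contain at most 100,000 records
      # (0 means unlimited, which is the default)
      spark.conf.set("spark.sql.files.maxRecordsPerFile", "100000")

      df = spark.range(0, 1000000).coalesce(1)  # force a single task
      df.write.mode("overwrite").parquet("/tmp/split_output")  # 10 files, not 1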

  • Amazon,  AWS,  EMR,  Spark

    EMR Spark – Initial Number of Executors and spark.dynamicAllocation.enabled

    August 29, 2022

    By default, EMR Spark clusters have spark.dynamicAllocation.enabled set to true, meaning that the cluster will dynamically allocate resources to scale the executors up and down whenever required.

    But what is the initial number of executors when you start your Spark job?
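    As a sketch of the settings involved: the starting executor count under dynamic allocation is the maximum of three values, which you can see by setting them explicitly (the numbers below are made up):

      from pyspark.sql import SparkSession

      spark = (SparkSession.builder
               .appName("dyn-alloc-demo")
               # EMR also enables the external shuffle service that
               # dynamic allocation requires
               .config("spark.dynamicAllocation.enabled", "true")
               .config("spark.dynamicAllocation.minExecutors", "0")
               .config("spark.dynamicAllocation.initialExecutors", "2")
               .config("spark.executor.instances", "4")  # also acts as a lower bound
               .getOrCreate())

      # Spark starts with max(minExecutors, initialExecutors,
      # spark.executor.instances) = max(0, 2, 4) = 4 executors here.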

  • Amazon,  AWS,  EMR,  Spark

    EMR Spark – Much Larger Executors are Created than Requested

    August 26, 2022

    Starting from EMR 5.32 and EMR 6.2, you may notice that Spark launches much larger executors than you request in your job settings. For example, EMR created my cluster with the following default settings (they depend on the instance type and the maximizeResourceAllocation classification option):

      spark.executor.memory                      18971M
      spark.executor.cores                       4
      spark.yarn.executor.memoryOverheadFactor   0.1875
    

    But when I start a Spark session (the pyspark command), I see much larger values.
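    One way to see the effective values is to ask the running session (a sketch; the pyspark shell already provides the spark object, the builder below just makes it self-contained):

      from pyspark.sql import SparkSession

      spark = SparkSession.builder.appName("executor-size-check").getOrCreate()

      for key in ("spark.executor.memory",
                  "spark.executor.cores",
                  "spark.yarn.executor.memoryOverheadFactor"):
          # "unset" is printed if the option is not explicitly configured
          print(key, "=", spark.conf.get(key, "unset"))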

  • Amazon,  AWS,  EMR,  S3,  Spark

    Amazon EMR Spark – Ignoring Partition Filter and Listing All Partitions When Reading from S3A

    April 20, 2022

    I have a partitioned Hive table, created by an open-source version of Hadoop, that uses the s3a:// scheme as the location for every partition. The table has more than 10,000 partitions, and every partition contains about 8,000 Parquet files:

    $ hive -e "show partitions events";
    ...
    dateint=20220419/hour=11
    dateint=20220419/hour=12
    dateint=20220419/hour=13
    
    $ hive -e "describe formatted events partition (dateint=20220419, hour='11')" | grep Location
    Location:   s3a://cloudsqale/events/dateint=20220419/hour=11
    
    $ hive -e "describe formatted events partition (dateint=20220419, hour='12')" | grep Location
    Location:   s3a://cloudsqale/events/dateint=20220419/hour=12
    
    $ hive -e "describe formatted events partition (dateint=20220419, hour='13')" | grep Location
    Location:   s3a://cloudsqale/events/dateint=20220419/hour=13
    

    The s3a:// scheme is specified for every partition in this table.

    Reading a Partition in Amazon EMR Spark

    When I attempted to read data from a single partition using Spark SQL:

    $ spark-sql --master yarn -e "select count(*) from events where dateint=20220419 and hour='11'"
    

    The Spark driver failed with:

    # java.lang.OutOfMemoryError: GC overhead limit exceeded
    # -XX:OnOutOfMemoryError="kill -9 %p"
    #   Executing /bin/sh -c "kill -9 4847"...
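    One way to investigate (a sketch, not the resolution from the full post) is to check whether the partition predicate reaches the scan at all: in the extended physical plan, look for PartitionFilters and the reported partition count, which should be 1 rather than 10,000+ if pruning worked.

      from pyspark.sql import SparkSession

      spark = SparkSession.builder.appName("pruning-check").getOrCreate()

      # Look for PartitionFilters and the partition count in the printed plan
      spark.sql("""
          SELECT count(*) FROM events
          WHERE dateint = 20220419 AND hour = '11'
      """).explain(True)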
    
  • I/O,  Parquet,  Spark

    Spark – Reading Parquet – Why the Number of Tasks can be Much Larger than the Number of Row Groups

    March 19, 2021

    A row group is a unit of work for reading from Parquet that cannot be split into smaller parts, and you expect that the number of tasks created by Spark is no more than the total number of row groups in your Parquet data source.

    But Spark can still create many more tasks than the number of row groups. Let’s see how this is possible.
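    A minimal sketch of the mechanism (the input path is hypothetical): Spark plans read tasks by byte ranges of spark.sql.files.maxPartitionBytes, not by row-group boundaries, and a task only processes the row groups whose midpoint falls inside its byte range, so the planned task count can exceed the row-group count.

      from pyspark.sql import SparkSession

      spark = (SparkSession.builder
               .appName("rowgroup-tasks-demo")
               # deliberately small split size to force many read tasks
               .config("spark.sql.files.maxPartitionBytes", str(8 * 1024 * 1024))
               .getOrCreate())

      df = spark.read.parquet("/tmp/big.parquet")  # hypothetical file
      print(df.rdd.getNumPartitions())             # can exceed the row-group count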

  • I/O,  Parquet,  Spark

    Spark – Reading Parquet – Predicate Pushdown for LIKE Operator – EqualTo, StartsWith and Contains Pushed Filters

    March 7, 2021

    A Parquet file contains MIN/MAX statistics for every column of every row group, which allows Spark applications to skip reading unnecessary data chunks depending on the query predicate. Let’s see how this works with the LIKE pattern-matching filter.

    For my tests I will use a Parquet file with 4 row groups and known MIN/MAX statistics for the product column.
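    Without reproducing the exact statistics from the full post, here is a minimal sketch of how to see which filters get pushed down (the file path and column are hypothetical): a prefix pattern becomes a StringStartsWith filter that can be checked against MIN/MAX statistics, while a %substring% pattern becomes StringContains, which MIN/MAX ranges cannot help with.

      from pyspark.sql import SparkSession

      spark = SparkSession.builder.appName("like-pushdown-demo").getOrCreate()

      df = spark.read.parquet("/tmp/products.parquet")  # hypothetical file

      # Prefix match: look for StringStartsWith in PushedFilters
      df.filter("product LIKE 'Orange%'").explain()

      # Substring match: pushed as StringContains, but row groups cannot be
      # skipped using MIN/MAX statistics for an arbitrary substring
      df.filter("product LIKE '%juice%'").explain()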

  • Amazon,  AWS,  Hive,  I/O,  Parquet,  S3,  Spark

    Spark – Slow Load Into Partitioned Hive Table on S3 – Direct Writes, Output Committer Algorithms

    December 30, 2019

    I have a Spark job that transforms incoming data from compressed text files into Parquet format and loads it into a daily partition of a Hive table. This is a typical job in a data lake; it is quite simple, but in my case it was very slow.

    Initially it took about 4 hours to convert ~2,100 input .gz files (~1.9 TB of data) into Parquet, while the actual Spark job took just 38 minutes to run, and the remaining time was spent loading data into the Hive partition.

    Let’s see the reason for this behavior and how we can improve the performance.
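    As a taste of the knobs involved (a sketch, not the full tuning from the post): the Hadoop output committer’s algorithm version controls whether each task’s output is moved to its final location at task commit (version 2) or all at once at job commit (version 1), which matters a lot on S3 where a rename is a copy.

      from pyspark.sql import SparkSession

      spark = (SparkSession.builder
               .appName("committer-demo")
               # Version 2 commits task output directly to the final location
               # at task commit time instead of at job commit
               .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
               .getOrCreate())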
