• Amazon, AWS, EMR, Spark

    EMR Spark – Much Larger Executors are Created than Requested

    Starting with EMR 5.32 and EMR 6.2 you may notice that Spark launches much larger executors than you request in your job settings. For example, EMR created my cluster with the following default settings (they depend on the instance type and the maximizeResourceAllocation classification option):

      spark.executor.memory                      18971M
      spark.executor.cores                       4
      spark.yarn.executor.memoryOverheadFactor   0.1875
    

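    For reference, with these settings each executor should request a YARN container of spark.executor.memory plus the memory overhead, i.e. 18971 MB * (1 + 0.1875) ≈ 22528 MB, or about 22 GB (assuming the standard Spark on YARN sizing, where the overhead is max(384 MB, overheadFactor * executor memory)).
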
    But when I start a Spark session (pyspark command) I see the following:

  • Amazon, AWS, EMR, S3, Spark

    Amazon EMR Spark – Ignoring Partition Filter and Listing All Partitions When Reading from S3A

    I have a partitioned Hive table created by an open-source version of Hadoop that uses the s3a:// scheme in the location of every partition. The table has more than 10,000 partitions, and every partition contains about 8,000 Parquet files:

    $ hive -e "show partitions events";
    ...
    dateint=20220419/hour=11
    dateint=20220419/hour=12
    dateint=20220419/hour=13
    
    $ hive -e "describe formatted events partition (dateint=20220419, hour='11')" | grep Location
    Location:   s3a://cloudsqale/events/dateint=20220419/hour=11
    
    $ hive -e "describe formatted events partition (dateint=20220419, hour='12')" | grep Location
    Location:   s3a://cloudsqale/events/dateint=20220419/hour=12
    
    $ hive -e "describe formatted events partition (dateint=20220419, hour='13')" | grep Location
    Location:   s3a://cloudsqale/events/dateint=20220419/hour=13
    

    The s3a:// scheme is specified for every partition of this table.

    Reading a Partition in Amazon EMR Spark

    When I attempted to read data from a single partition using Spark SQL:

    $ spark-sql --master yarn -e "select count(*) from events where dateint=20220419 and hour='11'"
    

    The Spark driver failed with:

    # java.lang.OutOfMemoryError: GC overhead limit exceeded
    # -XX:OnOutOfMemoryError="kill -9 %p"
    #   Executing /bin/sh -c "kill -9 4847"...
    
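    The quickest first-aid measure is simply to give the driver more heap so it can survive listing all the partitions; it does not prevent the listing itself, and the 8g value below is just an example:

    $ spark-sql --master yarn --driver-memory 8g \
        -e "select count(*) from events where dateint=20220419 and hour='11'"
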
  • Amazon, AWS, Hive, I/O, Parquet, S3, Spark

    Spark – Slow Load Into Partitioned Hive Table on S3 – Direct Writes, Output Committer Algorithms

    I have a Spark job that transforms incoming data from compressed text files into Parquet format and loads the result into a daily partition of a Hive table. This is a typical data lake job and quite simple, but in my case it was very slow.

    Initially it took about 4 hours to convert ~2,100 input .gz files (~1.9 TB of data) into Parquet, even though the actual Spark job ran for just 38 minutes; the remaining time was spent loading the data into the Hive partition.

    Let’s see why this happens and how we can improve the performance.
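
    One of the classic knobs in this area is the file output committer algorithm version: version 2 moves task output to its final location at task commit rather than at job commit, which removes a long sequential rename phase on S3 (a sketch, assuming the standard Hadoop FileOutputCommitter is in use; whether it fits your failure-tolerance requirements is a separate question):

      spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version   2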

  • Amazon, AWS, RDS

    Sqoop Import from Amazon RDS Read Replica – ERROR: Canceling Statement due to Conflict with Recovery

    Apache Sqoop is widely used to import data from relational databases into the cloud. One of our databases uses Amazon RDS for PostgreSQL to store sales data, and its Sqoop import periodically failed with the following error:

    Error: java.io.IOException: SQLException in nextKeyValue
    Caused by: org.postgresql.util.PSQLException: ERROR: canceling statement due to conflict with recovery
      Detail: User query might have needed to see row versions that must be removed.
    

    In this article I will describe a solution that helped resolve the problem in our specific case.
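
    A common mitigation on the PostgreSQL side is to enable hot_standby_feedback on the replica, so the primary keeps the row versions that long-running replica queries still need; on Amazon RDS this is changed through the replica’s DB parameter group (a sketch; the parameter group name is a placeholder):

    $ aws rds modify-db-parameter-group \
        --db-parameter-group-name my-replica-params \
        --parameters "ParameterName=hot_standby_feedback,ParameterValue=1,ApplyMethod=immediate"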

  • Amazon, EMR, Spark

    Extremely Large Number of RDD Partitions and Tasks in Spark on Amazon EMR

    After creating an Amazon EMR cluster with Spark support and running a Spark application, you may notice that the job creates far too many tasks to process even a very small data set.

    For example, I have a small table country_iso_codes with 249 rows, stored in a comma-delimited text file that is 10,657 bytes long.
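
    A file of this size should normally fit in just one or two partitions, and the actual partition count is easy to check from a PySpark session (a minimal sketch; the S3 path is a placeholder):

    $ pyspark
    >>> rdd = spark.sparkContext.textFile("s3://cloudsqale/country_iso_codes/")  # placeholder path
    >>> rdd.getNumPartitions()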

    When running the following application on an Amazon EMR 5.7 cluster with Spark 2.1.1 and the default settings, I can see a large number of partitions generated:

  • Amazon, AWS, EMR, Hive, I/O, S3

    S3 Writes When Inserting Data into a Hive Table in Amazon EMR

    Often in an ETL process we move data from one store into another, typically applying some filtering, transformations and aggregations along the way. Let’s consider which write operations are performed in S3.

    To focus on S3 writes, I am going to use a very simple SQL INSERT statement that moves data from one table into another without any transformations:

    INSERT OVERWRITE TABLE events PARTITION (event_dt = '2018-12-02', event_hour = '00')
    SELECT
      record_id,
      event_timestamp,
      event_name,
      app_name,
      country,
      city,
      payload
    FROM events_raw;
    
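    After the statement completes, the objects the insert actually produced can be listed directly from the bucket (the bucket and key layout below are placeholders following the partition naming above):

    $ aws s3 ls s3://cloudsqale/hive/events.db/events/event_dt=2018-12-02/event_hour=00/
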
  • Amazon, AWS, EMR, Hive, ORC, Tez

    Tez Internals #2 – Number of Map Tasks for Large ORC Files with Small Stripes in Amazon EMR

    Let’s see how Hive on Tez defines the number of map tasks when the input data is stored in large ORC files that have small stripes.

    Note: all experiments below were executed on Amazon Hive 2.1.1. This article does not apply to Qubole running on AWS, as Qubole uses a different algorithm to define the number of map tasks for ORC files.
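
    The stripe layout of a particular ORC file can be inspected with the ORC file dump utility, which prints each stripe with its offset, size and number of rows (the file path is a placeholder):

    $ hive --orcfiledump /path/to/events/000000_0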

  • Amazon, AWS, I/O, Monitoring, S3, Storage

    S3 Monitoring #4 – Read Operations and Tables

    Knowing how Hive table storage is organized can help us extract additional information about S3 read operations for each table.

    In most cases (and you can easily adapt this to your specific table storage pattern), tables are stored in an S3 bucket under the following key structure:

    s3://<bucket_name>/hive/<database_name>/<table_name>/<partition1>/<partition2>/...
    

    For example, hourly data for the orders table can be stored as follows:

    s3://cloudsqale/hive/sales.db/orders/created_dt=2018-10-18/hour=00/
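
    With this key structure, the database and table names can be recovered from any object key that appears in the S3 access logs with a simple field split (a minimal sketch):

    $ echo "hive/sales.db/orders/created_dt=2018-10-18/hour=00/000000_0" | \
        awk -F'/' '{ printf "db=%s table=%s\n", $2, $3 }'
    db=sales.db table=orders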