• Amazon,  EMR,  Spark

    Extremely Large Number of RDD Partitions and Tasks in Spark on Amazon EMR

    After creating an Amazon EMR cluster with Spark support and running a Spark application, you may notice that the Spark job creates a very large number of tasks even for a very small data set.

    For example, I have a small table country_iso_codes with 249 rows, stored in a comma-delimited text file 10,657 bytes long.

    When running the following application on an Amazon EMR 5.7 cluster with Spark 2.1.1 and the default settings, I can see the large number of partitions generated:
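    The output is truncated above, but the partition count for a text file read via sc.textFile comes from Hadoop's old-API FileInputFormat split computation, in which a large requested number of splits shrinks the split size. A minimal sketch of that formula (the defaults and the example parallelism of 80 are illustrative assumptions, not the article's measurements):

    ```python
    # Sketch of the split sizing used by Hadoop's old-API FileInputFormat,
    # which sc.textFile() relies on. Parameter defaults are assumptions.

    def compute_split_size(total_size, num_splits, min_size=1,
                           block_size=128 * 1024 * 1024):
        # goalSize is the file size divided by the requested number of splits
        goal_size = total_size // max(num_splits, 1)
        # Hadoop: splitSize = max(minSize, min(goalSize, blockSize))
        return max(min_size, min(goal_size, block_size))

    def num_partitions(total_size, num_splits, **kw):
        split = compute_split_size(total_size, num_splits, **kw)
        # ignore the 1.1x SPLIT_SLOP tail rule for simplicity
        return -(-total_size // split)  # ceiling division

    # A 10,657-byte file with a hypothetical requested parallelism of 80
    # is carved into dozens of tiny 133-byte splits:
    print(num_partitions(10657, 80))  # → 81
    ```

    With a small goal size the block size never comes into play, so even a 10 KB file produces one partition per requested split rather than a single partition.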

  • Data Skew,  Distributed,  Pig,  Tez

    Reduce Number of Output Files for Skewed Data – ORDER in Apache Pig – Sampler and Weighted Range Partitioner to Balance Reducers

    One of our event tables is very large, containing billions of rows per day. Since the analytics team is often interested in specific events only, it makes sense to process the raw events and generate a partition for every event type. Then, when a data analyst runs an ad-hoc query, it reads data for the required event type only, improving query performance.

    The problem is that 3,000 map tasks are launched to read the daily data, and there are 250 distinct event types, so the mappers can produce 3,000 * 250 = 750,000 files per day. That’s too many.
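    Pig’s ORDER BY handles this kind of skew with a sampler and a weighted range partitioner: it samples the keys first, then assigns reducers in proportion to each key’s observed frequency, so a heavily skewed key is spread over several reducers instead of overloading one. A minimal sketch of the allocation idea (the function and sample data are hypothetical, not Pig’s actual code):

    ```python
    from collections import Counter

    def reducer_allocation(sampled_keys, num_reducers):
        # Allocate reducers to keys proportionally to their sampled
        # frequency; every key still gets at least one reducer.
        counts = Counter(sampled_keys)
        total = sum(counts.values())
        return {key: max(1, round(num_reducers * cnt / total))
                for key, cnt in counts.items()}

    # a skewed sample: 90% of rows belong to one event type
    sample = ["click"] * 90 + ["install"] * 8 + ["purchase"] * 2
    print(reducer_allocation(sample, 10))
    # → {'click': 9, 'install': 1, 'purchase': 1}
    ```

    Balancing reducers this way also bounds the number of output files, since each reducer writes one file per partition it actually receives rather than every mapper writing a file for every event type.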

  • ORC,  Storage

    Storage Tuning for Mapped JSON Conversion to ORC File Format – Java Heap Issues with Dictionary Encoding

    Usually in a Data Lake we get source data as compressed JSON payloads (.gz files). Additionally, the first level of JSON objects is often parsed into a map<string, string> structure to speed up access to the first-level keys and values; the get_json_object function can then be used to parse deeper JSON levels whenever required.

    But it still makes sense to convert the data into the ORC format to distribute processing evenly across smaller chunks, or to use indexes and optimize query execution for complementary columns such as event names, geo information, and some other system attributes.
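    The first-level map<string, string> layout described above can be sketched in a few lines of Python (a hypothetical illustration of the parsing idea, not the actual ETL code):

    ```python
    import json

    def first_level_map(payload: str) -> dict:
        # Parse only the first level of a JSON object into map<string, string>;
        # nested objects and arrays are kept as raw JSON strings so they can
        # be parsed later (e.g. with get_json_object) only when needed.
        obj = json.loads(payload)
        return {k: v if isinstance(v, str)
                   else json.dumps(v, separators=(",", ":"))
                for k, v in obj.items()}

    event = '{"event_name": "Started", "geo": {"country": "US", "city": "Boston"}}'
    m = first_level_map(event)
    print(m["event_name"])  # plain string value: Started
    print(m["geo"])         # nested object kept as a JSON string
    ```

    First-level keys are then cheap map lookups, while deeper levels pay the JSON-parsing cost only on the rows and columns that actually need them.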

    In this example we will load the source data, stored in a single 2.5 GB .gz file, into the following ORC table:

  • Hive,  ORC,  Storage

    ORC File Format Internals – Creating Large Stripes in Hive Tables

    Usually the source data arrives as compressed text files, and the first step in an ETL process is to convert them to a columnar format for more effective query execution by users.

    Let’s consider a simple example in which we have a single 120 MB source file in .gz format:

    $ aws s3 ls s3://cloudsqale/hive/events.db/events_raw/
    2018-12-16 18:49:45  120574494 data.gz

    and want to convert it into a Hive table in the ORC file format with a 256 MB stripe size. Will 120 MB of .gz data be loaded into a single 256 MB stripe? Not so easy.
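    A back-of-the-envelope way to see why: the ORC writer sizes stripes on the uncompressed, in-memory representation of the data, not on the .gz source size. In the sketch below the 4x expansion ratio is purely an assumption for illustration:

    ```python
    import math

    def estimated_stripes(gz_bytes, stripe_bytes, expansion_ratio):
        # Rough estimate only: the stripe size limit applies to the
        # decompressed data buffered by the ORC writer, so the source
        # .gz size understates how much data a stripe must hold.
        raw_bytes = gz_bytes * expansion_ratio
        return math.ceil(raw_bytes / stripe_bytes)

    MB = 1024 * 1024
    # 120 MB of .gz expanding ~4x (assumed) cannot fit one 256 MB stripe:
    print(estimated_stripes(120 * MB, 256 * MB, 4))  # → 2
    ```

    And this still ignores the writer’s own memory management, which can flush stripes even earlier, so the real stripe count has to be checked against the actual file.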

  • Amazon,  AWS,  EMR,  Hive,  I/O,  S3

    S3 Writes When Inserting Data into a Hive Table in Amazon EMR

    Often in an ETL process we move data from one source into another, typically doing some filtering, transformations and aggregations. Let’s consider which write operations are performed in S3.

    Just to focus on S3 writes, I am going to use a very simple SQL INSERT statement that moves data from one table into another without any transformations:

    INSERT OVERWRITE TABLE events PARTITION (event_dt = '2018-12-02', event_hour = '00')
    SELECT *
    FROM events_raw;
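    When such an INSERT targets an S3-backed table, the output typically lands in a staging prefix first and is then moved into the final table location; because S3 has no native rename, a move costs a copy plus a delete per file. A hypothetical accounting sketch (the request counts are illustrative assumptions, not measurements from the article):

    ```python
    def s3_write_operations(num_output_files):
        # Hypothetical per-file request accounting for a Hive INSERT
        # that stages its output before "renaming" it into place:
        return {
            "put": num_output_files,     # initial uploads to the staging prefix
            "copy": num_output_files,    # copy into the final table location
            "delete": num_output_files,  # remove the staged objects
        }

    # three reducer output files triple the request count:
    print(s3_write_operations(3))
    # → {'put': 3, 'copy': 3, 'delete': 3}
    ```

    The interesting part, which the article digs into, is what the real sequence of S3 requests looks like and whether EMR avoids some of this overhead.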

  • Amazon,  AWS,  EMR,  Hive,  ORC,  Tez

    Tez Internals #2 – Number of Map Tasks for Large ORC Files with Small Stripes in Amazon EMR

    Let’s see how Hive on Tez defines the number of map tasks when the input data is stored in large ORC files with small stripes.

    Note: All experiments below were executed on Amazon Hive 2.1.1. This article does not apply to Qubole running on AWS, as Qubole uses a different algorithm to define the number of map tasks for ORC files.
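    One way to think about the split count for such files: Hive’s ORC input format can group contiguous stripes into a split up to a maximum split size, so many small stripes can inflate the number of map tasks. A simplified greedy-packing sketch (an illustration of the idea, not Hive’s actual OrcInputFormat code):

    ```python
    def group_stripes_into_splits(stripe_sizes, max_split_size):
        # Greedily pack contiguous stripes into one split until adding
        # the next stripe would exceed max_split_size.
        splits, current = 0, 0
        for size in stripe_sizes:
            if current and current + size > max_split_size:
                splits += 1
                current = 0
            current += size
        return splits + (1 if current else 0)

    MB = 1024 * 1024
    # a ~2 GB ORC file made of 4 MB stripes, packed into 256 MB splits:
    print(group_stripes_into_splits([4 * MB] * 512, 256 * MB))  # → 8
    ```

    With this packing, the stripe size chosen at write time directly shapes how many map tasks later read the file, which is what the experiments below measure.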

  • Hive,  I/O,  Tez

    Tez Internals #1 – Number of Map Tasks

    Let’s see how Apache Tez defines the number of map tasks when you execute a SQL query in Hive running on the Tez engine.

    Consider the following sample SQL query on a partitioned table:

    select count(*) from events where event_dt = '2018-10-20' and event_name = 'Started';

    The events table is partitioned by event_dt and event_name columns, and it is quite big – it has 209,146 partitions, while the query requests data from a single partition only:

    $ hive -e "show partitions events"
    Time taken: 26.457 seconds, Fetched: 209146 row(s)
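    After partition pruning, Tez still has to decide how many map tasks read the surviving splits, and it does so by grouping them. A simplified sketch of the clamping step (the per-task size is bounded by the tez.grouping.min-size and tez.grouping.max-size settings; the wave-based desired-task computation is omitted, and the default values here are assumptions):

    ```python
    def tez_grouped_tasks(total_input_bytes, desired_tasks,
                          min_size=16 * 1024 * 1024,
                          max_size=1024 * 1024 * 1024):
        # Sketch: the per-task data size is the total input divided by the
        # desired task count, clamped to [min_size, max_size]. Defaults
        # are assumptions, not necessarily your cluster's settings.
        if total_input_bytes == 0:
            return 0
        per_task = total_input_bytes // max(desired_tasks, 1)
        per_task = max(min_size, min(per_task, max_size))
        return max(1, -(-total_input_bytes // per_task))  # ceiling

    MB = 1024 * 1024
    # 2 GB in the selected partition with 512 desired tasks: the 4 MB
    # per-task size is raised to the 16 MB minimum, giving 128 tasks
    print(tez_grouped_tasks(2048 * MB, 512))  # → 128
    ```

    So even with aggressive partition pruning, the grouping bounds keep tiny splits from turning into an equally tiny task granularity.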