• AWS,  EMR,  Hadoop,  YARN

    Amazon EMR – Recovering Ghost Nodes

    In a Hadoop cluster, besides Active nodes, you may also have Unhealthy, Lost and Decommissioned nodes. Unhealthy nodes are running but are excluded from task scheduling because, for example, they do not have enough disk space. Lost nodes are nodes that are no longer reachable. Decommissioned nodes are nodes that terminated successfully and left the cluster.

    But all these nodes are known to the Hadoop YARN cluster and you can see their details such as IP addresses, last health updates and so on.

    At the same time, there can also be Ghost nodes, i.e. nodes that are run by Amazon EMR services but that Hadoop itself knows nothing about. Let’s see how you can find them.
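    As a rough illustration of the idea (not necessarily the method used in the article), a ghost node can be thought of as a set difference: hosts that EMR lists as running minus hosts that YARN knows in any state. The sketch below assumes you have already collected both lists, for example from `aws emr list-instances` and `yarn node -list -all`; the function names and the sample host names are hypothetical.

```python
def normalize(host):
    # YARN node IDs look like "host:port"; keep only the lowercased host part.
    return host.strip().lower().split(":")[0]


def find_ghost_nodes(emr_hosts, yarn_nodes):
    """Return hosts that EMR reports as running but YARN has no record of."""
    known_to_yarn = {normalize(n) for n in yarn_nodes}
    return sorted({normalize(h) for h in emr_hosts} - known_to_yarn)


# Hypothetical sample data: three running EMR instances, two nodes known to YARN.
emr_hosts = [
    "ip-10-0-0-11.ec2.internal",
    "ip-10-0-0-12.ec2.internal",
    "ip-10-0-0-13.ec2.internal",
]
yarn_nodes = [
    "ip-10-0-0-11.ec2.internal:8041",
    "ip-10-0-0-13.ec2.internal:8041",
]

ghosts = find_ghost_nodes(emr_hosts, yarn_nodes)
print(ghosts)
```

    The YARN list should include nodes in every state (Active, Unhealthy, Lost, Decommissioned), otherwise ordinary lost nodes would be misreported as ghosts.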

  • AWS,  EMR,  Hadoop,  YARN

    Hadoop YARN – Collecting Utilization Metrics from Multiple Clusters

    When you run many Hadoop clusters, it is useful to automatically collect metrics from all of them in a single place (a Hive table, for example).

    This lets you perform any advanced or custom analysis of your clusters' workload without being limited to the features of the Hadoop administration UI tools, which often offer only a per-cluster view that makes it hard to see the whole picture of your data platform.
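    One possible shape for such a collector, sketched under assumptions: each cluster's ResourceManager exposes its metrics as JSON at `/ws/v1/cluster/metrics`, and the responses are flattened into tab-separated rows that a Hive text table could read. The cluster names, the chosen fields and the sample payloads below are hypothetical, and the HTTP fetch itself is left out.

```python
import json

# Subset of fields from the ResourceManager clusterMetrics response to keep.
FIELDS = ["appsRunning", "appsPending", "allocatedMB", "totalMB", "activeNodes"]


def metrics_row(cluster_name, response_text):
    """Flatten one /ws/v1/cluster/metrics JSON response into a single row."""
    metrics = json.loads(response_text)["clusterMetrics"]
    return [cluster_name] + [metrics.get(field) for field in FIELDS]


def to_tsv(rows):
    """Render rows as tab-separated lines; missing values become empty strings."""
    return "\n".join(
        "\t".join("" if value is None else str(value) for value in row)
        for row in rows
    )


# Hypothetical responses, as if already fetched from two clusters.
samples = {
    "etl": '{"clusterMetrics": {"appsRunning": 4, "appsPending": 1, '
           '"allocatedMB": 2048, "totalMB": 8192, "activeNodes": 3}}',
    "adhoc": '{"clusterMetrics": {"appsRunning": 0, "appsPending": 0, '
             '"allocatedMB": 0, "totalMB": 4096, "activeNodes": 2}}',
}

rows = [metrics_row(name, text) for name, text in sorted(samples.items())]
tsv = to_tsv(rows)
print(tsv)
```

    Keeping the cluster name as the first column is what makes cross-cluster queries possible once all rows land in the same table.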

  • Amazon,  AWS,  EMR,  Hive,  I/O,  S3

    S3 Writes When Inserting Data into a Hive Table in Amazon EMR

    Often in an ETL process we move data from one source into another, typically doing some filtering, transformations and aggregations. Let’s consider which write operations are performed in S3.

    To focus on S3 writes, I am going to use a very simple SQL INSERT statement that moves data from one table into another without any transformations:

    INSERT OVERWRITE TABLE events PARTITION (event_dt = '2018-12-02', event_hour = '00')
    SELECT
      record_id,
      event_timestamp,
      event_name,
      app_name,
      country,
      city,
      payload
    FROM events_raw;
    
  • Amazon,  AWS,  EMR,  Hive,  ORC,  Tez

    Tez Internals #2 – Number of Map Tasks for Large ORC Files with Small Stripes in Amazon EMR

    Let’s see how Hive on Tez defines the number of map tasks when the input data is stored in large ORC files that have small stripes.

    Note. All experiments below were executed on Amazon Hive 2.1.1. This article does not apply to Qubole running on Amazon AWS. Qubole has a different algorithm to define the number of map tasks for ORC files.

  • Amazon,  AWS,  I/O,  Monitoring,  S3,  Storage

    S3 Monitoring #4 – Read Operations and Tables

    Knowing how Hive table storage is organized can help us extract some additional information for S3 read operations for each table.

    In most cases (and you can easily adapt this to your specific table storage pattern), tables are stored in an S3 bucket under the following key structure:

    s3://<bucket_name>/hive/<database_name>/<table_name>/<partition1>/<partition2>/...
    

    For example, hourly data for the orders table can be stored as follows:

    s3://cloudsqale/hive/sales.db/orders/created_dt=2018-10-18/hour=00/
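    The key structure above can be split back into its parts. This is a minimal sketch, assuming object keys arrive without the bucket name (as they do in S3 access logs) and start with the `hive/` prefix; the function name `parse_table_key`, its `prefix` parameter and the file name in the sample key are illustrative only.

```python
def parse_table_key(key, prefix="hive/"):
    """Split an S3 object key into (database, table, partitions).

    Returns None when the key does not follow the
    <prefix><database>/<table>/<partition>/... layout.
    """
    if not key.startswith(prefix):
        return None
    parts = [p for p in key[len(prefix):].split("/") if p]
    if len(parts) < 2:
        return None
    database, table = parts[0], parts[1]
    # Partition directories look like name=value; anything else (such as the
    # data file name) is ignored.
    partitions = dict(p.split("=", 1) for p in parts[2:] if "=" in p)
    return database, table, partitions


parsed = parse_table_key(
    "hive/sales.db/orders/created_dt=2018-10-18/hour=00/000000_0"
)
print(parsed)
```

    With the database and table extracted per request, read operations can then be grouped by table instead of by raw key.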
    
  • Amazon,  AWS,  I/O,  Monitoring,  S3,  Storage

    S3 Monitoring Step #3 – Read Operations and File Types

    After you get the summary information for S3 read operations (see Step #2), it makes sense to look at file types. By analyzing the object keys, you can easily summarize information about compressed files such as .gz files.

    Later I will use the Hive metadata information to define whether files named like 00000_0 are uncompressed text or ORC files.

    -- Summarize S3 GET requests by file type for one day of access logs
    select type, count(*) keys, count(distinct key) dist_keys,
      -- average read rate in MB/sec: total bytes over total request time
      sum(bytes_sent)/sum(total_time_ms/1000)/(1024*1024) rate_mb_sec,
      -- total request time in seconds
      sum(total_time_ms/1000) time_spent,
      sum(bytes_sent)/(cast(1024 as bigint)*1024*1024*1024) terabytes_read
    from (
      select
        key,
        -- classify objects by key suffix
        case
          when key like '%.gz' then 'Compressed .gz'
          else 'Other'
        end type,
        bytes_sent,
        total_time_ms
      from s3_access_logs
      where event_dt = '{$EVENT_DT}' and operation = 'REST.GET.OBJECT'
    ) t
    group by type;
    

    Here is my sample output:

    type            keys        dist_keys  rate_mb_sec  time_spent   terabytes_read
    Compressed .gz  21,535,003  7,411,981  3.8          504,318,631  1,812.8
    Other           6,345,354   647,040    18.5         1,465,848    25.9
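    As a quick sanity check, the rate_mb_sec column can be recomputed from the other two columns of the sample output, since the query defines it as total bytes divided by total request time in seconds, expressed in MB. The helper name below is hypothetical; the input figures are the rounded values from the output above, so the results only match to one decimal place.

```python
MB = 1024 ** 2
TB = 1024 ** 4


def rate_mb_sec(bytes_sent, seconds):
    """Average read rate in MB/sec, the same formula as in the query."""
    return bytes_sent / seconds / MB


# terabytes_read and time_spent (seconds) taken from the sample output.
gz_rate = rate_mb_sec(1812.8 * TB, 504_318_631)
other_rate = rate_mb_sec(25.9 * TB, 1_465_848)

print(round(gz_rate, 1), round(other_rate, 1))
```

    Note that time_spent sums the duration of every request, so this is an average per-request rate rather than the aggregate throughput seen over wall-clock time.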

    File Types and Object Size Bins

    Now let’s see the distribution of file types for each size bin:

    -- Summarize S3 GET requests by file type and object size bin
    select type, size_type, count(*) keys, count(distinct key) dist_keys,
      -- average read rate in MB/sec: total bytes over total request time
      sum(bytes_sent)/sum(total_time_ms/1000)/(1024*1024) rate_mb_sec,
      sum(bytes_sent)/(cast(1024 as bigint)*1024*1024*1024) terabytes_read
    from (
      select
        key,
        -- classify objects by key suffix
        case
          when key like '%.gz' then 'Compressed .gz'
          else 'Other'
        end type,
        -- assign each object to a size bin; the first matching branch wins
        case
          when total_size <= 1024*1024 then '<= 1 MB'
          when total_size <= 30*1024*1024 then '<= 30 MB'
          when total_size <= 100*1024*1024 then '<= 100 MB'
          else '> 100 MB'
        end size_type,
        bytes_sent,
        total_time_ms
      from s3_access_logs
      where event_dt = '{$EVENT_DT}' and operation = 'REST.GET.OBJECT'
    ) t
    group by type, size_type;
    

    Sample output:

    type            size_type  keys       dist_keys  rate_mb_sec  terabytes_read
    Compressed .gz  <= 1 MB    7,759,230  3,579,785  5.2          2.4
    Compressed .gz  <= 30 MB   6,927,405  2,456,010  4.6          47.3
    Compressed .gz  <= 100 MB  1,136,926  436,463    3.7          71.1
    Compressed .gz  > 100 MB   5,711,442  939,723    3.7          1,691.9
    Other           <= 1 MB    2,535,108  496,286    3.2          0.2
    Other           <= 30 MB   1,152,742  90,472     22.7         1.7
    Other           <= 100 MB  150,521    7,119      14.7         1.0
    Other           > 100 MB   2,506,983  53,191     19.4         23.0
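    The classification rules used in the query can also be written as two small functions, which is handy for checking bin boundaries or reusing the same bins outside Hive. This is only a sketch mirroring the CASE expressions; the function names and the sample key are hypothetical.

```python
MB = 1024 * 1024


def file_type(key):
    """Mirror of the first CASE expression: classify by key suffix."""
    return "Compressed .gz" if key.endswith(".gz") else "Other"


def size_bin(total_size):
    """Mirror of the second CASE expression: first matching branch wins."""
    if total_size <= 1 * MB:
        return "<= 1 MB"
    if total_size <= 30 * MB:
        return "<= 30 MB"
    if total_size <= 100 * MB:
        return "<= 100 MB"
    return "> 100 MB"


# Hypothetical object: a 45 MB gzip file.
label = (file_type("hive/sales.db/orders/part-0001.gz"), size_bin(45 * MB))
print(label)
```

    The bin boundaries are inclusive, so an object of exactly 1 MB falls into '<= 1 MB' and one byte more moves it to '<= 30 MB'.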

    See also: S3 Monitoring Step #2 – Read Operations.