• Amazon,  AWS,  Hive,  I/O,  Parquet,  S3,  Spark

    Spark – Slow Load Into Partitioned Hive Table on S3 – Direct Writes, Output Committer Algorithms

    I have a Spark job that transforms incoming data from compressed text files into Parquet format and loads them into a daily partition of a Hive table. This is a typical data lake job: it is quite simple, but in my case it was very slow.

    Initially, it took about 4 hours to convert ~2,100 input .gz files (~1.9 TB of data) into Parquet. The Spark job itself took just 38 minutes; the remaining time was spent loading data into the Hive partition.

    Let’s see what causes this behavior and how we can improve the performance.
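
    As a quick sanity check on these figures, the short Python calculation below splits the wall-clock time into the Spark part and the load part and compares the resulting throughput. It is only a sketch based on the approximate numbers quoted above; treating 1 TB as 1024^4 bytes is my assumption.

```python
# Back-of-the-envelope breakdown of the job timing quoted in the text.
# Inputs are the approximate figures from the text; 1 TB = 1024^4 bytes
# is an assumption of this sketch.

TOTAL_MINUTES = 4 * 60   # ~4 hours end to end
SPARK_MINUTES = 38       # the Spark job itself
INPUT_TB = 1.9           # ~1.9 TB of compressed .gz input


def breakdown(total_min: float, spark_min: float, input_tb: float) -> dict:
    """Split wall-clock time into Spark work and Hive partition load."""
    load_min = total_min - spark_min
    input_mb = input_tb * 1024 * 1024
    return {
        "load_minutes": load_min,
        "load_share": load_min / total_min,
        # Throughput if only the Spark part is counted ...
        "spark_mb_sec": input_mb / (spark_min * 60),
        # ... versus the throughput observed end to end.
        "end_to_end_mb_sec": input_mb / (total_min * 60),
    }


if __name__ == "__main__":
    for name, value in breakdown(TOTAL_MINUTES, SPARK_MINUTES, INPUT_TB).items():
        print(f"{name}: {value:.2f}")
```

    With these inputs the load step accounts for 202 of the 240 minutes, or about 84% of the total run time.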

  • I/O,  Snowflake

    Snowflake – Micro-Partitions and Clustering Depth

    Traditional data warehouses require you to explicitly specify partition columns for tables using the PARTITION BY clause. Snowflake’s CREATE TABLE statement has no PARTITION BY clause, yet Snowflake still relies heavily on partitions.

    I have already written about partitions in Snowflake (see MIN/MAX Functions and Partition Pruning in Snowflake), but in this article I am going to investigate them in more detail.
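
    Snowflake keeps metadata such as the range of values of each column for every micro-partition and uses it to skip micro-partitions that cannot match a filter. The toy Python model below illustrates that idea under simplified assumptions (made-up partition names and date ranges, plain string comparison); it also counts how many micro-partitions overlap at a given value, which is the kind of overlap that clustering depth describes.

```python
# Toy model of min/max (range) metadata pruning over micro-partitions.
# Partition names and value ranges are made up for illustration.
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class MicroPartition:
    name: str
    min_value: str  # smallest column value stored in the micro-partition
    max_value: str  # largest column value stored in the micro-partition


def prune(partitions: List[MicroPartition], value: str) -> List[MicroPartition]:
    """Keep only micro-partitions whose [min, max] range can contain value."""
    return [p for p in partitions if p.min_value <= value <= p.max_value]


def overlap_depth(partitions: List[MicroPartition], value: str) -> int:
    """Number of micro-partitions whose ranges overlap at value."""
    return len(prune(partitions, value))


if __name__ == "__main__":
    parts = [
        MicroPartition("mp1", "2018-01-01", "2018-01-31"),
        MicroPartition("mp2", "2018-01-15", "2018-02-15"),
        MicroPartition("mp3", "2018-03-01", "2018-03-31"),
    ]
    # Equality filter on a date column: mp3 is skipped without being read.
    print([p.name for p in prune(parts, "2018-01-20")])
    print(overlap_depth(parts, "2018-03-10"))
```

    The fewer micro-partitions overlap at each value, the more of them a filter can skip. Snowflake describes clustering depth as the average depth of overlapping micro-partitions; its exact internal computation is not reproduced here.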

  • I/O,  Snowflake,  Storage

    Snowflake – Monitoring Data Ingestion using QUERY_HISTORY and COPY_HISTORY – Single Large File vs Multiple Small Files

    Snowflake provides various options to monitor data ingestion from external storage such as Amazon S3. In this article I am going to review QUERY_HISTORY and COPY_HISTORY table functions.

    COPY commands are widely used to load data into Snowflake at regular time intervals, and we can monitor their execution by querying the query history with the query_type = 'COPY' filter.
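
    Once the query history rows are retrieved, summarizing the COPY statements is a simple aggregation. The Python sketch below assumes each row is a dict keyed by the QUERY_HISTORY column names QUERY_TYPE and TOTAL_ELAPSED_TIME (in milliseconds); how the rows are fetched is out of scope, and the sample rows are made up.

```python
# Summarize COPY statements among rows taken from QUERY_HISTORY.
# Assumption: rows are dicts keyed by QUERY_HISTORY column names, with
# TOTAL_ELAPSED_TIME in milliseconds. Sample rows below are made up.
from typing import Dict, Iterable


def summarize_copy(rows: Iterable[Dict]) -> Dict[str, float]:
    elapsed_ms = [r["TOTAL_ELAPSED_TIME"] for r in rows if r["QUERY_TYPE"] == "COPY"]
    if not elapsed_ms:
        return {"copies": 0, "total_sec": 0.0, "avg_sec": 0.0, "max_sec": 0.0}
    return {
        "copies": len(elapsed_ms),
        "total_sec": sum(elapsed_ms) / 1000,
        "avg_sec": sum(elapsed_ms) / len(elapsed_ms) / 1000,
        "max_sec": max(elapsed_ms) / 1000,
    }


if __name__ == "__main__":
    sample = [
        {"QUERY_TYPE": "COPY", "TOTAL_ELAPSED_TIME": 12000},
        {"QUERY_TYPE": "SELECT", "TOTAL_ELAPSED_TIME": 800},
        {"QUERY_TYPE": "COPY", "TOTAL_ELAPSED_TIME": 18000},
    ]
    print(summarize_copy(sample))
```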

  • I/O,  Snowflake

    Snowflake – Remote Disk I/O, Local Disk Cache – Capacity, Utilization and Transfer Rate

    Snowflake uses a cloud storage service such as Amazon S3 as permanent storage for data (Remote Disk in terms of Snowflake), but it can also use Local Disk (SSD) to temporarily cache data used by SQL queries. Let’s test Remote and Local I/O performance by executing a sample SQL query multiple times on X-Large and Medium size Snowflake warehouses:

    SELECT MIN(event_hour), MAX(event_hour) FROM events WHERE event_name = 'LOGIN';
    

    Note that to perform such tests you should disable the Result Cache in your session; otherwise, Snowflake will simply return the cached result for every run after the first one:

    alter session set USE_CACHED_RESULT = FALSE;
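
    The test procedure itself (disable the result cache, then run the same query several times and time each run) can be scripted. In this Python sketch, `execute` is a hypothetical parameter standing for any function that runs one SQL statement and waits for it to finish, such as a wrapper around a DB-API cursor; it is not a specific Snowflake API.

```python
# Run the same query several times with the result cache disabled and
# record wall-clock time per run. `execute` is a hypothetical callable that
# runs one SQL statement synchronously (e.g. a wrapper around a cursor).
import time
from typing import Callable, List

TEST_QUERY = (
    "SELECT MIN(event_hour), MAX(event_hour) FROM events "
    "WHERE event_name = 'LOGIN'"
)


def time_runs(execute: Callable[[str], object], query: str, runs: int = 3) -> List[float]:
    # Without this, runs after the first are answered from the Result Cache.
    execute("alter session set USE_CACHED_RESULT = FALSE")
    timings = []
    for _ in range(runs):
        start = time.perf_counter()
        execute(query)
        timings.append(time.perf_counter() - start)
    return timings


if __name__ == "__main__":
    # Stand-in executor that only prints statements, so the sketch runs offline.
    for i, seconds in enumerate(time_runs(print, TEST_QUERY), start=1):
        print(f"run {i}: {seconds:.3f} s")
```

    Comparing the first run with later runs on the same warehouse shows the difference between reading from Remote Disk and reading data already cached on Local Disk.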
  • Amazon,  AWS,  EMR,  Hive,  I/O,  S3

    S3 Writes When Inserting Data into a Hive Table in Amazon EMR

    Often in an ETL process we move data from one source into another, typically doing some filtering, transformations and aggregations. Let’s consider which write operations are performed in S3.

    To focus on S3 writes, I am going to use a very simple SQL INSERT statement that just moves data from one table into another without any transformations:

    INSERT OVERWRITE TABLE events PARTITION (event_dt = '2018-12-02', event_hour = '00')
    SELECT
      record_id,
      event_timestamp,
      event_name,
      app_name,
      country,
      city,
      payload
    FROM events_raw;
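
    Because both partition columns get static values, the statement writes into a single partition directory that follows Hive’s key=value naming under the table location. The small Python helper below only illustrates that mapping; the table location is hypothetical, and it ignores Hive’s escaping of special characters in partition values.

```python
# Map a static PARTITION clause to the partition directory under the table
# location, using Hive's key=value directory naming. The location below is
# hypothetical; special-character escaping of values is not handled.
from typing import Dict


def partition_path(table_location: str, spec: Dict[str, str]) -> str:
    segments = [f"{column}={value}" for column, value in spec.items()]
    return "/".join([table_location.rstrip("/")] + segments) + "/"


if __name__ == "__main__":
    print(partition_path(
        "s3://my-bucket/hive/events",  # hypothetical table location
        {"event_dt": "2018-12-02", "event_hour": "00"},
    ))
```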
    
  • Hive,  I/O,  Tez

    Tez Internals #1 – Number of Map Tasks

    Let’s see how Apache Tez determines the number of map tasks when you execute a SQL query in Hive running on the Tez engine.

    Consider the following sample SQL query on a partitioned table:

    select count(*) from events where event_dt = '2018-10-20' and event_name = 'Started';
    

    The events table is partitioned by the event_dt and event_name columns and is quite big: it has 209,146 partitions, while the query requests data from a single partition only:

    $ hive -e "show partitions events"
    
    event_dt=2017-09-22/event_name=ClientMetrics
    event_dt=2017-09-23/event_name=ClientMetrics
    ...
    event_dt=2018-10-20/event_name=Location
    ...
    event_dt=2018-10-20/event_name=Started
    
    Time taken: 26.457 seconds, Fetched: 209146 row(s)
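
    The predicate on both partition columns lets Hive prune all other partitions, so only one of the 209,146 partitions has to be read. The Python sketch below mimics that pruning on the `show partitions` output shown above: it parses each key=value path and keeps the lines matching the requested values (an illustration only, not how Hive does it internally).

```python
# Mimic partition pruning on `show partitions` output: parse each
# key=value/key=value line and keep those matching all requested values.
from typing import Dict, Iterable, List


def parse_partition(line: str) -> Dict[str, str]:
    return dict(segment.split("=", 1) for segment in line.split("/"))


def matching_partitions(lines: Iterable[str], **wanted: str) -> List[str]:
    result = []
    for line in lines:
        line = line.strip()
        # Skip blank lines, "..." and the "Time taken" footer.
        if not line or any("=" not in seg for seg in line.split("/")):
            continue
        spec = parse_partition(line)
        if all(spec.get(col) == val for col, val in wanted.items()):
            result.append(line)
    return result


if __name__ == "__main__":
    output = """event_dt=2017-09-22/event_name=ClientMetrics
event_dt=2018-10-20/event_name=Location
event_dt=2018-10-20/event_name=Started
Time taken: 26.457 seconds, Fetched: 209146 row(s)"""
    print(matching_partitions(output.splitlines(),
                              event_dt="2018-10-20", event_name="Started"))
```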
    
  • Amazon,  AWS,  I/O,  Monitoring,  S3,  Storage

    S3 Monitoring #4 – Read Operations and Tables

    Knowing how Hive table storage is organized can help us extract additional information about S3 read operations for each table.

    In most cases (and you can easily adapt this to your specific table storage pattern), tables are stored in an S3 bucket under the following key structure:

    s3://<bucket_name>/hive/<database_name>/<table_name>/<partition1>/<partition2>/...
    

    For example, hourly data for the orders table can be stored as follows:

    s3://cloudsqale/hive/sales.db/orders/created_dt=2018-10-18/hour=00/
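
    With this convention, the database, table and partition values can be derived from each object key in the S3 access logs. Below is a Python sketch that assumes the hive/<database>.db/<table>/... layout above (adjust the prefix for your buckets); since keys in S3 server access logs are URL-encoded, it decodes them first.

```python
# Derive database, table and partition values from an S3 object key that
# follows the hive/<database>.db/<table>/<col=value>/... layout.
from typing import Dict, Optional
from urllib.parse import unquote


def parse_table_key(key: str) -> Optional[Dict[str, object]]:
    segments = [s for s in unquote(key).split("/") if s]
    if len(segments) < 3 or segments[0] != "hive":
        return None  # not a Hive table key under this convention
    database = segments[1]
    if database.endswith(".db"):
        database = database[: -len(".db")]
    # Only col=value segments are partitions; the file name is ignored.
    partitions = dict(s.split("=", 1) for s in segments[3:] if "=" in s)
    return {"database": database, "table": segments[2], "partitions": partitions}


if __name__ == "__main__":
    print(parse_table_key("hive/sales.db/orders/created_dt%3D2018-10-18/hour%3D00/00000_0"))
```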
    
  • Amazon,  AWS,  I/O,  Monitoring,  S3,  Storage

    S3 Monitoring Step #3 – Read Operations and File Types

    After you get the summary information for S3 read operations (see Step #2), it makes sense to look at file types. By analyzing the object keys, you can easily summarize information about compressed files such as .gz files.

    Later I will use Hive metadata to determine whether files named like 00000_0 are uncompressed text or ORC files.

    select type, count(*) keys, count(distinct key) dist_keys, 
      sum(bytes_sent)/sum(total_time_ms/1000)/(1024*1024) rate_mb_sec, 
      sum(total_time_ms/1000) time_spent,
      sum(bytes_sent)/(cast(1024 as bigint)*1024*1024*1024) terabytes_read
    from (
    select 
      key,
      case 
        when key like '%.gz' then 'Compressed .gz'
        else 'Other'
      end type,
      bytes_sent,
      total_time_ms
    from s3_access_logs 
    where event_dt ='{$EVENT_DT}' and operation='REST.GET.OBJECT') t
    group by type;
    

    Here is my sample output:

    type            keys        dist_keys  rate_mb_sec  time_spent   terabytes_read
    Compressed .gz  21,535,003  7,411,981  3.8          504,318,631  1,812.8
    Other           6,345,354   647,040    18.5         1,465,848    25.9
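
    To double-check the query logic on a small sample of log records, the same classification and rate calculation can be written in Python. The (key, bytes_sent, total_time_ms) tuple layout is an assumption of this sketch; the formulas follow the query above, with time_spent in seconds and rate_mb_sec in MB per second.

```python
# Python equivalent of the file-type query: classify keys by suffix, then
# aggregate request counts, distinct keys, bytes and time per file type.
from collections import defaultdict
from typing import Dict, Iterable, Tuple

MB = 1024 * 1024
TB = 1024 ** 4


def file_type(key: str) -> str:
    # Same rule as: case when key like '%.gz' then 'Compressed .gz' else 'Other'
    return "Compressed .gz" if key.endswith(".gz") else "Other"


def summarize(records: Iterable[Tuple[str, int, int]]) -> Dict[str, Dict[str, float]]:
    acc = defaultdict(lambda: {"keys": 0, "dist": set(), "bytes": 0, "ms": 0})
    for key, bytes_sent, total_time_ms in records:
        a = acc[file_type(key)]
        a["keys"] += 1
        a["dist"].add(key)
        a["bytes"] += bytes_sent
        a["ms"] += total_time_ms
    result = {}
    for ftype, a in acc.items():
        seconds = a["ms"] / 1000
        result[ftype] = {
            "keys": a["keys"],
            "dist_keys": len(a["dist"]),
            # 0.0 when no time was recorded (SQL would return NULL here)
            "rate_mb_sec": a["bytes"] / seconds / MB if seconds else 0.0,
            "time_spent": seconds,
            "terabytes_read": a["bytes"] / TB,
        }
    return result


if __name__ == "__main__":
    sample = [("data/a.gz", MB, 1000), ("data/a.gz", MB, 1000), ("hive/t/00000_0", 2 * MB, 500)]
    for ftype, stats in summarize(sample).items():
        print(ftype, stats)
```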

    File Types and Object Size Bins

    Now let’s see the distribution of file types for each size bin:

    select type, size_type, count(*) keys, count(distinct key) dist_keys, 
      sum(bytes_sent)/sum(total_time_ms/1000)/(1024*1024) rate_mb_sec, 
      sum(bytes_sent)/(cast(1024 as bigint)*1024*1024*1024) terabytes_read
    from (
    select 
      key,
      case 
        when key like '%.gz' then 'Compressed .gz'
        else 'Other'
      end type,
      case 
        when total_size <= 1024*1024 then '<= 1 MB'
        when total_size <= 30*1024*1024 then '<= 30 MB'
        when total_size <= 100*1024*1024 then '<= 100 MB'
        else '> 100 MB'
      end size_type,
      bytes_sent,
      total_time_ms
    from s3_access_logs 
    where event_dt ='{$EVENT_DT}' and operation='REST.GET.OBJECT') t
    group by type, size_type;
    

    Sample output:

    type            size_type  keys        dist_keys  rate_mb_sec  terabytes_read
    Compressed .gz  <= 1 MB    7,759,230   3,579,785  5.2          2.4
    Compressed .gz  <= 30 MB   6,927,405   2,456,010  4.6          47.3
    Compressed .gz  <= 100 MB  1,136,926   436,463    3.7          71.1
    Compressed .gz  > 100 MB   5,711,442   939,723    3.7          1,691.9
    Other           <= 1 MB    2,535,108   496,286    3.2          0.2
    Other           <= 30 MB   1,152,742   90,472     22.7         1.7
    Other           <= 100 MB  150,521     7,119      14.7         1.0
    Other           > 100 MB   2,506,983   53,191     19.4         23.0
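
    The size bins in the CASE expression are checked in order, and the first matching branch wins. A Python version of the same binning, handy for spot-checking individual objects (the names SIZE_BINS and size_bin are my own):

```python
# Same size bins as the CASE expression in the query above: conditions are
# evaluated top to bottom, so each object lands in the first bin it fits.
MB = 1024 * 1024

SIZE_BINS = [
    (1 * MB, "<= 1 MB"),
    (30 * MB, "<= 30 MB"),
    (100 * MB, "<= 100 MB"),
]


def size_bin(total_size: int) -> str:
    for limit, label in SIZE_BINS:
        if total_size <= limit:
            return label
    return "> 100 MB"


if __name__ == "__main__":
    for size in (512 * 1024, 5 * MB, 64 * MB, 2048 * MB):
        print(size, size_bin(size))
```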

    See also S3 Monitoring Step #2 – Read Operations.

  • Amazon,  AWS,  I/O,  Monitoring,  S3,  Storage

    S3 Monitoring Step #1 – Bucket Size and Number of Objects

    The first step in Amazon S3 monitoring is to check the current state of your S3 buckets and how fast they grow. You can easily get this information from the CloudWatch Management Console, by running an AWS CLI command, or with an AWS SDK script.

    Bucket Size

    Here is an example of an AWS CLI command that gets the size of a bucket for every day within the --start-time and --end-time date range:

    aws cloudwatch get-metric-statistics \
      --metric-name BucketSizeBytes --namespace AWS/S3 \
      --start-time 2018-10-01T00:00:00Z --end-time 2018-10-08T00:00:00Z \
      --statistics Maximum --unit Bytes --region us-east-1 \
      --dimensions Name=BucketName,Value=cloudsqale Name=StorageType,Value=StandardStorage \
      --period 86400 --query 'Datapoints[*].[Timestamp, Maximum]' \
      --output text | sort  | python cloudwatch_s3_metrics.py
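
    The cloudwatch_s3_metrics.py script at the end of the pipeline is not shown here. As a hypothetical stand-in, the Python sketch below parses the text output (one timestamp and one byte value per line) and reports the bucket size in GB with the day-over-day growth.

```python
# Hypothetical stand-in for cloudwatch_s3_metrics.py (the original script is
# not shown): parse "<timestamp> <bytes>" lines produced by the AWS CLI
# command above and report bucket size in GB plus day-over-day growth.
import sys
from typing import Iterable, List, Optional, TextIO, Tuple

GB = 1024 ** 3


def parse(lines: Iterable[str]) -> List[Tuple[str, float]]:
    points = []
    for line in lines:
        fields = line.split()
        if len(fields) == 2:  # skip blank or unexpected lines
            points.append((fields[0], float(fields[1])))
    return points


def growth_report(points: List[Tuple[str, float]]) -> List[Tuple[str, float, Optional[float]]]:
    rows = []
    previous = None
    for timestamp, size_bytes in points:
        size_gb = size_bytes / GB
        rows.append((timestamp, size_gb, None if previous is None else size_gb - previous))
        previous = size_gb
    return rows


def main(stream: TextIO = sys.stdin) -> None:
    for timestamp, size_gb, growth in growth_report(parse(stream)):
        growth_text = "n/a" if growth is None else f"{growth:+.1f}"
        print(f"{timestamp}  {size_gb:.1f} GB  {growth_text} GB")
```

    To use it in the pipeline above, the script would end with a call to main(), which reads the sorted CLI output from standard input.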