• I/O,  Parquet,  Storage

    How Map Column is Written to Parquet – Converting JSON to Map to Increase Read Performance

    It is quite common today to convert incoming JSON data into Parquet format to improve the performance of analytical queries.

    When JSON data has an arbitrary schema, i.e. different records can contain different key-value pairs, it is common to parse such JSON payloads into a map column in Parquet.

    How is it stored? What read performance can you expect? Will json_map["key"] read only data for key or the entire JSON?

  • I/O,  Snowflake,  Storage

    Snowflake – Monitoring Data Ingestion using QUERY_HISTORY and COPY_HISTORY – Single Large File vs Multiple Small Files

    Snowflake provides various options to monitor data ingestion from external storage such as Amazon S3. In this article I am going to review QUERY_HISTORY and COPY_HISTORY table functions.

    COPY commands are widely used to load data into Snowflake at regular intervals, and we can monitor their execution by querying the query history with the query_type = 'COPY' filter.

  • ORC,  Storage

    Storage Tuning for Mapped JSON Conversion to ORC File Format – Java Heap Issues with Dictionary Encoding

    Usually in a Data Lake we get source data as compressed JSON payloads (.gz files). Additionally, the first level of JSON objects is often parsed into a map<string, string> structure to speed up access to the first-level keys and values; the get_json_object function can then be used to parse deeper JSON levels whenever required.
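
    As a rough illustration of the first-level parsing described above, here is a minimal Python sketch. The function name first_level_map and the sample payload are hypothetical and not taken from the original pipeline. It flattens a JSON object into a string-to-string map and keeps nested values as JSON text, so they can be parsed later in the same way get_json_object would be applied to a map value.

```python
import json

def first_level_map(payload):
    """Parse only the first level of a JSON object into a string -> string dict."""
    obj = json.loads(payload)
    if not isinstance(obj, dict):
        raise ValueError("expected a JSON object at the top level")
    result = {}
    for key, value in obj.items():
        if isinstance(value, str):
            # Plain strings are stored as-is
            result[key] = value
        else:
            # Nested objects, arrays, numbers, booleans and nulls stay as JSON text
            result[key] = json.dumps(value, separators=(",", ":"))
    return result

# Hypothetical payload, for illustration only
sample = '{"event": "login", "geo": {"country": "US", "city": "Boston"}, "ts": 1544986185}'
row = first_level_map(sample)
print(row["event"])  # first-level string value
print(row["geo"])    # nested object kept as JSON text for later parsing
```

    Keeping nested values as text means the map column needs only one value type, at the cost of a second parse whenever a deeper level is read.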

    But it still makes sense to convert data into the ORC format to evenly distribute data processing to smaller chunks, or to use indexes and optimize query execution for complementary columns such as event names, geo information, and some other system attributes.

    In this example we will load the source data stored in a single 2.5 GB .gz file into the following ORC table:

  • Hive,  ORC,  Storage

    ORC File Format Internals – Creating Large Stripes in Hive Tables

    Usually the source data arrives as compressed text files, and the first step in an ETL process is to convert them to a columnar format for more effective query execution by users.

    Let’s consider a simple example where we have a single 120 MB source file in .gz format:

    $ aws s3 ls s3://cloudsqale/hive/events.db/events_raw/
    2018-12-16 18:49:45  120574494 data.gz
    

    and want to convert it into a Hive table with the ORC file format having 256 MB stripe size. Will 120 MB of .gz data be loaded into a single 256 MB stripe? Not so easy.

  • Amazon,  AWS,  I/O,  Monitoring,  S3,  Storage

    S3 Monitoring Step #4 – Read Operations and Tables

    Knowing how Hive table storage is organized can help us extract some additional information for S3 read operations for each table.

    In most cases (and you can easily adapt this for your specific table storage pattern), tables are stored in an S3 bucket under the following key structure:

    s3://<bucket_name>/hive/<database_name>/<table_name>/<partition1>/<partition2>/...
    

    For example, hourly data for orders table can be stored as follows:

    s3://cloudsqale/hive/sales.db/orders/created_dt=2018-10-18/hour=00/
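
    To show how this key structure can be used, here is a minimal Python sketch that splits a location into bucket, database, table and partition values. The function name parse_table_location is hypothetical, and the code assumes the exact hive/<database_name>/<table_name>/ layout shown above; other storage patterns would need a different split.

```python
from urllib.parse import urlparse

def parse_table_location(uri):
    """Split s3://<bucket>/hive/<database>/<table>/<partition>/... into its parts."""
    parsed = urlparse(uri)
    parts = [p for p in parsed.path.split("/") if p]
    if len(parts) < 3 or parts[0] != "hive":
        raise ValueError(f"unexpected key layout: {uri}")
    database, table = parts[1], parts[2]
    partitions = {}
    for part in parts[3:]:
        if "=" not in part:
            break  # not a name=value partition directory, e.g. a file name
        name, value = part.split("=", 1)
        partitions[name] = value
    return {
        "bucket": parsed.netloc,
        "database": database,
        "table": table,
        "partitions": partitions,
    }

info = parse_table_location(
    "s3://cloudsqale/hive/sales.db/orders/created_dt=2018-10-18/hour=00/"
)
print(info)
```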
    
  • Amazon,  AWS,  I/O,  Monitoring,  S3,  Storage

    S3 Monitoring Step #3 – Read Operations and File Types

    After you get the summary information for S3 read operations (see Step #2), it makes sense to look at file types. By analyzing the object keys, you can easily summarize information about compressed files such as .gz files.

    Later I will use the Hive metadata information to determine whether files named like 00000_0 are uncompressed text or ORC files.

    -- Summarize S3 GET operations by file type for a single day
    select
      type,
      count(*) keys,                  -- total read requests
      count(distinct key) dist_keys,  -- distinct objects read
      sum(bytes_sent)/sum(total_time_ms/1000)/(1024*1024) rate_mb_sec,
      sum(total_time_ms/1000) time_spent,  -- seconds
      sum(bytes_sent)/(cast(1024 as bigint)*1024*1024*1024) terabytes_read
    from (
      select
        key,
        case
          when key like '%.gz' then 'Compressed .gz'
          else 'Other'
        end type,
        bytes_sent,
        total_time_ms
      from s3_access_logs
      where event_dt = '{$EVENT_DT}' and operation = 'REST.GET.OBJECT'
    ) t
    group by type;
    

    Here is my sample output:

    type            keys        dist_keys  rate_mb_sec  time_spent   terabytes_read
    Compressed .gz  21,535,003  7,411,981  3.8          504,318,631  1,812.8
    Other           6,345,354   647,040    18.5         1,465,848    25.9
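
    The rate_mb_sec column can be cross-checked from terabytes_read and time_spent, since the query computes it as bytes sent divided by seconds spent divided by 1024*1024. Here is a minimal Python sketch of that arithmetic. The function name is hypothetical, and the inputs are the rounded values from the sample output, so the result agrees only to the one decimal place shown.

```python
MB_PER_TB = 1024 * 1024  # megabytes in one terabyte (binary units, as in the query)

def rate_mb_sec(terabytes_read, time_spent_sec):
    """Average read rate in MB/sec from total volume and total request time."""
    return terabytes_read * MB_PER_TB / time_spent_sec

# Values copied from the sample output above
gz_rate = rate_mb_sec(1812.8, 504_318_631)
other_rate = rate_mb_sec(25.9, 1_465_848)

print(round(gz_rate, 1), round(other_rate, 1))
```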

    File Types and Object Size Bins

    Now let’s see the distribution of file types for each size bin:

    -- Summarize S3 GET operations by file type and object size bin
    select
      type,
      size_type,
      count(*) keys,                  -- total read requests
      count(distinct key) dist_keys,  -- distinct objects read
      sum(bytes_sent)/sum(total_time_ms/1000)/(1024*1024) rate_mb_sec,
      sum(bytes_sent)/(cast(1024 as bigint)*1024*1024*1024) terabytes_read
    from (
      select
        key,
        case
          when key like '%.gz' then 'Compressed .gz'
          else 'Other'
        end type,
        case
          when total_size <= 1024*1024 then '<= 1 MB'
          when total_size <= 30*1024*1024 then '<= 30 MB'
          when total_size <= 100*1024*1024 then '<= 100 MB'
          else '> 100 MB'
        end size_type,
        bytes_sent,
        total_time_ms
      from s3_access_logs
      where event_dt = '{$EVENT_DT}' and operation = 'REST.GET.OBJECT'
    ) t
    group by type, size_type;
    

    Sample output:

    type            size_type  keys       dist_keys  rate_mb_sec  terabytes_read
    Compressed .gz  <= 1 MB    7,759,230  3,579,785  5.2          2.4
    Compressed .gz  <= 30 MB   6,927,405  2,456,010  4.6          47.3
    Compressed .gz  <= 100 MB  1,136,926  436,463    3.7          71.1
    Compressed .gz  > 100 MB   5,711,442  939,723    3.7          1,691.9
    Other           <= 1 MB    2,535,108  496,286    3.2          0.2
    Other           <= 30 MB   1,152,742  90,472     22.7         1.7
    Other           <= 100 MB  150,521    7,119      14.7         1.0
    Other           > 100 MB   2,506,983  53,191     19.4         23.0
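
    The two CASE expressions in the query can also be expressed outside SQL, for example to spot-check a few log records by hand. The following Python sketch mirrors the same type and size-bin rules. The function names and the sample records are hypothetical, and only the thresholds and labels come from the query above.

```python
from collections import Counter

MB = 1024 * 1024

def file_type(key):
    """Same rule as the SQL: keys ending in .gz are compressed, the rest is 'Other'."""
    return "Compressed .gz" if key.endswith(".gz") else "Other"

def size_bin(total_size):
    """Same thresholds as the SQL CASE expression, checked in the same order."""
    if total_size <= 1 * MB:
        return "<= 1 MB"
    if total_size <= 30 * MB:
        return "<= 30 MB"
    if total_size <= 100 * MB:
        return "<= 100 MB"
    return "> 100 MB"

# Hypothetical log records: (object key, total object size in bytes)
records = [
    ("hive/events.db/events_raw/data.gz", 120574494),
    ("hive/sales.db/orders/created_dt=2018-10-18/hour=00/00000_0", 512 * 1024),
    ("hive/sales.db/orders/created_dt=2018-10-18/hour=00/00001_0", 30 * MB),
]

# Count read requests per (type, size_type), like count(*) with group by
counts = Counter((file_type(key), size_bin(size)) for key, size in records)
for (ftype, sbin), n in sorted(counts.items()):
    print(ftype, sbin, n)
```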

    See also S3 Monitoring Step #2 – Read Operations.