  • Amazon,  AWS,  EMR,  Spark

    EMR Spark – Much Larger Executors are Created than Requested

    Starting with EMR 5.32 and EMR 6.2, you may notice that Spark launches much larger executors than you request in your job settings. For example, EMR created my cluster with the following default settings (they depend on the instance type and the maximizeResourceAllocation classification option):

      spark.executor.memory                      18971M
      spark.executor.cores                       4
      spark.yarn.executor.memoryOverheadFactor   0.1875
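
    For reference, the container size these settings ask YARN for can be estimated with a small sketch. It assumes the overhead is computed as in open-source Spark on YARN, max(factor * executor memory, 384 MiB), and it does not model YARN rounding the request up to its allocation increment. The function name is hypothetical.

```python
def yarn_container_request_mb(executor_memory_mb, overhead_factor, min_overhead_mb=384):
    """Estimate the memory Spark requests from YARN for one executor."""
    # Assumption: overhead = max(factor * executor memory, 384 MiB),
    # truncated to whole MiB as in open-source Spark on YARN.
    overhead_mb = max(int(executor_memory_mb * overhead_factor), min_overhead_mb)
    return executor_memory_mb + overhead_mb


# Values taken from the default settings listed above.
requested_mb = yarn_container_request_mb(18971, 0.1875)
print(requested_mb, "MiB")
```

    Under these assumptions the request comes to 22528 MiB (22 GiB) per executor, which is the baseline to compare against what the session actually launches.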

    But when I start a Spark session (pyspark command) I see the following:

  • Amazon,  AWS,  EMR,  S3,  Spark

    Amazon EMR Spark – Ignoring Partition Filter and Listing All Partitions When Reading from S3A

    I have a partitioned Hive table created by an open-source version of Hadoop that uses the S3A scheme as the location for every partition. The table has more than 10,000 partitions, and every partition has about 8,000 Parquet files:

    $ hive -e "show partitions events";
    $ hive -e "describe formatted events partition (dateint=20220419, hour='11')" | grep Location
    Location:   s3a://cloudsqale/events/dateint=20220419/hour=11
    $ hive -e "describe formatted events partition (dateint=20220419, hour='12')" | grep Location
    Location:   s3a://cloudsqale/events/dateint=20220419/hour=12
    $ hive -e "describe formatted events partition (dateint=20220419, hour='13')" | grep Location
    Location:   s3a://cloudsqale/events/dateint=20220419/hour=13

    The s3a:// scheme is specified for every partition in this table.

    Reading a Partition in Amazon EMR Spark

    I tried to read data from a single partition using Spark SQL:

    $ spark-sql --master yarn -e "select count(*) from events where dateint=20220419 and hour='11'"

    The Spark driver failed with:

    # java.lang.OutOfMemoryError: GC overhead limit exceeded
    # -XX:OnOutOfMemoryError="kill -9 %p"
    #   Executing /bin/sh -c "kill -9 4847"...

  • I/O,  Parquet,  Spark

    Spark – Reading Parquet – Why the Number of Tasks can be Much Larger than the Number of Row Groups

    A row group is a unit of work for reading from Parquet that cannot be split into smaller parts, so you might expect the number of tasks created by Spark to be no more than the total number of row groups in your Parquet data source.

    But Spark can still create many more tasks than there are row groups. Let’s see how this is possible.
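
    One way this can happen, sketched under simplifying assumptions: Spark plans tasks by cutting each file into byte ranges of a fixed split size, and a row group is read only by the task whose range contains the row group's midpoint. The function name, file size, split size and row group layout below are hypothetical, and Spark's real split-size calculation is more involved.

```python
def assign_row_groups(file_size, split_size, row_groups):
    """Map each byte-range split (one task) to the row groups it would read.

    row_groups is a list of (start_offset, length) pairs in bytes.
    """
    # One task per byte range of the file, regardless of row group boundaries.
    splits = [(start, min(start + split_size, file_size))
              for start in range(0, file_size, split_size)]
    tasks = {i: [] for i in range(len(splits))}
    for rg_index, (start, length) in enumerate(row_groups):
        # Assumption: a row group belongs to the split containing its midpoint.
        midpoint = start + length // 2
        for i, (lo, hi) in enumerate(splits):
            if lo <= midpoint < hi:
                tasks[i].append(rg_index)
                break
    return tasks


MB = 1024 * 1024
# Hypothetical 1 GiB file with two 512 MiB row groups, planned with 128 MiB splits.
tasks = assign_row_groups(1024 * MB, 128 * MB, [(0, 512 * MB), (512 * MB, 512 * MB)])
for task_id, row_group_ids in tasks.items():
    print(task_id, row_group_ids)
```

    In this example eight tasks are planned for two row groups, and six of them have nothing to read.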

  • I/O,  Parquet,  Spark

    Spark – Reading Parquet – Predicate Pushdown for LIKE Operator – EqualTo, StartsWith and Contains Pushed Filters

    A Parquet file contains MIN/MAX statistics for every column in every row group, which allow Spark applications to skip reading unnecessary data chunks depending on the query predicate. Let’s see how this works with a LIKE pattern-matching filter.
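
    The idea of skipping row groups by MIN/MAX statistics can be sketched as follows. This is an illustration only, not Spark's or Parquet's code: the function names and the statistics are hypothetical, and string comparison here is plain Python ordering. A pattern like 'abc' corresponds to an equality check, 'abc%' to a prefix check, and '%abc%' to a contains check, which MIN/MAX alone cannot prune.

```python
def can_skip_equal_to(min_value, max_value, value):
    # No row can match if the value lies outside the [min, max] range.
    return value < min_value or value > max_value


def can_skip_starts_with(min_value, max_value, prefix):
    # Compare only the first len(prefix) characters of the statistics.
    n = len(prefix)
    return max_value[:n] < prefix or min_value[:n] > prefix


def can_skip_contains(min_value, max_value, fragment):
    # MIN/MAX say nothing about substrings, so the row group must be read.
    return False


# Hypothetical (min, max) statistics for four row groups.
row_group_stats = [("apple", "banana"), ("cherry", "grape"),
                   ("kiwi", "mango"), ("orange", "pear")]


def surviving_row_groups(can_skip, pattern):
    """Indexes of the row groups that still have to be read."""
    return [i for i, (lo, hi) in enumerate(row_group_stats)
            if not can_skip(lo, hi, pattern)]


print(surviving_row_groups(can_skip_equal_to, "lemon"))
print(surviving_row_groups(can_skip_starts_with, "ch"))
print(surviving_row_groups(can_skip_contains, "an"))
```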

    For my tests I will use a Parquet file with 4 row groups and the following MIN/MAX statistics for the product column:

  • Parquet

    Parquet 1.x File Format – Footer Content

    Every Parquet file has a footer that contains metadata: the schema, row groups and column statistics. The footer is located at the end of the file.

    A Parquet file starts and ends with the 4-byte “magic” string PAR1. Right before the trailing PAR1 there is a 4-byte footer length (little-endian encoding).

    The position of the footer can be easily calculated as: File_length - Footer_length - 8 (4 bytes for the footer length field plus 4 bytes for the trailing PAR1).
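
    A minimal sketch of locating the footer, counting both the 4-byte length field and the 4-byte trailing magic. It works on an in-memory byte string built for illustration; the payload and footer bytes are placeholders, not real Parquet data, and the function name is hypothetical.

```python
import struct

MAGIC = b"PAR1"


def footer_location(data: bytes):
    """Return (footer_start, footer_length) for the bytes of a Parquet 1.x file."""
    if len(data) < 12 or data[:4] != MAGIC or data[-4:] != MAGIC:
        raise ValueError("missing PAR1 magic at the start or end")
    # The 4 bytes before the trailing magic hold the footer length, little-endian.
    (footer_length,) = struct.unpack("<I", data[-8:-4])
    # Skip back over the trailing magic (4), the length field (4) and the footer.
    footer_start = len(data) - footer_length - 8
    return footer_start, footer_length


# Placeholder content standing in for column data and Thrift-encoded metadata.
fake_footer = b"thrift-encoded-metadata"
fake_file = (MAGIC + b"column-chunk-bytes" + fake_footer
             + struct.pack("<I", len(fake_footer)) + MAGIC)

start, length = footer_location(fake_file)
print(start, length, fake_file[start:start + length])
```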

  • AWS,  Flink,  S3

    Flink and S3 Entropy Injection for Checkpoints

    When you use S3 to store checkpoints, it can easily become a bottleneck, especially for Flink applications with many subtasks. To overcome this problem, FLINK-9061 introduced entropy injection into the checkpoint path.

    But the Flink documentation provides a misleading example (at least up to Flink 1.13) that actually destroys the value of the checkpoint entropy.
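
    The general idea of entropy injection can be sketched as replacing a marker in the configured path with random characters, so that checkpoint files are spread over many S3 key prefixes. This is an illustration only: the marker string, length, character set, bucket and job names below are assumptions, and Flink's own implementation may differ in its details.

```python
import random
import string


def inject_entropy(path, entropy_key="_entropy_", length=4, rng=random):
    """Replace the first occurrence of the marker with random characters."""
    alphabet = string.ascii_lowercase + string.digits
    entropy = "".join(rng.choice(alphabet) for _ in range(length))
    return path.replace(entropy_key, entropy, 1)


# Hypothetical checkpoint directory with the marker near the start of the key.
configured_path = "s3://my-bucket/_entropy_/checkpoints/job-a"
for _ in range(3):
    print(inject_entropy(configured_path))
```

    The closer the marker sits to the beginning of the key, the more the resulting prefixes differ from one file to the next.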

  • Hadoop,  YARN

    Hadoop YARN – Monitoring Resource Consumption by Running Applications in Multi-Cluster Environments

    In the cloud it is typical to run multiple compute clusters, so browsing the Web UI of every cluster to check the current resource consumption of applications is not always convenient, especially if the YARN clusters are managed by different Hadoop distributions (Amazon EMR, Cloudera, Qubole, etc.).

    Let’s see how you can automate this process and find out how many applications are running and which resources they are consuming (containers, memory and CPU).
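
    One possible approach, sketched here as an assumption rather than the method used in the post, is to query the ResourceManager REST API of every cluster for running applications and sum up their containers, memory and vcores. The host name is hypothetical and the sample response is made up; only the aggregation step is shown running.

```python
import json


def running_apps_url(rm_address):
    """URL of the ResourceManager REST endpoint listing RUNNING applications."""
    return f"http://{rm_address}/ws/v1/cluster/apps?states=RUNNING"


def summarize(apps_response):
    """Aggregate resource usage from a parsed /ws/v1/cluster/apps response."""
    # The "apps" element can be null when nothing is running.
    apps = (apps_response.get("apps") or {}).get("app") or []
    return {
        "applications": len(apps),
        "containers": sum(a.get("runningContainers", 0) for a in apps),
        "memory_mb": sum(a.get("allocatedMB", 0) for a in apps),
        "vcores": sum(a.get("allocatedVCores", 0) for a in apps),
    }


# Made-up response in the shape returned by the ResourceManager.
sample = json.loads("""
{"apps": {"app": [
  {"id": "application_1_0001", "runningContainers": 10, "allocatedMB": 40960, "allocatedVCores": 10},
  {"id": "application_1_0002", "runningContainers": 3, "allocatedMB": 6144, "allocatedVCores": 3}
]}}
""")

print(running_apps_url("rm-host.example.com:8088"))  # hypothetical host
print(summarize(sample))
```

    In a real setup the response would be fetched for each cluster, for example with urllib.request, and the per-cluster summaries combined into one report.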

  • I/O,  Parquet,  Storage

    How Map Column is Written to Parquet – Converting JSON to Map to Increase Read Performance

    It is quite common today to convert incoming JSON data into Parquet format to improve the performance of analytical queries.

    When JSON data has an arbitrary schema, i.e. different records can contain different key-value pairs, it is common to parse such JSON payloads into a map column in Parquet.

    How is it stored? What read performance can you expect? Will json_map["key"] read only data for key or the entire JSON?
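
    As a rough mental model, a Parquet map is a repeated group of key-value pairs, so all keys of all rows end up in one column and all values in another. The sketch below imitates that layout with plain Python lists; the offsets list is a simplification standing in for Parquet's repetition and definition levels, and the function names and sample records are hypothetical.

```python
def flatten_map_column(rows):
    """Flatten per-row dicts into a key column, a value column and row offsets."""
    keys, values, offsets = [], [], [0]
    for row in rows:
        for key, value in row.items():
            keys.append(key)
            values.append(value)
        # offsets[r]..offsets[r + 1] delimit the entries of row r.
        offsets.append(len(keys))
    return keys, values, offsets


def lookup(keys, values, offsets, wanted):
    """Emulate json_map[wanted]: every key entry of every row is inspected."""
    result = []
    for r in range(len(offsets) - 1):
        found = None
        for i in range(offsets[r], offsets[r + 1]):
            if keys[i] == wanted:
                found = values[i]
        result.append(found)
    return result


# Hypothetical JSON records with different sets of keys.
rows = [{"user": "a", "page": "home"}, {"user": "b"}, {"page": "cart", "ref": "ad"}]
keys, values, offsets = flatten_map_column(rows)
print(keys, values, offsets)
print(lookup(keys, values, offsets, "page"))
```

    In this model a lookup for one key still walks the key entries of every row, which hints at why reading a single key may not be as cheap as reading a dedicated column.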