Although Amazon S3 can generate a lot of logs and it makes sense to have an ETL process to parse, combine and put the logs into Parquet or ORC format for better query performance, there is still an easy way to analyze logs using a Hive table created just on top of the raw S3 log directory.
On one of the clusters I noticed an increased rate of shuffle errors. Restarting the job did not help; it still failed with the same error.
The error was as follows:
Error: Error while running task ( failure ) : org.apache.tez.runtime.library.common.shuffle.orderedgrouped.Shuffle$ShuffleError: error in shuffle in Fetcher
  at org.apache.tez.runtime.library.common.shuffle.orderedgrouped.Shuffle$RunShuffleCallable.callInternal (Shuffle.java:301)
Caused by: java.io.IOException: Shuffle failed with too many fetch failures and insufficient progress!failureCounts=1, pendingInputs=1, fetcherHealthy=false, reducerProgressedEnough=true, reducerStalled=true
I have a Spark job that transforms incoming data from compressed text files into Parquet format and loads it into a daily partition of a Hive table. This is a typical job in a data lake. It is quite simple, but in my case it was very slow.
Initially it took about 4 hours to convert ~2,100 input .gz files (~1.9 TB of data) into Parquet, while the actual Spark job took just 38 minutes to run and the remaining time was spent on loading data into a Hive partition.
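The split between the two phases can be worked out directly from the figures above. This is only a back-of-the-envelope sketch: it treats "about 4 hours" as exactly 240 minutes, which is an assumption.

```python
# Rough time breakdown using the figures quoted in the text.
# Assumption: "about 4 hours" is taken as exactly 240 minutes.
total_minutes = 4 * 60
spark_minutes = 38

# Everything that is not the Spark job itself is the load into the Hive partition.
load_minutes = total_minutes - spark_minutes
load_share = load_minutes / total_minutes

print(f"Load step: ~{load_minutes} min ({load_share:.0%} of the run)")
```

Under that assumption, loading the data into the partition accounts for most of the wall-clock time, not the conversion itself.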
Let’s see what causes this behavior and how we can improve the performance.
I was asked to tune a Hive query that ran for more than 10 hours. It was running on a 100-node cluster with 16 GB available for YARN containers on each node.
Although the query processed about 2 TB of input data, it did a fairly simple aggregation on the user_id column and did not look too complex. It had 1 Map stage with 1,500 tasks and 1 Reduce stage with 7,000 tasks.
All map tasks completed within 30 minutes, and then the query got stuck in the Reduce phase. So what was wrong?
Presto is an extremely powerful distributed SQL query engine, so at some point you may consider using it to replace SQL-based ETL processes that you currently run on Apache Hive.
Although it is completely possible, you should be aware of some limitations that may affect your SLAs.
I was asked to diagnose and tune a long and complex ad-hoc Hive query that spent more than 4 hours on the reduce stage. The fetch from the map tasks and the merge phase completed fairly quickly (within 10 minutes), and the reducers spent most of their time iterating over the input rows and performing the aggregations defined by the query: MIN, SUM, COUNT, PERCENTILE_APPROX and others on specific columns.
After the merge phase a Tez reducer does not output many log records to help you diagnose the performance issues and find the bottlenecks. In this article I will describe how you can profile an already running Tez task without restarting the job.
A reducer aggregates rows within input groups (i.e. rows having the same grouping key), typically producing one output row per group. For example, the following query returns ~200 rows even if the source events table has billions of rows:
SELECT country, count(*) FROM events GROUP BY country;
The problem is that, despite the small number of output rows, the aggregation can still be a very long process that takes many hours.
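To see why the output size says little about the cost, here is a minimal Python sketch of reducer-style aggregation over input that is already sorted by the grouping key. It is not Hive or Tez code; the function name `reduce_count` and the sample rows are hypothetical and used only for illustration.

```python
from itertools import groupby
from operator import itemgetter


def reduce_count(sorted_rows):
    """Yield (key, count) for rows that arrive sorted by key.

    Every input row is visited exactly once, so the work grows with the
    number of input rows, not with the number of output groups.
    """
    for key, group in groupby(sorted_rows, key=itemgetter(0)):
        yield key, sum(1 for _ in group)


# Hypothetical (country, event_id) rows, sorted by country as a reducer would see them.
rows = sorted(
    [("US", 1), ("DE", 2), ("US", 3), ("FR", 4), ("US", 5)],
    key=itemgetter(0),
)
result = list(reduce_count(rows))
print(result)
```

Five input rows collapse into three output rows here, but the loop still touches all five. With billions of rows and heavier aggregate functions, that per-row work is what takes the time.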
Apache Tez is the main distributed execution engine for Apache Hive and Apache Pig jobs.
Tez represents a data flow as a DAG (directed acyclic graph) that consists of a set of vertices connected by edges. Vertices represent data transformations, while edges represent the movement of data between vertices.
For example, the following Hive SQL query:
SELECT u.name, sum_items
FROM (
  SELECT user_id, SUM(items) sum_items
  FROM sales
  GROUP BY user_id
) s
JOIN users u ON s.user_id = u.id
and its corresponding Apache Pig script:
sales = LOAD 'sales' USING org.apache.hive.hcatalog.pig.HCatLoader();
users = LOAD 'users' USING org.apache.hive.hcatalog.pig.HCatLoader();
sales_agg = FOREACH (GROUP sales BY user_id) GENERATE group.user_id as user_id, SUM(sales.items) as sum_items;
data = JOIN sales_agg BY user_id, users BY id;
can be represented as the following DAG in Tez:
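As a rough illustration of the vertex and edge model, the data flow of this query can be written down as a small dependency graph in Python. The vertex names below are hypothetical, and the shape is an assumption based on the query text alone; the plan Tez actually builds may differ (for example, if the join is executed as a map-side join).

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Hypothetical vertices; each maps to the set of upstream vertices it reads from.
dag = {
    "scan_sales": set(),
    "scan_users": set(),
    "aggregate_sales": {"scan_sales"},            # GROUP BY user_id, SUM(items)
    "join": {"aggregate_sales", "scan_users"},    # s.user_id = u.id
}

# Edges are (source, destination) pairs: data moves from source to destination.
edges = sorted((src, dst) for dst, srcs in dag.items() for src in srcs)

# A valid execution order never runs a vertex before its inputs.
order = list(TopologicalSorter(dag).static_order())

print(edges)
print(order)
```

The two scans have no inputs and can run in parallel, the aggregation depends only on the sales scan, and the join has to wait for both of its inputs.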
In my case the job ran for almost 5 hours:
Why did it take so long to run the job? Is there any way to improve its performance?
Memory allocation in Hadoop YARN clusters has some drawbacks that may lead to significant cluster under-utilization and at the same time (!) to large queues of pending applications.
So you have to pay for extra compute resources that you do not use and still have unsatisfied users. Let’s see how this can happen and how you can mitigate this.
Can reducing the Tez memory settings help solve memory limit problems? Sometimes this paradox works.
One day one of our Hive queries failed with the following error:
Container is running beyond physical memory limits. Current usage: 4.1 GB of 4 GB physical memory used; 6.0 GB of 20 GB virtual memory used. Killing container.