
Apache Hive/Pig on Tez – Long Running Tasks and Their Failed Attempts – Analyzing Performance and Finding Bottlenecks (Insufficient Parallelism) using Application Master Logs

Apache Tez is the main distributed execution engine for Apache Hive and Apache Pig jobs.

Tez represents a data flow as a DAG (directed acyclic graph) that consists of a set of vertices connected by edges. Vertices represent data transformations, while edges represent the movement of data between vertices.

For example, the following Hive SQL query:

SELECT u.name, sum_items 
FROM
 (
   SELECT user_id, SUM(items) sum_items
   FROM sales
   GROUP BY user_id
 ) s
 JOIN
   users u
 ON s.user_id = u.id

and its corresponding Apache Pig script:

sales = LOAD 'sales' USING org.apache.hive.hcatalog.pig.HCatLoader();
users = LOAD 'users' USING org.apache.hive.hcatalog.pig.HCatLoader();

sales_agg = FOREACH (GROUP sales BY user_id)
  GENERATE 
    group as user_id,
    SUM(sales.items) as sum_items;
    
data = JOIN sales_agg BY user_id, users BY id;

can be represented as a DAG in Tez; in the logs below this job appears as four vertices, scope-286 through scope-289.

In my case the job ran for almost 5 hours.

Why did it take so long to run the job? Is there any way to improve its performance?

Tez Application Master Log

I am going to use the Tez AM log to investigate vertex performance and find possible bottlenecks.

Note that there is also the Timeline Server REST API that you can use to get statistics for Tez jobs, but the application master log is “event-driven”: it shows the exact order of all events and generally contains much more detail.
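
For example, DAG-level statistics can be fetched from the Timeline Server with a plain HTTP call. A minimal sketch, assuming Python, the v1 timeline API on its default port 8188, and a hypothetical host name:

import json
import urllib.request

# Tez publishes entities such as TEZ_DAG_ID, TEZ_VERTEX_ID, TEZ_TASK_ID
# and TEZ_TASK_ATTEMPT_ID to the timeline store
url = ('http://timeline-host:8188/ws/v1/timeline/'   # hypothetical host
       'TEZ_DAG_ID/dag_1571947206222_4487_1')

with urllib.request.urlopen(url) as resp:
    dag = json.load(resp)

# Status, timings and counters are typically found under 'otherinfo'
print(dag.get('otherinfo', {}).get('status'))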

Since a single AM can run multiple DAGs in one session, let’s find the beginning of our job:

2019-10-28 02:57:30,590 [INFO] |app.DAGAppMaster|: Running DAG
2019-10-28 02:57:30,831 [INFO] |history.HistoryEventHandler|: [HISTORY][DAG:dag_1571947206222_4487_1]
[Event:DAG_SUBMITTED]

Tez is ready to run the vertices (but this is not physical execution yet):

2019-10-28 02:57:30,864 [INFO] |impl.VertexImpl|: Running vertex: vertex_1571947206222_4487_1_00
 [scope-286]

2019-10-28 02:57:30,866 [INFO] |impl.VertexImpl|: Running vertex: vertex_1571947206222_4487_1_01 
 [scope-287]

2019-10-28 02:57:30,869 [INFO] |impl.VertexImpl|: Running vertex: vertex_1571947206222_4487_1_02
 [scope-288]

2019-10-28 02:57:30,870 [INFO] |impl.VertexImpl|: Running vertex: vertex_1571947206222_4487_1_03 
 [scope-289]

Tez creates tasks to read data from the two data sources (the sales and users tables in my case), tasks for GROUP BY, and tasks for JOIN:

2019-10-28 02:57:30,890 [INFO] |impl.VertexImpl|: Creating 200 tasks for vertex:
  vertex_1571947206222_4487_1_00 [scope-286]

2019-10-28 02:57:30,918 [INFO] |impl.VertexImpl|: Creating 400 tasks for vertex:
  vertex_1571947206222_4487_1_01 [scope-287]

2019-10-28 02:57:30,935 [INFO] |impl.VertexImpl|: Creating 200 tasks for vertex: 
  vertex_1571947206222_4487_1_02 [scope-288]

2019-10-28 02:57:30,944 [INFO] |impl.VertexImpl|: Creating 200 tasks for vertex:
  vertex_1571947206222_4487_1_03 [scope-289]

Then Tez starts the tasks (again, this is not physical execution yet):

2019-10-28 02:57:32,660 [INFO] |history.HistoryEventHandler|: 
  [Event:TASK_STARTED]: vertexName=scope-287, taskId=task_1571947206222_4487_1_01_000000

2019-10-28 02:57:32,698 [INFO] |history.HistoryEventHandler|:  
  [Event:TASK_STARTED]: vertexName=scope-286, taskId=task_1571947206222_4487_1_00_000000

2019-10-28 02:57:32,712 [INFO] |history.HistoryEventHandler|:  
  [Event:TASK_STARTED]: vertexName=scope-287, taskId=task_1571947206222_4487_1_01_000001
...

Note that tasks for different vertices can run concurrently. The TASK_STARTED event is just the “logical” start of a task; no YARN containers have been allocated yet to physically run it on the compute nodes of the cluster.

Since there are a lot of tasks, I ran a script to parse the log and collect some stats for the TASK_STARTED events (a sketch of such a script is shown after the stats):

Tasks Started:

scope-286: 200 tasks
  Minimum timestamp: 2019-10-28 02:57:32,698
  Maximum timestamp: 2019-10-28 02:57:32,741

scope-287: 400 tasks
  Minimum timestamp: 2019-10-28 02:57:32,660
  Maximum timestamp: 2019-10-28 02:57:32,750

scope-288: 200 tasks
  Minimum timestamp: 2019-10-28 02:59:55,443
  Maximum timestamp: 2019-10-28 02:59:58,895

scope-289: 200 tasks
  Minimum timestamp: 2019-10-28 03:56:49,739
  Maximum timestamp: 2019-10-28 04:00:57,249

You can see that all scope-286 and scope-287 tasks were started almost immediately.
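
The stats above were produced by a short script along the following lines. A minimal sketch, assuming Python, a hypothetical tez_am.log file name, and that each event occupies a single line in the raw log (the excerpts in this post are wrapped for readability):

import re
from collections import defaultdict

# Matches the timestamp and vertex name of TASK_STARTED events
EVENT = re.compile(r'^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}).*'
                   r'\[Event:TASK_STARTED\]: vertexName=([^,]+),')

started = defaultdict(list)

with open('tez_am.log') as log:          # hypothetical log file name
    for line in log:
        m = EVENT.search(line)
        if m:
            timestamp, vertex = m.groups()
            started[vertex].append(timestamp)

# The same approach works for the allocation, container assignment and
# attempt events below -- only the regular expression changes
for vertex in sorted(started):
    ts = sorted(started[vertex])
    print('%s: %d tasks' % (vertex, len(ts)))
    print('  Minimum timestamp: %s' % ts[0])
    print('  Maximum timestamp: %s' % ts[-1])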

As the next step, Tez tries to allocate resources to run the tasks:

2019-10-28 02:57:32,791 [INFO] |rm.YarnTaskSchedulerService|: Allocation request for task:
  attempt_1571947206222_4487_1_01_000000_0 with request: Capability[<memory:1479, vCores:1>]

2019-10-28 02:57:32,791 [INFO] |rm.YarnTaskSchedulerService|: Allocation request for task: 
  attempt_1571947206222_4487_1_00_000000_0 with request: Capability[<memory:1479, vCores:1>]

2019-10-28 02:57:32,791 [INFO] |rm.YarnTaskSchedulerService|: Allocation request for task:
  attempt_1571947206222_4487_1_01_000001_0 with request: Capability[<memory:1479, vCores:1>]
...

Parsing the allocation request events, I collected the following stats:

Allocation Requests:

vertex_1571947206222_4487_1_00: 200 requests
  Minimum timestamp: 2019-10-28 02:57:32,791
  Maximum timestamp: 2019-10-28 02:57:32,822

vertex_1571947206222_4487_1_01: 400 requests
  Minimum timestamp: 2019-10-28 02:57:32,791
  Maximum timestamp: 2019-10-28 02:57:32,831

vertex_1571947206222_4487_1_02: 205 requests
  Minimum timestamp: 2019-10-28 02:59:55,462
  Maximum timestamp: 2019-10-28 03:09:42,694

vertex_1571947206222_4487_1_03: 240 requests
  Minimum timestamp: 2019-10-28 03:56:49,755
  Maximum timestamp: 2019-10-28 06:15:46,143

After the Tez application master receives containers from YARN, you can see the following messages:

2019-10-28 02:57:34,676 [INFO] |rm.YarnTaskSchedulerService|: Assigning container to task: 
  containerId=container_1571947206222_4487_01_000002, task=attempt_1571947206222_4487_1_00_000000_0,
  containerHost=ip-10-84-55-173.ec2.internal:8041, containerResources=<memory:1504, vCores:1>

2019-10-28 02:57:34,677 [INFO] |rm.YarnTaskSchedulerService|: Assigning container to task: 
  containerId=container_1571947206222_4487_01_000032, task=attempt_1571947206222_4487_1_00_000001_0,
  containerHost=ip-10-84-55-226.ec2.internal:8041, containerResources=<memory:1504, vCores:1>

2019-10-28 02:57:34,678 [INFO] |rm.YarnTaskSchedulerService|: Assigning container to task:
  containerId=container_1571947206222_4487_01_000017, task=attempt_1571947206222_4487_1_00_000002_0,
  containerHost=ip-10-84-55-209.ec2.internal:8041, containerResources=<memory:1504, vCores:1>
...

Summary of the assigned container events:

Assigned Containers:

vertex_1571947206222_4487_1_00: 200 containers
  Minimum timestamp: 2019-10-28 02:57:34,676
  Maximum timestamp: 2019-10-28 02:57:34,706

vertex_1571947206222_4487_1_01: 400 containers
  Minimum timestamp: 2019-10-28 02:57:34,706
  Maximum timestamp: 2019-10-28 02:57:36,347

vertex_1571947206222_4487_1_02: 205 containers
  Minimum timestamp: 2019-10-28 02:59:57,349
  Maximum timestamp: 2019-10-28 03:09:44,187

vertex_1571947206222_4487_1_03: 240 containers
  Minimum timestamp: 2019-10-28 03:56:50,820
  Maximum timestamp: 2019-10-28 06:15:47,634

Tez spends some time launching the containers, and you can see the following message when a task attempt actually starts:

2019-10-28 02:57:35,951 [INFO] |history.HistoryEventHandler|: [HISTORY][DAG:dag_1571947206222_4487_1] 
 [Event:TASK_ATTEMPT_STARTED]: vertexName=scope-287,   
 taskAttemptId=attempt_1571947206222_4487_1_01_000039_0, 
 containerId=container_1571947206222_4487_01_000225, nodeId=ip-10-84-55-197.ec2.internal:8041
...

Summary of the attempt start events:

Task Attempts Started:

scope-286: 200 attempts
  Minimum timestamp: 2019-10-28 02:57:36,533
  Maximum timestamp: 2019-10-28 02:57:38,235

scope-287: 400 attempts
  Minimum timestamp: 2019-10-28 02:57:35,951
  Maximum timestamp: 2019-10-28 02:57:38,287

scope-288: 205 attempts
  Minimum timestamp: 2019-10-28 02:59:58,272
  Maximum timestamp: 2019-10-28 03:09:45,134

scope-289: 240 attempts
  Minimum timestamp: 2019-10-28 03:56:51,791
  Maximum timestamp: 2019-10-28 06:15:48,567

When a task finishes, the following TASK_FINISHED event shows its execution statistics:

2019-10-28 02:59:51,587 [INFO] |history.HistoryEventHandler|: [HISTORY][DAG:dag_1571947206222_4487_1]
  [Event:TASK_FINISHED]: vertexName=scope-287, taskId=task_1571947206222_4487_1_01_000003, 
  startTime=1572231457104, finishTime=1572231591585, timeTaken=134481, status=SUCCEEDED, 
  successfulAttemptID=attempt_1571947206222_4487_1_01_000003_0, 
  FILE_BYTES_READ=894396137, FILE_BYTES_WRITTEN=1667278775, S3N_BYTES_READ=531118188, 
  INPUT_RECORDS_PROCESSED=12899158, INPUT_SPLIT_LENGTH_BYTES=531118188, OUTPUT_RECORDS=12899158, 
  OUTPUT_BYTES=2119682165, OUTPUT_BYTES_WITH_OVERHEAD=2024479834, OUTPUT_BYTES_PHYSICAL=833233007
...

Summary of the finished tasks:

Tasks Finished:

scope-286: 200 tasks
  Minimum timestamp: 2019-10-28 03:54:00,943
  Maximum timestamp: 2019-10-28 04:16:05,362

scope-287: 400 tasks
  Minimum timestamp: 2019-10-28 02:59:51,587
  Maximum timestamp: 2019-10-28 03:00:14,312

scope-288: 200 tasks
  Minimum timestamp: 2019-10-28 03:08:35,733
  Maximum timestamp: 2019-10-28 03:10:33,184

scope-289: 200 tasks
  Minimum timestamp: 2019-10-28 06:01:53,947
  Maximum timestamp: 2019-10-28 07:54:44,148

You can see that there are about 2 hours between the first and last finished task in scope-289 (the JOIN) and about 20 minutes in scope-286 (the large map vertex).

From these events you can build an approximate timeline of execution for each vertex.

The final JOIN (scope-289) starts as soon as the first task of scope-286 finishes (it does not wait for all of them), but that still means the JOIN has to wait almost an hour after the job start.

Task Execution Time

Next, it makes sense to analyze the execution time (minimum, maximum, and percentiles) of the tasks to find skew or anomalies:

scope-286: 
  Minimum: 3383.5 sec (56.3 min)
  Maximum: 4707.5 sec (78.4 min)
  p01: 3402.0 sec (56.7 min)
  p05: 3452.6 sec (57.5 min)
  p25: 3551.9 sec (59.1 min)
  p50: 3676.9 sec (61.2 min)
  p75: 3799.6 sec (63.3 min)
  p95: 3957.2 sec (65.9 min)
  p99: 4546.4 sec (75.7 min)

scope-287: 
  Minimum: 134.4 sec (2.2 min)
  Maximum: 156.7 sec (2.6 min)
  p01: 135.1 sec (2.2 min)
  p05: 136.4 sec (2.2 min)
  p25: 137.8 sec (2.2 min)
  p50: 139.5 sec (2.3 min)
  p75: 141.5 sec (2.3 min)
  p95: 144.3 sec (2.4 min)
  p99: 148.0 sec (2.4 min)

scope-288: 
  Minimum: 516.7 sec (8.6 min)
  Maximum: 633.3 sec (10.5 min)
  p01: 517.4 sec (8.6 min)
  p05: 523.7 sec (8.7 min)
  p25: 531.9 sec (8.8 min)
  p50: 538.6 sec (8.9 min)
  p75: 545.0 sec (9.0 min)
  p95: 556.1 sec (9.2 min)
  p99: 564.5 sec (9.4 min)

scope-289:
  Minimum: 7388.2 sec (123.1 min)
  Maximum: 14064.9 sec (234.4 min)
  p01: 7465.0 sec (124.4 min)
  p05: 7617.8 sec (126.9 min)
  p25: 7823.5 sec (130.3 min)
  p50: 7935.8 sec (132.2 min)
  p75: 8036.4 sec (133.9 min)
  p95: 8192.3 sec (136.5 min)
  p99: 8433.8 sec (140.5 min)
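
These numbers come from the timeTaken field of the TASK_FINISHED events shown earlier. A minimal sketch of the computation, under the same assumptions as the script above (NumPy is used here just to get the percentiles):

import re
import numpy as np
from collections import defaultdict

# timeTaken is reported in milliseconds in TASK_FINISHED events
FINISHED = re.compile(r'\[Event:TASK_FINISHED\]: vertexName=([^,]+),.*?'
                      r'timeTaken=(\d+),')

durations = defaultdict(list)

with open('tez_am.log') as log:          # hypothetical log file name
    for line in log:
        m = FINISHED.search(line)
        if m:
            durations[m.group(1)].append(int(m.group(2)) / 1000.0)  # seconds

for vertex, secs in sorted(durations.items()):
    print('%s:' % vertex)
    print('  Minimum: %.1f sec (%.1f min)' % (min(secs), min(secs) / 60))
    print('  Maximum: %.1f sec (%.1f min)' % (max(secs), max(secs) / 60))
    for p in (1, 5, 25, 50, 75, 95, 99):
        v = np.percentile(secs, p)
        print('  p%02d: %.1f sec (%.1f min)' % (p, v, v / 60))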

Long Running Map Tasks

scope-286 and scope-287 are map vertices that read the input data. While scope-287 looks good (about 2 minutes per task with no significant variance), scope-286 tasks run from 56 to 78 minutes, which is quite long for mappers.

Why is a long-running task a problem?

  • If a 1-hour task fails (and failures are normal), you need one more hour to rerun it, which significantly delays the downstream data pipeline and simply adds an hour to the job execution time (!)

  • When the cluster has enough resources, it can parallelize the processing at the task level, but this is not possible when tasks are large (a task is an atomic unit of work and cannot be split into smaller chunks).

First, let’s look at the input size for the map tasks:

scope-286: 
  Total inputSplitSize: 1007598.6 MB (1007.5 GB)
  Minimum inputSplitSize: 5008.3 MB (5.0 GB)
  Maximum inputSplitSize: 5071.6 MB (5.0 GB)
  p01 inputSplitSize: 5016.9 MB (5.0 GB)
  p05 inputSplitSize: 5022.0 MB (5.0 GB)
  p25 inputSplitSize: 5031.0 MB (5.0 GB)
  p50 inputSplitSize: 5038.0 MB (5.0 GB)
  p75 inputSplitSize: 5044.3 MB (5.0 GB)
  p95 inputSplitSize: 5054.1 MB (5.0 GB)
  p99 inputSplitSize: 5064.6 MB (5.0 GB)

scope-287: 
  Total inputSplitSize: 212296.8 MB (212.2 GB)
  Minimum inputSplitSize: 530.3 MB (0.5 GB)
  Maximum inputSplitSize: 531.1 MB (0.5 GB)
  p01 inputSplitSize: 530.4 MB (0.5 GB)
  p05 inputSplitSize: 530.5 MB (0.5 GB)
  p25 inputSplitSize: 530.6 MB (0.5 GB)
  p50 inputSplitSize: 530.7 MB (0.5 GB)
  p75 inputSplitSize: 530.8 MB (0.5 GB)
  p95 inputSplitSize: 531.0 MB (0.5 GB)
  p99 inputSplitSize: 531.1 MB (0.5 GB)

Input processing rate:

scope-286: 200 tasks
  Total inputSplitSize: 1007598.6 MB (1007.5 GB)
  Minimum start time: 2019-10-28 02:57:36,533
  Maximum end time: 2019-10-28 04:16:05,362
  Duration: 4708.8 sec (78.4 min)
  Total inputSplitSize/sec: 213.9 MB/sec
  Minimum inputSplitSize/sec: 1.0 MB/sec
  Maximum inputSplitSize/sec: 1.4 MB/sec
  p01 inputSplitSize/sec: 1.1 MB/sec
  p05 inputSplitSize/sec: 1.2 MB/sec
  p25 inputSplitSize/sec: 1.3 MB/sec
  p50 inputSplitSize/sec: 1.3 MB/sec
  p75 inputSplitSize/sec: 1.4 MB/sec
  p95 inputSplitSize/sec: 1.4 MB/sec
  p99 inputSplitSize/sec: 1.4 MB/sec

scope-287: 400 tasks
  Total inputSplitSize: 212296.8 MB (212.2 GB)
  Minimum start time: 2019-10-28 02:57:35,951
  Maximum end time: 2019-10-28 03:00:14,312
  Duration: 158.3 sec (2.6 min)
  Total inputSplitSize/sec: 1340.5 MB/sec
  Minimum inputSplitSize/sec: 3.3 MB/sec
  Maximum inputSplitSize/sec: 3.9 MB/sec
  p01 inputSplitSize/sec: 3.5 MB/sec
  p05 inputSplitSize/sec: 3.6 MB/sec
  p25 inputSplitSize/sec: 3.7 MB/sec
  p50 inputSplitSize/sec: 3.8 MB/sec
  p75 inputSplitSize/sec: 3.8 MB/sec
  p95 inputSplitSize/sec: 3.8 MB/sec
  p99 inputSplitSize/sec: 3.9 MB/sec
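
The per-task rate is simply the INPUT_SPLIT_LENGTH_BYTES counter divided by timeTaken. For example, for the scope-287 task from the TASK_FINISHED event shown earlier:

# Counters taken from the TASK_FINISHED event above
input_split_bytes = 531118188      # INPUT_SPLIT_LENGTH_BYTES
time_taken_ms = 134481             # timeTaken

rate = (input_split_bytes / (1000 * 1000)) / (time_taken_ms / 1000.0)
print('%.1f MB/sec' % rate)        # ~3.9 MB/sec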

Output size for map tasks:

scope-286: 200 tasks
  Total outputSize: 2956830.1 MB (2956.8 GB)
  Minimum outputSize: 14697.9 MB (14.6 GB)
  Maximum outputSize: 14883.0 MB (14.8 GB)
  p01 outputSize: 14720.4 MB (14.7 GB)
  p05 outputSize: 14736.0 MB (14.7 GB)
  p25 outputSize: 14762.1 MB (14.7 GB)
  p50 outputSize: 14783.9 MB (14.7 GB)
  p75 outputSize: 14805.1 MB (14.8 GB)
  p95 outputSize: 14831.3 MB (14.8 GB)
  p99 outputSize: 14862.5 MB (14.8 GB)

scope-287: 400 tasks
  Total outputSize: 333020.8 MB (333.0 GB)
  Minimum outputSize: 831.8 MB (0.8 GB)
  Maximum outputSize: 833.2 MB (0.8 GB)
  p01 outputSize: 832.0 MB (0.8 GB)
  p05 outputSize: 832.1 MB (0.8 GB)
  p25 outputSize: 832.4 MB (0.8 GB)
  p50 outputSize: 832.5 MB (0.8 GB)
  p75 outputSize: 832.6 MB (0.8 GB)
  p95 outputSize: 832.9 MB (0.8 GB)
  p99 outputSize: 833.1 MB (0.8 GB)

You can see that scope-286 has issues: its task inputs are huge (5 GB per task), and the computation is intensive, producing even larger output (almost 15 GB per task).

Analyzing Reducers

Now let’s collect some statistics for reducers performing GROUP BY and JOIN in my example:

scope-288: 200 tasks
  Total shuffleSize: 333020.8 MB (333.0 GB)
  Minimum shuffleSize: 1651.6 MB (1.6 GB)
  Maximum shuffleSize: 1678.0 MB (1.6 GB)
  p01 shuffleSize: 1653.0 MB (1.6 GB)
  p05 shuffleSize: 1655.6 MB (1.6 GB)
  p25 shuffleSize: 1661.8 MB (1.6 GB)
  p50 shuffleSize: 1664.8 MB (1.6 GB)
  p75 shuffleSize: 1668.7 MB (1.6 GB)
  p95 shuffleSize: 1674.0 MB (1.6 GB)
  p99 shuffleSize: 1677.0 MB (1.6 GB)

scope-289: 200 tasks
  Total shuffleSize: 2963183.0 MB (2963.1 GB)
  Minimum shuffleSize: 14729.5 MB (14.7 GB)
  Maximum shuffleSize: 14915.0 MB (14.9 GB)
  p01 shuffleSize: 14751.9 MB (14.7 GB)
  p05 shuffleSize: 14767.8 MB (14.7 GB)
  p25 shuffleSize: 14793.7 MB (14.7 GB)
  p50 shuffleSize: 14815.6 MB (14.8 GB)
  p75 shuffleSize: 14837.0 MB (14.8 GB)
  p95 shuffleSize: 14863.1 MB (14.8 GB)
  p99 shuffleSize: 14894.4 MB (14.8 GB)

The JOIN in scope-289 is well balanced, but it simply has to process too large a volume of data (2.9 TB) for only 200 tasks.

Failed Reducer Attempts

One of the reduce tasks failed after more than 2 hours of execution:

2019-10-28 04:00:17,063 [INFO] |history.HistoryEventHandler|: 
  [Event:TASK_STARTED]: vertexName=scope-289, taskId=task_1571947206222_4487_1_03_000101

2019-10-28 06:11:49,734 [INFO] |history.HistoryEventHandler|: 
  [Event:TASK_ATTEMPT_FINISHED]: vertexName=scope-289, 
  taskAttemptId=attempt_1571947206222_4487_1_03_000101_0, 
  diagnostics=Error: Error while running task ( failure ) : 
    java.io.IOException: Error closing multipart upload

Tez automatically started a new attempt, but this failure added almost 2 hours to the job execution time (!):

2019-10-28 06:11:49,735 [INFO] |impl.TaskImpl|: Scheduling new attempt for task: 
  task_1571947206222_4487_1_03_000101, currentFailedAttempts: 1, maxFailedAttempts: 4

2019-10-28 07:54:44,148 [INFO] |history.HistoryEventHandler|: 
  [Event:TASK_ATTEMPT_FINISHED]: vertexName=scope-289, 
  taskAttemptId=attempt_1571947206222_4487_1_03_000101_1, status=SUCCEEDED

The task that could have finished at 6:11 did not finish until 7:54 because of the failure.
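
Failures like this are easy to spot by scanning the log for attempts that finished with a status other than SUCCEEDED. A minimal sketch, under the same assumptions as the scripts above:

import re

with open('tez_am.log') as log:          # hypothetical log file name
    for line in log:
        if ('[Event:TASK_ATTEMPT_FINISHED]' in line
                and 'status=SUCCEEDED' not in line):
            m = re.search(r'taskAttemptId=([^,\s]+)', line)
            print(m.group(1) if m else line.strip())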

Increasing Parallelism

In my environment the cluster that runs this job has 300 nodes, so I increased the number of tasks for the JOIN and the scope-286 mapper to 2,000, and the job now takes only 25-30 minutes to run (instead of 3-5 hours). (For Pig on Tez this typically means reducing the grouped split size, e.g. tez.grouping.max-size, for mappers and using the PARALLEL clause for reducers.)

Conclusion

Distributed data pipelines often have huge potential for performance improvement. DAGs can be complex, so you have to monitor the performance of every vertex to find long-running tasks and try to increase their parallelism.

Failures are normal in distributed environments, so even if you do not have enough resources to run all tasks concurrently, it is still helpful to have a large number of small tasks to minimize the delays introduced by task attempt reruns.