
Apache Hive – Monitoring Progress of Long-Running Reducers – hive.log.every.n.records Option

A reducer aggregates rows within input groups (i.e., rows that share the same grouping key), typically producing one output row per group. For example, the following query returns ~200 rows even if the source events table has billions of rows:

SELECT country, count(*) 
FROM events 
GROUP BY country;

The problem is that, despite the small number of output rows, the aggregation can still be a very long process that takes many hours.

In my case, a single reducer processed 21.9 M input rows (1.4 GB of compressed shuffle data), but there were only 46,713 distinct keys, resulting in 46,713 output rows (12.4 MB):

REDUCE_INPUT_GROUPS 46,713
REDUCE_INPUT_RECORDS 21,958,450
SHUFFLE_BYTES 1,472,462,374
SHUFFLE_BYTES_DECOMPRESSED 8,157,541,247
RECORDS_OUT_0 46,713
HDFS_BYTES_WRITTEN 12,456,584

After reading and merging the map outputs, the reducer appeared to be stuck while processing rows. It took 3.5 hours to process the 21.9 M rows, and during that time it wrote only six "records written" log messages:

2019-11-15 18:29:36,384 [INFO] [TezChild] |tez.ReduceRecordProcessor|: Starting Output: out_Reducer 2
2019-11-15 18:29:36,493 [INFO] [TezChild] |exec.FileSinkOperator|: FS[14]: records written - 1
2019-11-15 18:29:36,499 [INFO] [TezChild] |exec.FileSinkOperator|: FS[14]: records written - 10
2019-11-15 18:29:38,794 [INFO] [TezChild] |exec.FileSinkOperator|: FS[14]: records written - 100
2019-11-15 18:32:29,376 [INFO] [TezChild] |exec.FileSinkOperator|: FS[14]: records written - 1000
2019-11-15 19:33:46,380 [INFO] [TezChild] |exec.FileSinkOperator|: FS[14]: records written - 10000
2019-11-15 22:03:06,105 [INFO] [TezChild] |exec.FileSinkOperator|: FS[14]: records written - 46713
2019-11-15 22:03:06,129 [INFO] [TezChild] |exec.FileSinkOperator|: RECORDS_OUT_0:46713
2019-11-15 22:03:06,130 [INFO] [TezChild] |output.MROutput|: out_Reducer 2 closed
2019-11-15 22:03:06,132 [INFO] [TezChild] |task.TaskRunner2Callable|: Task completed

For 2.5 hours (between 19:33 and 22:03 in my example), nothing was written to the log.
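The length of that silent stretch can be checked directly from the timestamps. Below is a small Python sketch; the helper name longest_gap is made up for illustration, and the timestamps are copied from the "records written" lines above.

```python
from datetime import datetime

# Timestamps of the six "records written" messages from the log above.
STAMPS = [
    "2019-11-15 18:29:36,493",
    "2019-11-15 18:29:36,499",
    "2019-11-15 18:29:38,794",
    "2019-11-15 18:32:29,376",
    "2019-11-15 19:33:46,380",
    "2019-11-15 22:03:06,105",
]


def longest_gap(timestamps):
    """Return (gap, start, end) for the longest pause between consecutive messages."""
    times = [datetime.strptime(s, "%Y-%m-%d %H:%M:%S,%f") for s in timestamps]
    gaps = [(later - earlier, earlier, later)
            for earlier, later in zip(times, times[1:])]
    return max(gaps)


gap, start, end = longest_gap(STAMPS)
print(f"No messages from {start:%H:%M} to {end:%H:%M} "
      f"({gap.total_seconds() / 3600:.2f} hours)")
```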

So the reducer is working, but the log shows nothing, which is inconvenient. By default, the reducer logs the number of output rows at exponentially growing intervals: 1, 10, 100, 1000, 10000, 100000, and so on.

When you have a large number of input rows but a small number of keys, log messages appear rarely and the progress of the reducer is hard to judge.
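To make the logging pattern concrete, here is a small Python simulation of the row counts at which a "records written" message would appear. It is only a model inferred from the log output in this post, not Hive's actual code: the function name logged_counts and its parameters are hypothetical, and the fixed-step mode corresponds to the hive.log.every.n.records setting discussed below. The final message written when the output is closed (46713 in my log) is not modeled.

```python
def logged_counts(total_rows, every_n=0):
    """Return the row counts that would trigger a progress message.

    Assumption (inferred from the logs, not taken from Hive's source):
    the first message is written at row 1; after each message the next
    threshold is 10x the current one when every_n == 0, or the current
    one plus every_n otherwise.
    """
    counts = []
    next_log = 1
    while next_log <= total_rows:
        counts.append(next_log)
        next_log = next_log * 10 if every_n == 0 else next_log + every_n
    return counts


# Default behavior for a reducer that writes 46,713 rows.
print(logged_counts(46713))        # [1, 10, 100, 1000, 10000]

# Fixed step of 1000 rows: 1, 1001, 2001, ...
print(logged_counts(8500, 1000))
```

With the default setting, a reducer that writes 46,713 rows reports progress only five times before it finishes, no matter how long each group takes.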

To solve this, you can use the Hive hive.log.every.n.records option to change the logging interval, for example:

set hive.log.every.n.records = 1000;

Now more log messages appear, and you can better estimate how long the job will take:

2019-11-16 11:19:15,700 [INFO] [TezChild] |exec.FileSinkOperator|: FS[14]: records written - 1
2019-11-16 11:30:24,773 [INFO] [TezChild] |exec.FileSinkOperator|: FS[14]: records written - 1001
2019-11-16 11:42:04,964 [INFO] [TezChild] |exec.FileSinkOperator|: FS[14]: records written - 2001
2019-11-16 12:02:02,195 [INFO] [TezChild] |exec.FileSinkOperator|: FS[14]: records written - 3001
2019-11-16 12:11:28,802 [INFO] [TezChild] |exec.FileSinkOperator|: FS[14]: records written - 4001
2019-11-16 12:19:09,445 [INFO] [TezChild] |exec.FileSinkOperator|: FS[14]: records written - 5001
2019-11-16 12:23:44,086 [INFO] [TezChild] |exec.FileSinkOperator|: FS[14]: records written - 6001
2019-11-16 12:38:42,857 [INFO] [TezChild] |exec.FileSinkOperator|: FS[14]: records written - 7001
2019-11-16 12:47:03,895 [INFO] [TezChild] |exec.FileSinkOperator|: FS[14]: records written - 8001
...

You can see that it takes roughly 5 to 20 minutes to produce 1,000 output records (aggregations by reduce keys).

The time depends on the size of each input group, which can vary with the grouping key (for example, 1 M events for a country with a large population and 1 K events for a small country). It also depends on the calculations performed within a group, which can depend on the data itself rather than only on the group size.
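With regular log messages, a rough estimate of the remaining time can be computed from the first and last "records written" lines. The Python sketch below assumes the log line format shown above and assumes you know the expected number of output rows (here 46,713, taken from the counters of the earlier run). The function names are hypothetical. Because the time per group varies so much, treat the result as a ballpark figure only.

```python
import re
from datetime import datetime

# Matches lines such as:
# 2019-11-16 11:19:15,700 [INFO] [TezChild] |exec.FileSinkOperator|: FS[14]: records written - 1
LINE_RE = re.compile(
    r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) .*records written - (\d+)"
)


def parse_progress(lines):
    """Extract (timestamp, records_written) pairs from log lines."""
    points = []
    for line in lines:
        match = LINE_RE.match(line)
        if match:
            ts = datetime.strptime(match.group(1), "%Y-%m-%d %H:%M:%S,%f")
            points.append((ts, int(match.group(2))))
    return points


def estimate_remaining_seconds(points, expected_total):
    """Extrapolate the average rate between the first and last data points."""
    if len(points) < 2:
        raise ValueError("need at least two progress messages")
    (t0, n0), (t1, n1) = points[0], points[-1]
    if n1 <= n0 or t1 <= t0:
        raise ValueError("progress messages must advance in time and count")
    rate = (n1 - n0) / (t1 - t0).total_seconds()  # records per second
    return max(expected_total - n1, 0) / rate


log_lines = [
    "2019-11-16 11:19:15,700 [INFO] [TezChild] |exec.FileSinkOperator|: FS[14]: records written - 1",
    "2019-11-16 12:47:03,895 [INFO] [TezChild] |exec.FileSinkOperator|: FS[14]: records written - 8001",
]
points = parse_progress(log_lines)
remaining = estimate_remaining_seconds(points, expected_total=46713)
print(f"Estimated time remaining: {remaining / 3600:.1f} hours")
```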