A reducer aggregates rows within input groups (i.e., rows having the same grouping key), typically producing one output row per group. For example, the following query returns only ~200 rows even if the source events
table has billions of rows:
SELECT country, count(*) FROM events GROUP BY country;
The problem is that despite the small number of output rows, the aggregation can still be a very long process taking many hours.
In my case a single reducer processed 21.9 M input rows (1.4 GB), but there were only 46,713 distinct keys, so it produced just 46,713 output rows (12.4 MB):
REDUCE_INPUT_GROUPS | 46,713 |
REDUCE_INPUT_RECORDS | 21,958,450 |
SHUFFLE_BYTES | 1,472,462,374 |
SHUFFLE_BYTES_DECOMPRESSED | 8,157,541,247 |
RECORDS_OUT_0 | 46,713 |
HDFS_BYTES_WRITTEN | 12,456,584 |
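A quick back-of-the-envelope calculation from these counters (a sketch; the figures are only averages) shows how much work the reducer had per key:

```python
# Values copied from the reducer counters above.
input_records = 21_958_450
input_groups = 46_713
shuffle_bytes = 1_472_462_374
shuffle_bytes_decompressed = 8_157_541_247

# On average, each grouping key has ~470 input rows to aggregate,
# and the shuffled data expands ~5.5x after decompression.
print(round(input_records / input_groups))                   # 470
print(round(shuffle_bytes_decompressed / shuffle_bytes, 1))  # 5.5
```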
After reading and merging the map outputs, the reducer seemingly got stuck processing rows. It took 3.5 hours to process 21.9 M rows, during which it wrote only six "records written"
log messages:
2019-11-15 18:29:36,384 [INFO] [TezChild] |tez.ReduceRecordProcessor|: Starting Output: out_Reducer 2
2019-11-15 18:29:36,493 [INFO] [TezChild] |exec.FileSinkOperator|: FS[14]: records written - 1
2019-11-15 18:29:36,499 [INFO] [TezChild] |exec.FileSinkOperator|: FS[14]: records written - 10
2019-11-15 18:29:38,794 [INFO] [TezChild] |exec.FileSinkOperator|: FS[14]: records written - 100
2019-11-15 18:32:29,376 [INFO] [TezChild] |exec.FileSinkOperator|: FS[14]: records written - 1000
2019-11-15 19:33:46,380 [INFO] [TezChild] |exec.FileSinkOperator|: FS[14]: records written - 10000
2019-11-15 22:03:06,105 [INFO] [TezChild] |exec.FileSinkOperator|: FS[14]: records written - 46713
2019-11-15 22:03:06,129 [INFO] [TezChild] |exec.FileSinkOperator|: RECORDS_OUT_0:46713
2019-11-15 22:03:06,130 [INFO] [TezChild] |output.MROutput|: out_Reducer 2 closed
2019-11-15 22:03:06,132 [INFO] [TezChild] |task.TaskRunner2Callable|: Task completed
There was a 2.5-hour period when no log records were written at all (between 19:33 and 22:03 in my example).
So the reducer works, but there are no messages in the log, which is not very convenient. By default, the reducer logs output rows at exponential intervals: 1, 10, 100, 1000, 10000, 100000 and so on.
When you have a large number of input rows but a small number of keys, log records may appear rarely and the progress of the reducer is unknown.
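The default exponential schedule can be sketched as follows (a simplified model, not Hive's actual implementation):

```python
def exponential_log_points(total_rows):
    """Row counts at which a progress message is emitted: 1, 10, 100, ..."""
    n, points = 1, []
    while n <= total_rows:
        points.append(n)
        n *= 10
    return points

# For the 46,713 output rows above this yields only five progress messages
# (plus the final count logged when the operator closes):
print(exponential_log_points(46_713))  # [1, 10, 100, 1000, 10000]
```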
To solve this issue, you can use the Hive hive.log.every.n.records
option to change the logging interval, for example:
set hive.log.every.n.records = 1000;
Now you can see log records more often and better estimate the remaining duration:
2019-11-16 11:19:15,700 [INFO] [TezChild] |exec.FileSinkOperator|: FS[14]: records written - 1
2019-11-16 11:30:24,773 [INFO] [TezChild] |exec.FileSinkOperator|: FS[14]: records written - 1001
2019-11-16 11:42:04,964 [INFO] [TezChild] |exec.FileSinkOperator|: FS[14]: records written - 2001
2019-11-16 12:02:02,195 [INFO] [TezChild] |exec.FileSinkOperator|: FS[14]: records written - 3001
2019-11-16 12:11:28,802 [INFO] [TezChild] |exec.FileSinkOperator|: FS[14]: records written - 4001
2019-11-16 12:19:09,445 [INFO] [TezChild] |exec.FileSinkOperator|: FS[14]: records written - 5001
2019-11-16 12:23:44,086 [INFO] [TezChild] |exec.FileSinkOperator|: FS[14]: records written - 6001
2019-11-16 12:38:42,857 [INFO] [TezChild] |exec.FileSinkOperator|: FS[14]: records written - 7001
2019-11-16 12:47:03,895 [INFO] [TezChild] |exec.FileSinkOperator|: FS[14]: records written - 8001
...
You can see that it takes from 10 to 20 minutes to produce 1,000 output records (aggregations by reduce keys).
This depends on the size of each input group, which can vary with the grouping key (for example, 1 M events for a country with a large population versus 1 K events for a small country), as well as on the calculations performed within each group, which can also depend on the data itself rather than only on the group size.
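Given two of the timestamped "records written" messages, you can extrapolate the finish time, assuming the output rate stays roughly constant (a simplification, since group sizes vary). A minimal sketch:

```python
from datetime import datetime, timedelta

def estimate_finish(t1, n1, t2, n2, total):
    """Extrapolate the reducer finish time from two log samples.
    t1, t2: 'YYYY-MM-DD HH:MM:SS' timestamps; n1, n2: records written;
    total: expected number of output rows (REDUCE_INPUT_GROUPS)."""
    fmt = "%Y-%m-%d %H:%M:%S"
    start, end = datetime.strptime(t1, fmt), datetime.strptime(t2, fmt)
    rate = (n2 - n1) / (end - start).total_seconds()  # rows per second
    return end + timedelta(seconds=(total - n2) / rate)

# With the samples from the log above (1,001 rows at 11:30:24 and
# 8,001 rows at 12:47:03) the reducer would finish around 19:50.
eta = estimate_finish("2019-11-16 11:30:24", 1001,
                      "2019-11-16 12:47:03", 8001, 46_713)
print(eta.strftime("%H:%M"))
```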