Let’s see how Apache Tez determines the number of Map tasks when you execute a SQL query in Hive running on the Tez engine.
Consider the following sample SQL query on a partitioned table:
select count(*) from events where event_dt = '2018-10-20' and event_name = 'Started';
The events table is partitioned by the event_dt and event_name columns, and it is quite big – it has 209,146 partitions, while the query requests data from a single partition only:
$ hive -e "show partitions events"

event_dt=2017-09-22/event_name=ClientMetrics
event_dt=2017-09-23/event_name=ClientMetrics
...
event_dt=2018-10-20/event_name=Location
...
event_dt=2018-10-20/event_name=Started

Time taken: 26.457 seconds, Fetched: 209146 row(s)
Partition Pruning and Input Paths
Before executing a query, Hive tries to avoid reading unnecessary data. This process is called partition pruning; it is performed in the Hive driver before launching any jobs in YARN, and you can see the following messages in hive.log:
2018-10-20T08:53:28,178 DEBUG PartitionPruner.java:prune Filter w/ compacting: ((event_dt = '2018-10-20') and (event_name = 'Started'))
2018-10-20T08:53:30,523 DEBUG GenMapRedUtils.java:setMapWork Adding s3://cloudsqale/hive/events.db/events/event_dt=2018-10-20/event_name=Started of table events
So you can see that only one directory was added as an input path for the Tez job.
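To make the pruning step concrete, here is a minimal Python sketch of the idea (not Hive's actual implementation): the driver evaluates the partition predicate against the partition metadata and keeps only the matching directories as input paths. The partition list and helper names below are illustrative only.

# Conceptual sketch of partition pruning, not Hive's real code
partitions = [
    {"event_dt": "2017-09-22", "event_name": "ClientMetrics"},
    {"event_dt": "2018-10-20", "event_name": "Location"},
    {"event_dt": "2018-10-20", "event_name": "Started"},
    # ... 209,146 partitions in total
]

def matches(p):
    # Predicate from the WHERE clause of the sample query
    return p["event_dt"] == "2018-10-20" and p["event_name"] == "Started"

input_paths = [
    "s3://cloudsqale/hive/events.db/events/event_dt=%s/event_name=%s"
    % (p["event_dt"], p["event_name"])
    for p in partitions if matches(p)
]
print(input_paths)   # only one directory survives pruning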
Input Splits
After the input paths (the list of directories containing input data files) for the job are defined, Tez computes input splits – the smallest units of data that must be processed by a single Map task (i.e. a split cannot be further divided to be processed by multiple Map tasks).
Note that some data formats (for example, compressed .gz files) are not splittable, so no matter the size of the file (1 KB or 1 GB) it always comprises exactly one split.
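As a rough illustration (my own simplified sketch, not Tez or Hadoop API code), the number of splits produced for a single file could be modeled like this; the function name and the 256 MB target split size are assumptions for the example only.

def count_splits(file_size, splittable, target_split_size=256 * 1024 * 1024):
    # A non-splittable file (e.g. a .gz file) always forms exactly one split,
    # whether it is 1 KB or 1 GB
    if not splittable:
        return 1
    # A splittable file is divided into chunks of roughly target_split_size bytes
    return max(1, -(-file_size // target_split_size))   # ceiling division

print(count_splits(1024, splittable=False))        # 1
print(count_splits(1024 ** 3, splittable=False))   # still 1
print(count_splits(1024 ** 3, splittable=True))    # 4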
In my case, the events table contains raw events exported from an external data source, so each partition has quite a large number of relatively small .gz files:
$ aws s3 ls s3://cloudsqale/hive/events.db/event_dt=2018-10-20/event_name=Started/ --summarize

2018-10-20 04:41:01   51912591 00000.gz
2018-10-20 04:45:21   51896115 00001.gz
2018-10-20 04:42:35   52141658 00002.gz
...
2018-10-20 04:27:40    1115071 08644.gz
2018-10-20 04:27:26       2042 08645.gz

Total Objects: 8646
   Total Size: 180038369205
The Tez engine performs input split computation in the Application Master container.
By default, Tez uses HiveInputFormat (defined by the hive.tez.input.format option with the default value org.apache.hadoop.hive.ql.io.HiveInputFormat) to generate input splits. From the Application Master log:
[INFO] [InputInitializer] |io.HiveInputFormat|: Generating splits for dirs: s3://cloudsqale/hive/events.db/event_dt=2018-10-20/event_name=Started
[INFO] [InputInitializer] |mapred.FileInputFormat|: Total input paths to process : 8646
[INFO] [InputInitializer] |io.HiveInputFormat|: number of splits 8646
[INFO] [InputInitializer] |tez.HiveSplitGenerator|: Number of input splits: 8646. 479 available slots, 1.7 waves. Input format is: org.apache.hadoop.hive.ql.io.HiveInputFormat
As expected, the number of splits is equal to the number of .gz files in the partition directory.
Grouping Input Splits
Tez does not create a Map task for every input split, as that may lead to a very large number of Map tasks processing small volumes of data, with significant overhead required to allocate and run YARN containers.
Instead, Tez tries to group the input splits so that they can be processed by a smaller number of Map tasks.
In the log above, you can see 479 available slots. Before running a job, Tez requests the available memory (headroom) from YARN:
[INFO] rm.YarnTaskSchedulerService|: App total resource memory: 690176 cpu: 1 taskAllocations: 0
Tez AM also knows how much memory will be requested by every Tez container:
[INFO] impl.VertexImpl|:[Map 1], Default Resources=<memory:1440, vCores:1>
And 690176/1440 gives us 479 slots, i.e. the estimated number of YARN containers that are available right now and can run in parallel.
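As a quick check, the slot arithmetic can be reproduced with a couple of lines of Python (the variable names are mine; the numbers come from the logs above):

headroom_mb = 690176      # App total resource memory reported by YARN
task_memory_mb = 1440     # memory requested per Tez container for Map 1

available_slots = headroom_mb // task_memory_mb
print(available_slots)    # 479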
Split Waves
After defining the number of available slots, Tez uses the split-waves multiplier (defined by the tez.grouping.split-waves option with the default value 1.7) to estimate the number of Map tasks: 479 * 1.7 gives us 814 tasks:
[INFO] [InputInitializer] |tez.SplitGrouper|: Estimated number of tasks: 814 for bucket 1
[INFO] [InputInitializer] |grouper.TezSplitGrouper|: Desired numSplits: 814 lengthPerGroup: 221177357 totalLength: 180038369205 numOriginalSplits: 8646 . Grouping by length: true count: false
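The wave estimate is the same kind of back-of-the-envelope arithmetic; here is a small Python sketch (illustrative names, values taken from the log):

available_slots = 479
split_waves = 1.7         # tez.grouping.split-waves, default value

estimated_tasks = int(available_slots * split_waves)
print(estimated_tasks)    # 814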
Actual Number of Tasks
So far we have the estimated number of tasks, 814, based on resource availability and the wave multiplier. Tez uses this value to define lengthPerGroup, the number of input bytes per task: totalLength/814 = 180038369205/814 = 221177357.
Then Tez checks that lengthPerGroup is within the range defined by the tez.grouping.min-size and tez.grouping.max-size options (50 MB and 1 GB by default, respectively).
If lengthPerGroup is less than tez.grouping.min-size, it is set to tez.grouping.min-size, and if lengthPerGroup is greater than tez.grouping.max-size, it is set to tez.grouping.max-size.
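Putting the last two steps together, here is a hedged Python sketch of how lengthPerGroup could be derived and clamped; the constants mirror the defaults mentioned above, and the variable names are illustrative rather than the exact Tez code:

total_length = 180038369205      # total size of all input splits, in bytes
desired_num_splits = 814         # estimated number of tasks

MIN_SIZE = 50 * 1024 * 1024      # tez.grouping.min-size, 50 MB by default
MAX_SIZE = 1024 * 1024 * 1024    # tez.grouping.max-size, 1 GB by default

length_per_group = total_length // desired_num_splits      # 221177357
# Clamp to the [min-size, max-size] range
length_per_group = max(MIN_SIZE, min(MAX_SIZE, length_per_group))
print(length_per_group)          # 221177357 – already within the range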
As the next step, Tez iterates over the input splits and combines them into groups, making sure the group size does not exceed lengthPerGroup.
Since it is unlikely that the groups will contain exactly lengthPerGroup bytes (in our case 221177357), the actual number of tasks can differ from the estimated number of 814:
[INFO] [InputInitializer] |grouper.TezSplitGrouper|: Number of splits desired: 814 created: 890
[INFO] [InputInitializer] |tez.SplitGrouper|: Original split count is 8646 grouped split count is 890
[INFO] [InputInitializer] |tez.HiveSplitGenerator|: Number of split groups: 890
...
[INFO] [App Shared Pool] |impl.VertexImpl|: Vertex [Map 1] parallelism set to 890
The Tez UI view confirms this: in our case, 890 Map tasks instead of 814 were created to process the 8646 input files.
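For intuition, the grouping step can be approximated by a greedy loop like the one below. This is only a sketch under the assumption of pure length-based grouping (the real TezSplitGrouper also takes locality and other settings into account), and the function name is mine:

def group_splits(split_sizes, length_per_group):
    # Greedily pack splits into groups so that a group does not exceed
    # length_per_group bytes; a single oversized split still gets its own group
    groups, current, current_len = [], [], 0
    for size in split_sizes:
        if current and current_len + size > length_per_group:
            groups.append(current)
            current, current_len = [], 0
        current.append(size)
        current_len += size
    if current:
        groups.append(current)
    return groups

# Because split sizes rarely add up to exactly length_per_group, packing
# 8646 files this way typically yields a few more groups than the desired
# 814 – 890 in this example.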
More About Split Waves – Key to Dynamic Allocation and Better Parallelism
You may wonder why Tez creates more tasks than there are available YARN resources (free right now, not the total resources in the cluster). In our example there are 479 available slots, while Tez created 890 tasks.
This gives more freedom to allocate resources dynamically: some other jobs may terminate and free up resources (although the opposite is also true), or the cluster can auto-scale. So even if not all Map tasks start immediately, they can catch up soon.
Another reason is that some Map tasks can run much slower than others (because of container performance, or because the processing logic depends on the data being processed), and in this case it is better to have a larger number of Map tasks, each processing a smaller volume of data.
This helps avoid the situation where most Map tasks have already finished while a few others are still running, keeping the total execution time of the SQL query high.
Why are good values for tez.grouping.split-waves 1.7, 2.7, 3.7, i.e. not whole numbers? The reason is that we want the final wave to be smaller and to cover only the slow tasks.