By default, Spark on Amazon EMR clusters has spark.dynamicAllocation.enabled set to true, meaning that the cluster dynamically scales the number of executors up and down as required.
But what is the initial number of executors when you start your Spark job?
Although Amazon EMR enables spark.dynamicAllocation.enabled by default (it is disabled by default in open-source Spark), it does not set default values for either spark.dynamicAllocation.minExecutors or spark.dynamicAllocation.initialExecutors, which can lead to quite a large number of executors being started for the job initially.
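To see which of these options are actually set on a given cluster, you can print them from a pyspark session. A minimal sketch (the property names are standard Spark options; the "<not set>" marker is just for illustration):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # already available as `spark` in the pyspark shell
for key in ("spark.dynamicAllocation.enabled",
            "spark.dynamicAllocation.minExecutors",
            "spark.dynamicAllocation.initialExecutors",
            "spark.executor.instances"):
    print(key, "=", spark.conf.get(key, "<not set>"))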
For example, consider a cluster with the following settings in spark-defaults.conf:

spark.executor.memory                      18971M
spark.executor.cores                       4
spark.yarn.executor.memoryOverheadFactor   0.1875
When launching pyspark with all default settings, I can see that 50 executors were started immediately:

INFO Utils: Using initial executors = 50, max of spark.dynamicAllocation.initialExecutors, spark.dynamicAllocation.minExecutors and spark.executor.instances
INFO YarnAllocator: Will request 50 executor container(s), each with 4 core(s) and 22528 MB memory (including 3557 MB of overhead)
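As a side note, the 22528 MB container size and 3557 MB overhead reported by YarnAllocator match the executor memory plus the overhead factor from spark-defaults.conf; a quick back-of-the-envelope check (assuming the overhead is simply spark.executor.memory multiplied by spark.yarn.executor.memoryOverheadFactor):

executor_memory_mb = 18971                            # spark.executor.memory
overhead_mb = int(executor_memory_mb * 0.1875)        # spark.yarn.executor.memoryOverheadFactor -> 3557 MB
container_mb = executor_memory_mb + overhead_mb       # 22528 MB, as in the log message above
print(container_mb, overhead_mb)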
But even if I request fewer cores and specify the required number of executors explicitly by running pyspark --conf "spark.executor.instances=5" --conf "spark.executor.cores=2", I can see that even more executors are requested now:

INFO Utils: Using initial executors = 100, max of spark.dynamicAllocation.initialExecutors, spark.dynamicAllocation.minExecutors and spark.executor.instances
INFO YarnAllocator: Will request 100 executor container(s), each with 2 core(s) and 22528 MB memory (including 3557 MB of overhead)
And if I finally run pyspark --conf "spark.executor.instances=3" --conf "spark.executor.cores=1", it now requests 200 executors:

INFO Utils: Using initial executors = 200, max of spark.dynamicAllocation.initialExecutors, spark.dynamicAllocation.minExecutors and spark.executor.instances
INFO YarnAllocator: Will request 200 executor container(s), each with 1 core(s) and 22528 MB memory (including 3557 MB of overhead)
So it looks like with spark.dynamicAllocation.enabled, EMR tries to reach a total of 200 cores no matter what value you set for spark.executor.instances.
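Putting the three runs together, the number of executors requested in each case works out to 200 divided by spark.executor.cores; a tiny illustration of the observed pattern (the 200 here is just what this particular cluster adds up to, not a documented constant):

observed_total_cores = 200
for executor_cores in (4, 2, 1):
    print(executor_cores, "core(s) ->", observed_total_cores // executor_cores, "executors")  # 50, 100, 200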
For some jobs this can be quite a large number that consumes a lot of cluster resources and prevents other jobs from running, so you need to configure dynamic allocation by setting the spark.dynamicAllocation.minExecutors and spark.dynamicAllocation.initialExecutors options, for example:

spark.dynamicAllocation.minExecutors       1
spark.dynamicAllocation.initialExecutors   1
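If you prefer not to change spark-defaults.conf cluster-wide, the same options can also be set per application; a sketch using a self-built SparkSession (dynamic allocation settings must be in place before the SparkContext starts, so this applies to a spark-submit script rather than an already running pyspark shell):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .config("spark.dynamicAllocation.minExecutors", "1")
         .config("spark.dynamicAllocation.initialExecutors", "1")
         .config("spark.executor.instances", "3")
         .config("spark.executor.cores", "1")
         .getOrCreate())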
Now if I run pyspark --conf "spark.executor.instances=3" --conf "spark.executor.cores=1", I can see that only 3 executors were started, as expected:

INFO Utils: Using initial executors = 3, max of spark.dynamicAllocation.initialExecutors, spark.dynamicAllocation.minExecutors and spark.executor.instances
INFO YarnAllocator: Will request 3 executor container(s), each with 1 core(s) and 22528 MB memory (including 3557 MB of overhead)
And if the job requires more resources, additional executors will be allocated at run time.