After creating an Amazon EMR cluster with Spark support and running a Spark application, you may notice that the Spark job creates too many tasks to process even a very small data set.
For example, I have a small table country_iso_codes with 249 rows, stored as a comma-delimited text file of 10,657 bytes.
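For reference, such a table can be defined as a plain text table in the Hive metastore. The DDL below is only an illustrative sketch; the column names and S3 location are assumptions, not taken from the original table:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Create_table').enableHiveSupport().getOrCreate()

# Hypothetical DDL: column names and location are placeholders
spark.sql("""
  CREATE EXTERNAL TABLE IF NOT EXISTS country_iso_codes (
    name STRING,
    iso_code STRING
  )
  ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
  LOCATION 's3://your-bucket/country_iso_codes/'
""")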
When running the following application on an Amazon EMR 5.7 cluster with Spark 2.1.1 and the default settings, I can see a large number of partitions generated:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Spark_ap').enableHiveSupport().getOrCreate()

df = spark.sql("select name from country_iso_codes")
print("Number of partitions: " + str(df.rdd.getNumPartitions()))
Output:
Number of partitions: 3553
Looking at the driver log you can see the following:
DEBUG S3NativeFileSystem: Adding block at 0 with length 10657
INFO FileInputFormat: Total input paths to process : 1
DEBUG FileInputFormat: Total # of splits generated by getSplits: 3553, TimeTaken: 1330
The large number of RDD partitions is caused by the large number of file input splits: 3,553 splits for a 10,657-byte file means each split covers only about 3 bytes. You can use the mapreduce.input.fileinputformat.split.minsize
option to raise the minimum split size and reduce the number of splits:
from pyspark.sql import SparkSession, HiveContext

spark = SparkSession.builder.appName('Spark_ap').enableHiveSupport().getOrCreate()

# Set the minimum input split size to 50 MB so small files are not broken into tiny splits
hiveContext = HiveContext(spark)
hiveContext.setConf("mapreduce.input.fileinputformat.split.minsize", 50*1024*1024)

df = spark.sql("select name from country_iso_codes")
print("Number of partitions: " + str(df.rdd.getNumPartitions()))
Output:
Number of partitions: 1
Now only one RDD partition is created.
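Note that HiveContext is deprecated in Spark 2.x. Its setConf method delegates to the session configuration, so the same setting can likely be applied through the SparkSession directly; the sketch below shows this variant, which I have not verified on every EMR release:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Spark_ap').enableHiveSupport().getOrCreate()

# Session-level setting without creating a HiveContext (assumed equivalent to setConf above)
spark.conf.set("mapreduce.input.fileinputformat.split.minsize", 50*1024*1024)

df = spark.sql("select name from country_iso_codes")
print("Number of partitions: " + str(df.rdd.getNumPartitions()))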
Instead of adding the mapreduce.input.fileinputformat.split.minsize
option to every Spark job, you can add it to the /etc/spark/conf/hive-site.xml
configuration file, as shown below.
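A property entry in hive-site.xml might look like the following sketch; 52428800 bytes is the 50 MB value used above, and you should adjust it to your workload:

<property>
  <name>mapreduce.input.fileinputformat.split.minsize</name>
  <value>52428800</value>
</property>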