Important configurations:
- spark.executor.memory – Amount of memory to use for each executor that runs a task.
- spark.executor.cores – Number of virtual cores to use for each executor.
- spark.driver.memory – Amount of memory to use for the driver.
- spark.driver.cores – Number of virtual cores to use for the driver.
- spark.executor.instances – Number of executors. Set this parameter unless spark.dynamicAllocation.enabled is set to true.
- spark.default.parallelism – Default number of partitions in resilient distributed datasets (RDDs) returned by transformations like join, reduceByKey, and parallelize when no partition number is set by the user
- spark.sql.execution.arrow.enabled - Enable Apache Arrow optimization for pandas DataFrame conversions (see the sketch after this list)
- spark.files.ignoreCorruptFiles - Ignore corrupt files when reading with the core (RDD) file APIs
- spark.sql.files.ignoreCorruptFiles - Ignore corrupt files when reading with Spark SQL file-based data sources
- spark.executor.extraJavaOptions - Additional Java options for executors, such as garbage collection (GC) settings
- spark.driver.extraJavaOptions - Additional Java options for the driver, such as garbage collection (GC) settings
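The following is a minimal PySpark sketch of how several of these properties might be set when a session is created, and of the pandas conversion that the Arrow setting speeds up. The application name and the toy job are placeholders; the property values come from the sample calculation in the next section, and spark.driver.memory is omitted because it must be set before the driver JVM starts (for example via spark-submit or spark-defaults).

```python
from pyspark.sql import SparkSession

# Illustrative values only; size them with the calculations below.
spark = (
    SparkSession.builder
    .appName("config-example")                              # placeholder name
    .config("spark.executor.memory", "9g")
    .config("spark.executor.cores", "5")
    .config("spark.executor.instances", "349")
    .config("spark.default.parallelism", "3490")
    .config("spark.sql.execution.arrow.enabled", "true")    # Arrow-backed pandas conversion
    .config("spark.sql.files.ignoreCorruptFiles", "true")
    .getOrCreate()
)

# With Arrow enabled, toPandas() transfers columnar batches instead of
# serializing rows one by one, which is much faster for large DataFrames.
df = spark.range(0, 1_000_000).selectExpr("id", "id * 2 AS doubled")
pdf = df.toPandas()
print(pdf.head())
```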
Sample Calculations:
Consider an EMR cluster with 1 master node and 25 core nodes, each a c5.18xlarge instance. Each instance comes with 72 vCPUs and 144 GiB of memory. (The arithmetic below is reproduced in a Python sketch after this list.)
- spark.executor.cores = number of virtual cores per executor. The recommendation is 5.
spark.executor.cores = 5
- Number of executors per instance = (total number of virtual cores per instance - 1) / spark.executor.cores
Number of executors per instance = (72 - 1) / 5 = 14 (round down)
Total executor memory = total RAM per instance / number of executors per instance
Total executor memory = 144 / 14 = 10 (round down)
spark.executor.memory = total executor memory * 0.9
spark.executor.memory = 10 * 0.9 = 9g
- spark.executor.memoryOverhead = total executor memory * 0.10
spark.executor.memoryOverhead = 10 * 0.1 = 1g
- spark.driver.memory = spark.executor.memory
spark.driver.memory = 9g
- spark.driver.cores = spark.executor.cores
spark.driver.cores = 5
- spark.executor.instances = (number of executors per instance * number of core instances) - 1, leaving one executor slot for the driver
spark.executor.instances = 14 * 25 - 1 = 349
- spark.default.parallelism = spark.executor.instances * spark.executor.cores * 2
spark.default.parallelism = 349 * 5 * 2 = 3490
- spark.executor.extraJavaOptions
spark.executor.extraJavaOptions = -XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'
- spark.driver.extraJavaOptions
spark.driver.extraJavaOptions = -XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'
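As a quick check on the arithmetic, here is a small plain-Python sketch that reproduces the sizing math above; the function name and its defaults are ours, not a Spark or EMR API.

```python
import math

def spark_sizing(vcpus_per_instance, ram_gib_per_instance, core_instances,
                 cores_per_executor=5, memory_fraction=0.9):
    """Reproduce the sample sizing calculations above (names are ours)."""
    # Leave one vCPU per instance for the OS and Hadoop daemons.
    executors_per_instance = (vcpus_per_instance - 1) // cores_per_executor
    # Memory per executor before splitting off the ~10% overhead.
    total_executor_memory = math.floor(ram_gib_per_instance / executors_per_instance)
    executor_memory = math.floor(total_executor_memory * memory_fraction)
    memory_overhead = total_executor_memory - executor_memory
    # Reserve one executor slot for the driver.
    executor_instances = executors_per_instance * core_instances - 1
    default_parallelism = executor_instances * cores_per_executor * 2
    return {
        "spark.executor.cores": cores_per_executor,
        "spark.executor.memory": f"{executor_memory}g",
        "spark.executor.memoryOverhead": f"{memory_overhead}g",
        "spark.driver.memory": f"{executor_memory}g",
        "spark.driver.cores": cores_per_executor,
        "spark.executor.instances": executor_instances,
        "spark.default.parallelism": default_parallelism,
    }

# c5.18xlarge: 72 vCPU, 144 GiB, 25 core instances
print(spark_sizing(72, 144, 25))
# {'spark.executor.cores': 5, 'spark.executor.memory': '9g',
#  'spark.executor.memoryOverhead': '1g', 'spark.driver.memory': '9g',
#  'spark.driver.cores': 5, 'spark.executor.instances': 349,
#  'spark.default.parallelism': 3490}
```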
Sources:
- https://aws.amazon.com/blogs/big-data/best-practices-for-successfully-managing-memory-for-apache-spark-applications-on-amazon-emr/
- https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html