Some Notes on Apache Spark Memory Management

Jason Thai

Important configurations:

  • spark.executor.memory – Amount of memory to use for each executor that runs tasks.
  • spark.executor.cores – Number of virtual cores per executor.
  • spark.driver.memory – Amount of memory to use for the driver.
  • spark.driver.cores – Number of virtual cores to use for the driver.
  • spark.executor.instances – Number of executors. Set this parameter unless spark.dynamicAllocation.enabled is set to true.
  • spark.default.parallelism – Default number of partitions in resilient distributed datasets (RDDs) returned by transformations like join, reduceByKey, and parallelize when no partition number is set by the user.
  • spark.sql.execution.arrow.enabled – Enable Arrow-based optimization when converting between Spark and pandas DataFrames.
  • spark.files.ignoreCorruptFiles – Ignore corrupt files (core file-based APIs).
  • spark.sql.files.ignoreCorruptFiles – Ignore corrupt files (Spark SQL file-based sources).
  • spark.executor.extraJavaOptions – Extra JVM options, such as garbage collection settings, for executors.
  • spark.driver.extraJavaOptions – Extra JVM options, such as garbage collection settings, for the driver.
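
These settings can also be supplied programmatically when a session is created. Below is a minimal PySpark sketch with placeholder values; note that driver-side settings such as spark.driver.memory generally have to be passed at launch time (via spark-submit or spark-defaults.conf), because the driver JVM is already running by the time the builder executes.

    from pyspark.sql import SparkSession

    # Minimal sketch; the values are placeholders, not recommendations.
    spark = (
        SparkSession.builder
        .appName("memory-tuning-example")
        .config("spark.executor.memory", "9g")
        .config("spark.executor.cores", "5")
        .config("spark.executor.instances", "349")
        .config("spark.default.parallelism", "3490")
        .config("spark.sql.execution.arrow.enabled", "true")
        .config("spark.sql.files.ignoreCorruptFiles", "true")
        .getOrCreate()
    )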

Sample Calculations:

Consider an EMR cluster with 1 master node and 25 core nodes, all running the c5.18xlarge instance type. Each instance comes with 72 vCPUs and 144 GiB of memory. (The full calculation is also sketched in code after the list below.)

  • spark.executor.cores = number of virtual cores per executor. The common recommendation is 5.

    spark.executor.cores = 5

  • Number of executors per instance = (total number of virtual cores per instance - 1) / spark.executor.cores, leaving one core per instance for the OS and Hadoop daemons

    Number of executors per instance = (72 - 1) / 5 = 14 (round down)

    Total executor memory = total RAM per instance / number of executors per instance

    Total executor memory = 144 / 14 = 10 (round down)

    spark.executor.memory = total executor memory * 0.9

    spark.executor.memory = 10 * 0.9 = 9g

  • spark.executor.memoryOverhead = total executor memory * 0.10

    spark.executor.memoryOverhead = 10 * 0.1 = 1g

  • spark.driver.memory = spark.executor.memory

    spark.driver.memory = 9g

  • spark.driver.cores = spark.executor.cores

    spark.driver.cores = 5

  • spark.executor.instances = (number of executors per instance * number of core instances) minus 1 for the driver

    spark.executor.instances = 14 * 25 - 1 = 349

  • spark.default.parallelism = spark.executor.instances * spark.executor.cores * 2

    spark.default.parallelism = 349 * 5 * 2 = 3490

  • spark.executor.extraJavaOptions

    spark.executor.extraJavaOptions = -XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'

  • spark.driver.extraJavaOptions

    spark.driver.extraJavaOptions = -XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'
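
The arithmetic above can be wrapped in a small helper, which is handy when comparing instance types. This is a minimal Python sketch; the function name is made up here, and it hard-codes the 5-cores-per-executor and 10% memory-overhead rules of thumb used in the example.

    def size_executors(vcores_per_instance, ram_gib_per_instance, core_instances,
                       cores_per_executor=5, overhead_fraction=0.10):
        # One vCPU per instance is left for the OS and Hadoop daemons.
        executors_per_instance = (vcores_per_instance - 1) // cores_per_executor
        # Total memory available to each executor, rounded down to whole GiB.
        total_executor_memory = ram_gib_per_instance // executors_per_instance
        executor_memory = int(total_executor_memory * (1 - overhead_fraction))
        memory_overhead = int(total_executor_memory * overhead_fraction)
        # One executor slot is given up for the driver.
        executor_instances = executors_per_instance * core_instances - 1
        default_parallelism = executor_instances * cores_per_executor * 2
        return {
            "spark.executor.cores": cores_per_executor,
            "spark.executor.memory": f"{executor_memory}g",
            "spark.executor.memoryOverhead": f"{memory_overhead}g",
            "spark.executor.instances": executor_instances,
            "spark.driver.memory": f"{executor_memory}g",
            "spark.driver.cores": cores_per_executor,
            "spark.default.parallelism": default_parallelism,
        }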

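Calling the helper with the c5.18xlarge numbers from this example (72 vCPUs, 144 GiB, 25 core instances) reproduces the values derived above and prints them as --conf pairs that can be pasted onto a spark-submit command:

    conf = size_executors(72, 144, 25)
    for key, value in conf.items():
        print(f"--conf {key}={value}")
    # --conf spark.executor.cores=5
    # --conf spark.executor.memory=9g
    # --conf spark.executor.instances=349
    # ... and so on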