Skip to content
Closed
38 changes: 35 additions & 3 deletions core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@

package org.apache.spark.memory

import java.lang.management.{ManagementFactory, PlatformManagedObject}
import javax.annotation.concurrent.GuardedBy

import scala.util.Try

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
Expand All @@ -27,6 +30,7 @@ import org.apache.spark.storage.memory.MemoryStore
import org.apache.spark.unsafe.Platform
import org.apache.spark.unsafe.array.ByteArrayMethods
import org.apache.spark.unsafe.memory.MemoryAllocator
import org.apache.spark.util.Utils

/**
* An abstract memory manager that enforces how memory is shared between execution and storage.
Expand Down Expand Up @@ -242,8 +246,12 @@ private[spark] abstract class MemoryManager(
* If user didn't explicitly set "spark.buffer.pageSize", we figure out the default value
* by looking at the number of cores available to the process, and the total amount of memory,
* and then divide it by a factor of safety.
*
* SPARK-37593 If we are using G1GC, it's better to take the LONG_ARRAY_OFFSET
* into consideration so that the requested memory size is power of 2
* and can be divided by G1 heap region size to reduce memory waste within one G1 region.
*/
val pageSizeBytes: Long = {
private lazy val defaultPageSizeBytes = {
val minPageSize = 1L * 1024 * 1024 // 1MB
val maxPageSize = 64L * minPageSize // 64MB
val cores = if (numCores > 0) numCores else Runtime.getRuntime.availableProcessors()
Expand All @@ -254,10 +262,16 @@ private[spark] abstract class MemoryManager(
case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.poolSize
}
val size = ByteArrayMethods.nextPowerOf2(maxTungstenMemory / cores / safetyFactor)
val default = math.min(maxPageSize, math.max(minPageSize, size))
conf.get(BUFFER_PAGESIZE).getOrElse(default)
val chosenPageSize = math.min(maxPageSize, math.max(minPageSize, size))
if (isG1GC && tungstenMemoryMode == MemoryMode.ON_HEAP) {
chosenPageSize - Platform.LONG_ARRAY_OFFSET
} else {
chosenPageSize
}
}

val pageSizeBytes: Long = conf.get(BUFFER_PAGESIZE).getOrElse(defaultPageSizeBytes)

Comment on lines +273 to +274
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So once users set the buffer size config, this optimization won't apply, right?

/**
* Allocates memory for use by Unsafe/Tungsten code.
*/
Expand All @@ -267,4 +281,22 @@ private[spark] abstract class MemoryManager(
case MemoryMode.OFF_HEAP => MemoryAllocator.UNSAFE
}
}

/**
* Return whether we are using G1GC or not
*/
private lazy val isG1GC: Boolean = {
Try {
val clazz = Utils.classForName("com.sun.management.HotSpotDiagnosticMXBean")
.asInstanceOf[Class[_ <: PlatformManagedObject]]
val vmOptionClazz = Utils.classForName("com.sun.management.VMOption")
val hotSpotDiagnosticMXBean = ManagementFactory.getPlatformMXBean(clazz)
val vmOptionMethod = clazz.getMethod("getVMOption", classOf[String])
val valueMethod = vmOptionClazz.getMethod("getValue")

val useG1GCObject = vmOptionMethod.invoke(hotSpotDiagnosticMXBean, "UseG1GC")
val useG1GC = valueMethod.invoke(useG1GCObject).asInstanceOf[String]
"true".equals(useG1GC)
}.getOrElse(false)
}
}