diff --git a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala index c08b47f99dda3..596974f338fd8 100644 --- a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala @@ -17,8 +17,11 @@ package org.apache.spark.memory +import java.lang.management.{ManagementFactory, PlatformManagedObject} import javax.annotation.concurrent.GuardedBy +import scala.util.Try + import org.apache.spark.SparkConf import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ @@ -27,6 +30,7 @@ import org.apache.spark.storage.memory.MemoryStore import org.apache.spark.unsafe.Platform import org.apache.spark.unsafe.array.ByteArrayMethods import org.apache.spark.unsafe.memory.MemoryAllocator +import org.apache.spark.util.Utils /** * An abstract memory manager that enforces how memory is shared between execution and storage. @@ -242,8 +246,12 @@ private[spark] abstract class MemoryManager( * If user didn't explicitly set "spark.buffer.pageSize", we figure out the default value * by looking at the number of cores available to the process, and the total amount of memory, * and then divide it by a factor of safety. + * + * SPARK-37593 If we are using G1GC, it's better to take the LONG_ARRAY_OFFSET + * into consideration so that the requested memory size is power of 2 + * and can be divided by G1 heap region size to reduce memory waste within one G1 region. */ - val pageSizeBytes: Long = { + private lazy val defaultPageSizeBytes = { val minPageSize = 1L * 1024 * 1024 // 1MB val maxPageSize = 64L * minPageSize // 64MB val cores = if (numCores > 0) numCores else Runtime.getRuntime.availableProcessors() @@ -254,10 +262,16 @@ private[spark] abstract class MemoryManager( case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.poolSize } val size = ByteArrayMethods.nextPowerOf2(maxTungstenMemory / cores / safetyFactor) - val default = math.min(maxPageSize, math.max(minPageSize, size)) - conf.get(BUFFER_PAGESIZE).getOrElse(default) + val chosenPageSize = math.min(maxPageSize, math.max(minPageSize, size)) + if (isG1GC && tungstenMemoryMode == MemoryMode.ON_HEAP) { + chosenPageSize - Platform.LONG_ARRAY_OFFSET + } else { + chosenPageSize + } } + val pageSizeBytes: Long = conf.get(BUFFER_PAGESIZE).getOrElse(defaultPageSizeBytes) + /** * Allocates memory for use by Unsafe/Tungsten code. */ @@ -267,4 +281,22 @@ private[spark] abstract class MemoryManager( case MemoryMode.OFF_HEAP => MemoryAllocator.UNSAFE } } + + /** + * Return whether we are using G1GC or not + */ + private lazy val isG1GC: Boolean = { + Try { + val clazz = Utils.classForName("com.sun.management.HotSpotDiagnosticMXBean") + .asInstanceOf[Class[_ <: PlatformManagedObject]] + val vmOptionClazz = Utils.classForName("com.sun.management.VMOption") + val hotSpotDiagnosticMXBean = ManagementFactory.getPlatformMXBean(clazz) + val vmOptionMethod = clazz.getMethod("getVMOption", classOf[String]) + val valueMethod = vmOptionClazz.getMethod("getValue") + + val useG1GCObject = vmOptionMethod.invoke(hotSpotDiagnosticMXBean, "UseG1GC") + val useG1GC = valueMethod.invoke(useG1GCObject).asInstanceOf[String] + "true".equals(useG1GC) + }.getOrElse(false) + } }