Commit 8e1034a

zhztheplayer authored and dongjoon-hyun committed
[SPARK-54354][SQL] Fix Spark hanging when there's not enough JVM heap memory for broadcast hashed relation
### What changes were proposed in this pull request?

A fix to let Spark throw an OOM error rather than hang when there is not enough JVM heap memory for a broadcast hashed relation.

The fix passes the current JVM's heap size, rather than `Long.MaxValue / 2`, when creating the temporary `UnifiedMemoryManager` for broadcasting.

This is an optimal setting: if the size passed is too large (as with the current `Long.MaxValue / 2`), Spark hangs; if the size is smaller than the current JVM heap size, the OOM might be thrown too early, even when there is still room in memory for the newly created hashed relation.

Before:

```scala
new UnifiedMemoryManager(
  new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
  Long.MaxValue,
  Long.MaxValue / 2,
  1)
```

After:

```scala
new UnifiedMemoryManager(
  new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
  Runtime.getRuntime.maxMemory,
  Runtime.getRuntime.maxMemory / 2,
  1)
```

### Why are the changes needed?

To report the error fast instead of hanging.

### Does this PR introduce _any_ user-facing change?

In some scenarios where large unsafe hashed relations are allocated for a broadcast hash join, users will see a meaningful OOM error instead of a hang.

Before (hangs):

```
15:07:38.456 WARN org.apache.spark.memory.TaskMemoryManager: Failed to allocate a page (8589934592 bytes), try again.
15:07:38.501 WARN org.apache.spark.memory.TaskMemoryManager: Failed to allocate a page (8589934592 bytes), try again.
15:07:38.539 WARN org.apache.spark.memory.TaskMemoryManager: Failed to allocate a page (8589934592 bytes), try again.
15:07:38.580 WARN org.apache.spark.memory.TaskMemoryManager: Failed to allocate a page (8589934592 bytes), try again.
15:07:38.613 WARN org.apache.spark.memory.TaskMemoryManager: Failed to allocate a page (8589934592 bytes), try again.
15:07:38.647 WARN org.apache.spark.memory.TaskMemoryManager: Failed to allocate a page (8589934592 bytes), try again.
...
```

After (OOM):

```
An exception or error caused a run to abort: [UNABLE_TO_ACQUIRE_MEMORY] Unable to acquire 8589934592 bytes of memory, got 7194909081. SQLSTATE: 53200
org.apache.spark.memory.SparkOutOfMemoryError: [UNABLE_TO_ACQUIRE_MEMORY] Unable to acquire 8589934592 bytes of memory, got 7194909081. SQLSTATE: 53200
  at org.apache.spark.errors.SparkCoreErrors$.outOfMemoryError(SparkCoreErrors.scala:456)
  at org.apache.spark.errors.SparkCoreErrors.outOfMemoryError(SparkCoreErrors.scala)
  at org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:157)
  at org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:98)
  at org.apache.spark.unsafe.map.BytesToBytesMap.allocate(BytesToBytesMap.java:868)
  at org.apache.spark.unsafe.map.BytesToBytesMap.<init>(BytesToBytesMap.java:202)
  at org.apache.spark.unsafe.map.BytesToBytesMap.<init>(BytesToBytesMap.java:209)
  at org.apache.spark.sql.execution.joins.UnsafeHashedRelation$.apply(HashedRelation.scala:464)
  at org.apache.spark.sql.execution.joins.HashedRelationSuite.$anonfun$new$90(HashedRelationSuite.scala:760)
```

### How was this patch tested?

Manually, using the following code, since we don't want to add a test case that captures an OOM error:

```scala
// This PR's approach: use `Runtime.getRuntime.maxMemory` as the maximum size.
val umm = new UnifiedMemoryManager(
  new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
  Runtime.getRuntime.maxMemory,
  Runtime.getRuntime.maxMemory / 2,
  1)
val mm = new TaskMemoryManager(umm, 0)
val relations = mutable.ArrayBuffer[HashedRelation]()
// We should eventually see an OOM thrown, since we keep allocating hashed relations.
assertThrows[SparkOutOfMemoryError] {
  while (true) {
    // Allocates ~128 MiB each time.
    relations += UnsafeHashedRelation(Iterator.empty, Nil, 1 << 22, mm)
  }
}
// Releases the allocated memory.
relations.foreach(_.close())
mm.cleanUpAllAllocatedMemory()
```

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #53065 from zhztheplayer/wip-54353-mm-hang.

Authored-by: Hongze Zhang <hongze.zzz123@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit ac69d93)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
1 parent f2f550d commit 8e1034a
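
The hang-versus-fail behaviour described in the commit message can be illustrated with a toy model. The sketch below is not Spark code: `ToyPool`, `allocatePage`, and the `Outcome` types are hypothetical names, and it assumes, as the "Failed to allocate a page ... try again" log lines suggest, that a failed physical allocation is retried after reserving again from the logical pool. It also simplifies by treating any partial grant as an immediate failure.

```scala
import scala.annotation.tailrec

// Toy model only: none of these names are Spark APIs.
object PoolCapSketch {
  /** Logical bookkeeping pool that grants at most `cap` bytes in total. */
  final class ToyPool(cap: Long) {
    private var used = 0L
    def acquire(bytes: Long): Long = {
      val granted = math.min(bytes, cap - used)
      used += granted
      granted
    }
  }

  sealed trait Outcome
  case object Allocated extends Outcome
  final case class FailedFast(requested: Long, granted: Long) extends Outcome
  final case class StillRetrying(attempts: Int) extends Outcome

  /**
   * Reserves `pageBytes` in the pool, then "physically" allocates it, which only
   * succeeds if `physicalFree` is large enough. A failed physical allocation keeps
   * its reservation and retries, up to `maxAttempts` times.
   */
  def allocatePage(pool: ToyPool, pageBytes: Long, physicalFree: Long, maxAttempts: Int): Outcome = {
    @tailrec
    def loop(attempt: Int): Outcome =
      if (attempt > maxAttempts) StillRetrying(maxAttempts)
      else {
        val granted = pool.acquire(pageBytes)
        if (granted < pageBytes) FailedFast(pageBytes, granted) // the pool itself says no
        else if (pageBytes <= physicalFree) Allocated
        else loop(attempt + 1) // the pool said yes, the heap said no: try again
      }
    loop(1)
  }

  def main(args: Array[String]): Unit = {
    val GiB = 1L << 30
    // Cap far above the real heap: the pool never refuses, so the loop keeps retrying.
    println(allocatePage(new ToyPool(Long.MaxValue), 8 * GiB, 7 * GiB, maxAttempts = 1000))
    // Cap equal to the real heap: the pool refuses on the first attempt.
    println(allocatePage(new ToyPool(7 * GiB), 8 * GiB, 7 * GiB, maxAttempts = 1000))
  }
}
```

With an 8 GiB request against 7 GiB of real memory, the pool capped at `Long.MaxValue` is still retrying after 1000 attempts, while the pool capped at the real heap size fails on the first attempt. This is the trade-off the fix relies on: a cap close to the real heap lets the bookkeeping layer detect exhaustion itself.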

2 files changed: +8 -8 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala

Lines changed: 6 additions & 6 deletions
@@ -143,8 +143,8 @@ private[execution] object HashedRelation {
       new TaskMemoryManager(
         new UnifiedMemoryManager(
           new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
-          Long.MaxValue,
-          Long.MaxValue / 2,
+          Runtime.getRuntime.maxMemory,
+          Runtime.getRuntime.maxMemory / 2,
           1),
         0)
     }
@@ -401,8 +401,8 @@ private[joins] class UnsafeHashedRelation(
     val taskMemoryManager = new TaskMemoryManager(
       new UnifiedMemoryManager(
         new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
-        Long.MaxValue,
-        Long.MaxValue / 2,
+        Runtime.getRuntime.maxMemory,
+        Runtime.getRuntime.maxMemory / 2,
         1),
       0)
 
@@ -576,8 +576,8 @@ private[execution] final class LongToUnsafeRowMap(
       new TaskMemoryManager(
         new UnifiedMemoryManager(
           new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
-          Long.MaxValue,
-          Long.MaxValue / 2,
+          Runtime.getRuntime.maxMemory,
+          Runtime.getRuntime.maxMemory / 2,
           1),
         0),
       0)
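
All three hunks above pass the same pair of heap-derived sizes. As a minimal sketch of how those two values are obtained from the JVM, the helper below computes them in one place. `HeapLimitsSketch`, `HeapLimits`, and `fromRuntime` are hypothetical names for illustration and are not part of this commit or of Spark. One caveat worth knowing: `Runtime.maxMemory` is documented to return `Long.MAX_VALUE` when the JVM has no inherent limit, in which case the values fall back to the old behaviour.

```scala
// Illustrative helper; the names here are hypothetical, not Spark APIs.
object HeapLimitsSketch {
  /** The two size arguments used at each call site in the diff. */
  final case class HeapLimits(maxHeapBytes: Long, halfHeapBytes: Long)

  def fromRuntime(runtime: Runtime = Runtime.getRuntime): HeapLimits = {
    // Upper bound the JVM will attempt to use (typically set via -Xmx);
    // Long.MaxValue if the JVM reports no inherent limit.
    val max = runtime.maxMemory()
    HeapLimits(maxHeapBytes = max, halfHeapBytes = max / 2)
  }

  def main(args: Array[String]): Unit = {
    val limits = fromRuntime()
    println(s"max heap: ${limits.maxHeapBytes} bytes, half: ${limits.halfHeapBytes} bytes")
  }
}
```

Running this under different `-Xmx` settings shows the limits that the temporary memory manager would receive on that JVM.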

sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala

Lines changed: 2 additions & 2 deletions
@@ -42,8 +42,8 @@ import org.apache.spark.util.collection.CompactBuffer
 class HashedRelationSuite extends SharedSparkSession {
   val umm = new UnifiedMemoryManager(
     new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
-    Long.MaxValue,
-    Long.MaxValue / 2,
+    Runtime.getRuntime.maxMemory,
+    Runtime.getRuntime.maxMemory / 2,
     1)
 
   val mm = new TaskMemoryManager(umm, 0)
