Conversation

@zhztheplayer (Member) commented Oct 31, 2025

What changes were proposed in this pull request?

The PR adds off-heap memory mode support for LongHashedRelation.

The PR only affects ShuffledHashJoin. In BroadcastHashJoin, the hashed relations are not closed explicitly but are managed by GC. So it will require a different approach to allocate from off-heap.

Why are the changes needed?

  1. To avoid on-heap OOMs when the user sets spark.memory.offHeap.enabled=true and configures the JVM with a comparatively small heap size.
  2. Off-heap mode has been measured to be faster than on-heap mode. See the benchmark results.

Does this PR introduce any user-facing change?

By design, when spark.memory.offHeap.enabled=true is set:

  • Required off-heap memory size may increase
  • Required on-heap memory size may decrease

How was this patch tested?

  1. Functionality is covered by HashedRelationOnHeapSuite / HashedRelationOffHeapSuite.
  2. Memory leaks are guarded against by the change in [SPARK-54132][SQL][TESTS] Cover HashedRelation#close in HashedRelationSuite #52830.

Was this patch authored or co-authored using generative AI tooling?

No.

@github-actions github-actions bot added the SQL label Oct 31, 2025
cloud-fan pushed a commit that referenced this pull request Nov 3, 2025
…nSuite

### What changes were proposed in this pull request?

Add the following code in `HashedRelationSuite`, to cover the API `HashedRelation#close` in the test suite.

```scala
  protected override def afterEach(): Unit = {
    super.afterEach()
    assert(umm.executionMemoryUsed === 0)
  }
```

### Why are the changes needed?

Doing this will:

1. Ensure `HashedRelation#close` is called in test code, to lower the memory footprint and avoid memory leaks when executing tests.
2. Ensure implementations of `HashedRelation#close` free the allocated memory blocks correctly.

It's an individual effort to improve the test quality, but also a prerequisite task for #52817.
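As a rough illustration of the guard, the same end-of-test assertion can be exercised against a toy allocator. All names below are hypothetical stand-ins, not Spark's `UnifiedMemoryManager`:

```scala
// Toy stand-in for a memory manager that tracks outstanding execution memory.
// Spark's real accounting lives in UnifiedMemoryManager / TaskMemoryManager;
// this only sketches the leak-check pattern.
final class TrackingAllocator {
  private var used: Long = 0L
  def acquire(bytes: Long): Unit = used += bytes
  def release(bytes: Long): Unit = used -= bytes
  def executionMemoryUsed: Long = used
}

val umm = new TrackingAllocator
umm.acquire(1024L) // a test builds a relation
umm.release(1024L) // HashedRelation#close must give the memory back
// The afterEach-style check: anything still held is a leak.
assert(umm.executionMemoryUsed == 0L)
```

Any test that forgets to close its relation leaves `executionMemoryUsed` non-zero, so the failing assertion pinpoints the leaking suite.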

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

It's a test PR.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #52830 from zhztheplayer/wip-54132.

Authored-by: Hongze Zhang <hongze.zzz123@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
cloud-fan pushed a commit that referenced this pull request Nov 3, 2025
…nSuite

Closes #52830 from zhztheplayer/wip-54132.

Authored-by: Hongze Zhang <hongze.zzz123@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit a5e866f)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@zhztheplayer zhztheplayer force-pushed the wip-54116-off-heap-long-relation branch 2 times, most recently from 3686285 to 842d2de Compare November 10, 2025 17:00
@zhztheplayer zhztheplayer marked this pull request as ready for review November 10, 2025 17:30
@zhztheplayer (Member Author)

@cloud-fan @yaooqinn @dongjoon-hyun @HyukjinKwon @viirya

Would you kindly help review this PR? It's for making Spark SQL work more smoothly with third-party off-heap-based operators/expressions. Thanks!

@HyukjinKwon (Member)

I wonder if a benchmark can be done at least. I worked on similar changes to implement off-heap stuff, and realised that it isn't necessarily fast.

@HyukjinKwon (Member)

e.g., sometimes the JIT turns out to be quite a bit smarter than using direct off-heap memory

@zhztheplayer (Member Author) commented Nov 10, 2025

@HyukjinKwon Thank you for the quick response.

I benchmarked using the existing HashedRelationMetricsBenchmark.

500K Rows

Before:

OpenJDK 64-Bit Server VM 17.0.16+8-Ubuntu-0ubuntu124.04.1 on Linux 6.14.0-33-generic
18:40:31.460 ERROR org.apache.spark.util.Utils: Process List(/usr/bin/grep, -m, 1, model name, /proc/cpuinfo) exited with code 1: 

Unknown processor
LongToUnsafeRowMap metrics:               Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
LongToUnsafeRowMap                                   55             63           4          9.1         109.8       1.0X

After (on-heap):

OpenJDK 64-Bit Server VM 17.0.16+8-Ubuntu-0ubuntu124.04.1 on Linux 6.14.0-33-generic
18:39:53.863 ERROR org.apache.spark.util.Utils: Process List(/usr/bin/grep, -m, 1, model name, /proc/cpuinfo) exited with code 1: 

Unknown processor
LongToUnsafeRowMap metrics:               Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
LongToUnsafeRowMap                                   63            105          38          8.0         125.5       1.0X

After (off-heap):

OpenJDK 64-Bit Server VM 17.0.16+8-Ubuntu-0ubuntu124.04.1 on Linux 6.14.0-33-generic
15:13:48.380 ERROR org.apache.spark.util.Utils: Process List(/usr/bin/grep, -m, 1, model name, /proc/cpuinfo) exited with code 1: 

Unknown processor
LongToUnsafeRowMap metrics:               Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
LongToUnsafeRowMap                                   62             68           4          8.1         123.0       1.0X

10M Rows

Before:

OpenJDK 64-Bit Server VM 17.0.16+8-Ubuntu-0ubuntu124.04.1 on Linux 6.14.0-33-generic
18:53:43.292 ERROR org.apache.spark.util.Utils: Process List(/usr/bin/grep, -m, 1, model name, /proc/cpuinfo) exited with code 1: 

Unknown processor
LongToUnsafeRowMap metrics:               Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
LongToUnsafeRowMap                                 2955           3121         235          3.4         295.5       1.0X

After (on-heap):

OpenJDK 64-Bit Server VM 17.0.16+8-Ubuntu-0ubuntu124.04.1 on Linux 6.14.0-33-generic
18:54:10.447 ERROR org.apache.spark.util.Utils: Process List(/usr/bin/grep, -m, 1, model name, /proc/cpuinfo) exited with code 1: 

Unknown processor
LongToUnsafeRowMap metrics:               Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
LongToUnsafeRowMap                                 3048           3336         408          3.3         304.8       1.0X

After (off-heap):

OpenJDK 64-Bit Server VM 17.0.16+8-Ubuntu-0ubuntu124.04.1 on Linux 6.14.0-33-generic
15:11:40.943 ERROR org.apache.spark.util.Utils: Process List(/usr/bin/grep, -m, 1, model name, /proc/cpuinfo) exited with code 1: 

Unknown processor
LongToUnsafeRowMap metrics:               Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
LongToUnsafeRowMap                                 2453           2459           9          4.1         245.3       1.0X

e.g., sometimes the JIT turns out to be quite a bit smarter than using direct off-heap memory

Yes. And regarding the benchmark results, I do think the new approach comes across as a bit slower (although the numbers are close). Do you have any suggestions?

@zhztheplayer (Member Author)

e.g., sometimes the JIT turns out to be quite a bit smarter than using direct off-heap memory

I actually didn't expect the off-heap relation to be faster; it's more about making sure Spark can work with a relatively small heap under off-heap memory mode, so we can prevent heap OOMs.

@zhztheplayer zhztheplayer changed the title [SPARK-54116][SQL] Add off-heap mode support for LongHashedRelation [SPARK-54116][SQL] Add off-heap mode support for LongHashedRelation to avoid JVM heap OOMs under off-heap memory mode Nov 10, 2025
```scala
val got = acquireMemory(size)
if (got < size) {
  freeMemory(got)
  throw QueryExecutionErrors.cannotAcquireMemoryToBuildLongHashedRelationError(size, got)
```

We can remove this error class

@zhztheplayer (Member Author) commented Nov 11, 2025

It's interesting that, after the change, the off-heap relation looks meaningfully faster than the on-heap relation in the benchmark. I will update the benchmark results inline.

@zhztheplayer zhztheplayer changed the title [SPARK-54116][SQL] Add off-heap mode support for LongHashedRelation to avoid JVM heap OOMs under off-heap memory mode [SPARK-54116][SQL] Add off-heap mode support for LongHashedRelation Nov 11, 2025
@dongjoon-hyun (Member)

Just a note to @zhztheplayer. From my understanding, this feature needs enough time for extensive tests over various workloads. Given that, I'd like to recommend re-targeting this to Apache Spark 4.2.0 only. For 4.2.0, we can get more chances to test this in 4.2.0-preview1, 4.2.0-preview2, and so on. For Spark 4.1, it looks too late to me.

@zhztheplayer (Member Author) commented Nov 12, 2025

Just a note to @zhztheplayer. From my understanding, this feature needs enough time for extensive tests over various workloads. Given that, I'd like to recommend re-targeting this to Apache Spark 4.2.0 only. For 4.2.0, we can get more chances to test this in 4.2.0-preview1, 4.2.0-preview2, and so on. For Spark 4.1, it looks too late to me.

@dongjoon-hyun According to the developer document mentioned in the issue, I will leave the target version empty. Thank you for helping with planning the target version. 4.2 looks totally fine to me.

@zhztheplayer zhztheplayer force-pushed the wip-54116-off-heap-long-relation branch from ff3765c to b9b24d6 Compare November 12, 2025 13:18
@cloud-fan (Contributor) commented Nov 17, 2025

The PR only affects ShuffledHashJoin. In BroadcastHashJoin, the hashed relations are not closed explicitly but are managed by GC. So it will require a different approach to allocate from off-heap.

How is this implemented in this PR? I don't see any branching code regarding different joins.

@zhztheplayer (Member Author) commented Nov 17, 2025

Hi @cloud-fan, thanks for having a look.

How is this implemented in this PR? I don't see any branching code regarding different joins.

This is a bit subtle due to how the task memory manager is obtained when creating a hashed relation in Spark code.

For SHJ

```scala
val relation = HashedRelation(
  iter,
  buildBoundKeys,
  taskMemoryManager = context.taskMemoryManager(),
  // build-side or full outer join needs support for NULL key in HashedRelation.
  allowsNullKey = joinType == FullOuter ||
    (joinType == LeftOuter && buildSide == BuildLeft) ||
    (joinType == RightOuter && buildSide == BuildRight),
  ignoresDuplicatedKey = ignoreDuplicatedKey)
```

As seen, context.taskMemoryManager() is passed in, so SHJ is supposed to follow the Spark option spark.memory.offHeap.enabled.
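For reference, these are the standard Spark configuration keys the SHJ path would honor (the keys are the regular Spark confs; the values shown are only illustrative, and in practice they would be set via SparkConf or spark-defaults.conf):

```scala
// Regular Spark configuration keys controlling off-heap execution memory.
// Values here are illustrative examples, not recommendations.
val offHeapConfs = Map(
  "spark.memory.offHeap.enabled" -> "true", // opt into off-heap mode
  "spark.memory.offHeap.size" -> "2g"       // off-heap pool size (required when enabled)
)
assert(offHeapConfs("spark.memory.offHeap.enabled") == "true")
```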

For BHJ (Driver)

The code goes through this path:

```scala
override def transform(
    rows: Iterator[InternalRow],
    sizeHint: Option[Long]): HashedRelation = {
  sizeHint match {
    case Some(numRows) =>
      HashedRelation(rows, key, numRows.toInt, isNullAware = isNullAware)
    case None =>
      HashedRelation(rows, key, isNullAware = isNullAware)
  }
}
```

, where no tmm is passed for creating the hashed relation. In this case, a temporary on-heap tmm will be created and used:

```scala
val mm = Option(taskMemoryManager).getOrElse {
  new TaskMemoryManager(
    new UnifiedMemoryManager(
      new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
      Long.MaxValue,
      Long.MaxValue / 2,
      1),
    0)
}
```
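A minimal sketch of why this fallback pins BHJ to on-heap (the types below are stand-ins, not Spark's): the call site that supplies a real manager (SHJ) controls the mode, while a null manager falls back to an on-heap default:

```scala
// Stand-in type; Spark's real class is TaskMemoryManager backed by a
// UnifiedMemoryManager. Only the selection logic is sketched here.
final case class Manager(offHeap: Boolean)

// Mirrors the Option(taskMemoryManager).getOrElse { ... } pattern above.
def resolve(supplied: Manager): Manager =
  Option(supplied).getOrElse(Manager(offHeap = false))

val shj = resolve(Manager(offHeap = true)) // SHJ passes the task's manager
val bhj = resolve(null)                    // BHJ passes no manager
assert(shj.offHeap && !bhj.offHeap)
```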

For BHJ (Executor)

Similar to the driver side, the deserialization code also uses a temporary on-heap tmm:

```scala
val taskMemoryManager = new TaskMemoryManager(
  new UnifiedMemoryManager(
    new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
    Long.MaxValue,
    Long.MaxValue / 2,
    1),
  0)
```

@zhztheplayer (Member Author) commented Nov 17, 2025

The CI failure (tests hung forever after changing from Array[Long] to MemoryBlock for on-heap allocation) is being addressed by another PR: #53065, which this one depends on.

@zhztheplayer zhztheplayer force-pushed the wip-54116-off-heap-long-relation branch from c8d965c to e60073a Compare November 21, 2025 16:32
numKeys = 0
ensureAcquireMemory(n * 2 * 8L)
array = new Array[Long](n * 2)
array = new UnsafeLongArray(n * 2)

Does UnsafeLongArray allocate memory from the memory manager?


Yes, it calls the memory manager in its constructor.
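The constructor-time accounting described here can be sketched with a toy class. This is hypothetical, not the real UnsafeLongArray: the reservation is charged when the array is built and released on close, mirroring how the real class books memory through the task memory manager:

```scala
// Hypothetical sketch of constructor-time memory accounting.
// acquire/release stand in for calls into the task memory manager.
final class BookkeepingLongArray(n: Int, acquire: Long => Unit, release: Long => Unit) {
  private val bytes = n.toLong * 8
  acquire(bytes) // charged in the constructor, as the reviewer's question probes
  private val data = new Array[Long](n)
  def get(i: Int): Long = data(i)
  def set(i: Int, v: Long): Unit = data(i) = v
  def close(): Unit = release(bytes) // given back on close
}

var held = 0L
val arr = new BookkeepingLongArray(4, held += _, held -= _)
arr.set(0, 42L)
assert(held == 32L && arr.get(0) == 42L)
arr.close()
assert(held == 0L)
```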

read(() => in.readBoolean(), () => in.readLong(), in.readBytes)
}

private class UnsafeLongArray(val length: Int) {
@cloud-fan (Contributor) commented Nov 24, 2025

there is already a LongArray class in Spark, can we reuse it?


same question. LongArray supports both on-heap and off-heap.


Updated. Thanks for the suggestion.

@zhztheplayer (Member Author) commented Nov 25, 2025

It's a few more code changes using LongArray, but yes, the functionality is the same.
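The on-heap/off-heap duality that LongArray provides can be mimicked with one interface over two backings. This is a hypothetical mini-version, not Spark code (Spark's LongArray instead wraps a MemoryBlock and reads/writes through Platform):

```scala
import java.nio.ByteBuffer

// One get/set interface, backed either by a long[] (on-heap) or by a
// direct buffer (off the JVM heap). Illustrative only.
sealed trait LongStore {
  def get(i: Int): Long
  def set(i: Int, v: Long): Unit
}
final class HeapStore(n: Int) extends LongStore {
  private val a = new Array[Long](n)
  def get(i: Int): Long = a(i)
  def set(i: Int, v: Long): Unit = a(i) = v
}
final class DirectStore(n: Int) extends LongStore {
  private val buf = ByteBuffer.allocateDirect(n * 8) // off-heap allocation
  def get(i: Int): Long = buf.getLong(i * 8)
  def set(i: Int, v: Long): Unit = { buf.putLong(i * 8, v); () }
}

// Callers never branch on the backing; the same code serves both modes.
val stores: Seq[LongStore] = Seq(new HeapStore(2), new DirectStore(2))
stores.foreach { s => s.set(1, 7L); assert(s.get(1) == 7L) }
```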

@cloud-fan (Contributor)

thanks, merging to master!

@cloud-fan cloud-fan closed this in 7c0b392 Nov 25, 2025
@zhztheplayer (Member Author)

Thanks! @cloud-fan @yaooqinn @dongjoon-hyun @HyukjinKwon @viirya

huangxiaopingRD pushed a commit to huangxiaopingRD/spark that referenced this pull request Nov 25, 2025
…nSuite

Closes apache#52830 from zhztheplayer/wip-54132.

Authored-by: Hongze Zhang <hongze.zzz123@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
huangxiaopingRD pushed a commit to huangxiaopingRD/spark that referenced this pull request Nov 25, 2025

Closes apache#52817 from zhztheplayer/wip-54116-off-heap-long-relation.

Authored-by: Hongze Zhang <hongze.zzz123@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>