Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

Commit

Permalink
fixup
Browse files Browse the repository at this point in the history
  • Loading branch information
zhztheplayer committed Sep 1, 2021
1 parent 59288a4 commit 5a1e4f3
Showing 1 changed file with 11 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,16 @@ object SparkMemoryUtils extends Logging {
LegacyBufferLedger.FACTORY
}

val sparkManagedAllocationListener = new SparkManagedAllocationListener(
new NativeSQLMemoryConsumer(getTaskMemoryManager(), Spiller.NO_OP),
sharedMetrics)
val directAllocationListener = DirectAllocationListener.INSTANCE

val allocListener: AllocationListener = if (isArrowAutoReleaseEnabled) {
new AllocationListenerList(sparkManagedAllocationListener, directAllocationListener)
} else {
sparkManagedAllocationListener
}

private def collectStackForDebug = {
if (DEBUG) {
Expand All @@ -112,13 +122,10 @@ object SparkMemoryUtils extends Logging {
private val memoryPools = new util.ArrayList[NativeMemoryPoolWrapper]()

val defaultAllocator: BufferAllocator = {
val al = new SparkManagedAllocationListener(
new NativeSQLMemoryConsumer(getTaskMemoryManager(), Spiller.NO_OP),
sharedMetrics)
val alloc = new RootAllocator(ImmutableConfig.builder()
.maxAllocation(Long.MaxValue)
.bufferLedgerFactory(ledgerFactory)
.listener(new AllocationListenerList(al, DirectAllocationListener.INSTANCE))
.listener(allocListener)
.build)
allocators.add(alloc)
alloc
Expand Down

0 comments on commit 5a1e4f3

Please sign in to comment.