Skip to content

Commit

Permalink
Memory pool fix for Arrow 7.0
Browse files Browse the repository at this point in the history
  • Loading branch information
zhztheplayer committed Apr 8, 2022
1 parent c5eddea commit 1543aff
Showing 1 changed file with 20 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,23 +109,24 @@ object SparkMemoryUtils extends Logging {
.newChildAllocator("CHILD-ALLOC-BUFFER-IMPORT", allocListenerForBufferImport, 0L,
Long.MaxValue)

val defaultMemoryPool: NativeMemoryPoolWrapper = {
val rl = new SparkManagedReservationListener(
new NativeSQLMemoryConsumer(getTaskMemoryManager(), Spiller.NO_OP),
sharedMetrics)
val pool = NativeMemoryPoolWrapper(NativeMemoryPool.createListenable(rl), rl,
collectStackForDebug)
memoryPools.add(pool)
pool
}
// val defaultMemoryPool: NativeMemoryPoolWrapper = {
// val rl = new SparkManagedReservationListener(
// new NativeSQLMemoryConsumer(getTaskMemoryManager(), Spiller.NO_OP),
// sharedMetrics)
// val pool = NativeMemoryPoolWrapper(NativeMemoryPool.createListenable(rl), rl,
// collectStackForDebug)
// memoryPools.add(pool)
// pool
// }

def createSpillableMemoryPool(spiller: Spiller): NativeMemoryPool = {
val rl = new SparkManagedReservationListener(
new NativeSQLMemoryConsumer(getTaskMemoryManager(), spiller),
sharedMetrics)
val pool = NativeMemoryPool.createListenable(rl)
memoryPools.add(NativeMemoryPoolWrapper(pool, rl, collectStackForDebug))
pool
return NativeMemoryPool.getDefault
// val rl = new SparkManagedReservationListener(
// new NativeSQLMemoryConsumer(getTaskMemoryManager(), spiller),
// sharedMetrics)
// val pool = NativeMemoryPool.createListenable(rl)
// memoryPools.add(NativeMemoryPoolWrapper(pool, rl, collectStackForDebug))
// pool
}

def createSpillableAllocator(spiller: Spiller): BufferAllocator = {
Expand Down Expand Up @@ -301,10 +302,11 @@ object SparkMemoryUtils extends Logging {
}

def contextMemoryPool(): NativeMemoryPool = {
if (!inSparkTask()) {
return globalMemoryPool()
}
getTaskMemoryResources().defaultMemoryPool.pool
// if (!inSparkTask()) {
// return globalMemoryPool()
// }
// getTaskMemoryResources().defaultMemoryPool.pool
}

def getLeakedAllocators(): List[BufferAllocator] = {
Expand Down

0 comments on commit 1543aff

Please sign in to comment.