Skip to content

Commit

Permalink
[ADS-13] Validate metric TaskMetrics.peakExecutionMemory for native S…
Browse files Browse the repository at this point in the history
…QL engine (oap-project#14)

Closes oap-project#13
  • Loading branch information
zhztheplayer committed Feb 20, 2021
1 parent c2d373c commit e09272f
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public void acquire(long size) {
}
long granted = acquireMemory(size);
if (granted < size) {
freeMemory(granted);
throw new OutOfMemoryException("Not enough spark off-heap execution memory. " +
"Acquired: " + size + ", granted: " + granted + ". " +
"Try tweaking config option spark.memory.offHeap.size to " +
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.intel.oap.spark.sql.execution.datasources.v2.arrow;

import java.util.concurrent.atomic.AtomicLong;

public final class NativeSQLMemoryMetrics {
private final AtomicLong peak = new AtomicLong(0L);
private final AtomicLong total = new AtomicLong(0L);

public void inc(long bytes) {
final long total = this.total.addAndGet(bytes);
long prev_peak;
do {
prev_peak = this.peak.get();
if (total <= prev_peak) {
break;
}
} while (!this.peak.compareAndSet(prev_peak, total));
}

public long peak() {
return peak.get();
}

public long total() {
return total.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,22 @@
public class SparkManagedAllocationListener implements AllocationListener {

private final NativeSQLMemoryConsumer consumer;
private final NativeSQLMemoryMetrics metrics;

public SparkManagedAllocationListener(NativeSQLMemoryConsumer consumer) {
public SparkManagedAllocationListener(NativeSQLMemoryConsumer consumer, NativeSQLMemoryMetrics metrics) {
this.consumer = consumer;
this.metrics = metrics;
}

@Override
public void onPreAllocation(long size) {
consumer.acquire(size);
metrics.inc(size);
}

@Override
public void onRelease(long size) {
consumer.free(size);
metrics.inc(-size);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,22 @@
public class SparkManagedReservationListener implements ReservationListener {

private final NativeSQLMemoryConsumer consumer;
private final NativeSQLMemoryMetrics metrics;

public SparkManagedReservationListener(NativeSQLMemoryConsumer consumer) {
public SparkManagedReservationListener(NativeSQLMemoryConsumer consumer, NativeSQLMemoryMetrics metrics) {
this.consumer = consumer;
this.metrics = metrics;
}

@Override
public void reserve(long size) {
consumer.acquire(size);
metrics.inc(size);
}

@Override
public void unreserve(long size) {
consumer.free(size);
metrics.inc(-size);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@

package org.apache.spark.sql.execution.datasources.v2.arrow

import java.util
import java.util.UUID

import scala.collection.JavaConverters._

import com.intel.oap.spark.sql.execution.datasources.v2.arrow.{NativeSQLMemoryConsumer, SparkManagedAllocationListener, SparkManagedReservationListener, Spiller}
import java.util
import com.intel.oap.spark.sql.execution.datasources.v2.arrow._
import org.apache.arrow.dataset.jni.NativeMemoryPool
import org.apache.arrow.memory.{BaseAllocator, BufferAllocator}

Expand All @@ -37,18 +37,22 @@ object SparkMemoryUtils {
throw new IllegalStateException("Creating TaskMemoryResources instance out of Spark task")
}

val sharedMetrics = new NativeSQLMemoryMetrics()

val defaultAllocator: BaseAllocator = {
val globalAlloc = globalAllocator()
val al = new SparkManagedAllocationListener(
new NativeSQLMemoryConsumer(getTaskMemoryManager(), Spiller.NO_OP))
new NativeSQLMemoryConsumer(getTaskMemoryManager(), Spiller.NO_OP),
sharedMetrics)
val parent = globalAlloc
parent.newChildAllocator("Spark Managed Allocator - " +
UUID.randomUUID().toString, al, 0, parent.getLimit).asInstanceOf[BaseAllocator]
}

val defaultMemoryPool: NativeMemoryPool = {
val rl = new SparkManagedReservationListener(
new NativeSQLMemoryConsumer(getTaskMemoryManager(), Spiller.NO_OP))
new NativeSQLMemoryConsumer(getTaskMemoryManager(), Spiller.NO_OP),
sharedMetrics)
NativeMemoryPool.createListenable(rl)
}

Expand All @@ -60,15 +64,17 @@ object SparkMemoryUtils {

def createSpillableMemoryPool(spiller: Spiller): NativeMemoryPool = {
val rl = new SparkManagedReservationListener(
new NativeSQLMemoryConsumer(getTaskMemoryManager(), spiller))
new NativeSQLMemoryConsumer(getTaskMemoryManager(), spiller),
sharedMetrics)
val pool = NativeMemoryPool.createListenable(rl)
memoryPools.add(pool)
pool
}

def createSpillableAllocator(spiller: Spiller): BaseAllocator = {
val al = new SparkManagedAllocationListener(
new NativeSQLMemoryConsumer(getTaskMemoryManager(), spiller))
new NativeSQLMemoryConsumer(getTaskMemoryManager(), spiller),
sharedMetrics)
val parent = globalAllocator()
val alloc = parent.newChildAllocator("Spark Managed Allocator - " +
UUID.randomUUID().toString, al, 0, parent.getLimit).asInstanceOf[BaseAllocator]
Expand Down Expand Up @@ -163,6 +169,7 @@ object SparkMemoryUtils {
taskToResourcesMap.synchronized {
val resources = taskToResourcesMap.remove(context)
resources.release()
context.taskMetrics().incPeakExecutionMemory(resources.sharedMetrics.peak())
}
}
})
Expand Down

0 comments on commit e09272f

Please sign in to comment.