Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
b7c9c23
Move Unsafe mem. mgrs. to spark-core subproject.
JoshRosen Oct 14, 2015
25ba4b5
Merge ExecutorMemoryManager into MemoryManager.
JoshRosen Oct 14, 2015
3d997ce
Naming and formatting fixes.
JoshRosen Oct 16, 2015
d9e6b84
Move Tungsten-related methods to end of MemoryManager file.
JoshRosen Oct 16, 2015
98ef86b
Add taskAttemptId to TaskMemoryManager constructor.
JoshRosen Oct 16, 2015
8f93e94
Move ShuffleMemoryManager into memory package.
JoshRosen Oct 16, 2015
3bbc54d
Merge remote-tracking branch 'origin/master' into SPARK-10984
JoshRosen Oct 16, 2015
88a7970
Fix bug in AbstractBytesToBytesMapSuite.
JoshRosen Oct 16, 2015
ec48ff9
Refactor the existing Tungsten TaskMemoryManager interactions so Tung…
JoshRosen Oct 16, 2015
6f98bc4
Move TaskMemoryManager from unsafe to memory.
JoshRosen Oct 16, 2015
6459397
Further minimization of ShuffleMemoryManager usage.
JoshRosen Oct 16, 2015
60c66b2
Merge ShuffleMemoryManager into MemoryManager.
JoshRosen Oct 17, 2015
7d6a37f
Clean up interaction between TaskMemoryManager and MemoryManager.
JoshRosen Oct 17, 2015
0dc21dc
Merge remote-tracking branch 'origin/master' into SPARK-10984
JoshRosen Oct 22, 2015
f21b767
Fix compilation.
JoshRosen Oct 22, 2015
46ad693
Fix Scalastyle
JoshRosen Oct 22, 2015
c33e330
Fix import ordering in Executor.scala
JoshRosen Oct 22, 2015
ef45d91
Fix import ordering in Task.scala
JoshRosen Oct 22, 2015
c7eac69
Fix import ordering in TaskContextImpl
JoshRosen Oct 22, 2015
d86f435
Fix spillable collection tests
JoshRosen Oct 22, 2015
bba5550
Integrate TaskMemoryManager acquire/releasePage with MemoryManager bo…
JoshRosen Oct 22, 2015
66ae259
Move pooling logic into allocators themselves.
JoshRosen Oct 22, 2015
b1d5151
Scaladoc updates.
JoshRosen Oct 22, 2015
d0c0dd9
Update Spillable to properly integrate with TaskMemoryManager.
JoshRosen Oct 22, 2015
48149fc
Move pageSizeBytes to Tungsten section
JoshRosen Oct 23, 2015
c8ba196
Cleanup after merging of ShuffleMemoryManager into MemoryManager.
JoshRosen Oct 23, 2015
63a6cbc
Rename getMemoryConsumptionForThisTask to getExecutionMemoryUsageForTask
JoshRosen Oct 23, 2015
6ec9c30
Properly thread numCores to memory manager.
JoshRosen Oct 23, 2015
1593fad
Explain why MemoryBlock.pageNumber is public
JoshRosen Oct 23, 2015
64bec0b
Fix TaskMemoryManagerSuite tests.
JoshRosen Oct 23, 2015
f9240e9
Fix compilation
JoshRosen Oct 23, 2015
a95bc08
Fix a memory leak in UnsafeShuffleWriter's sorter
JoshRosen Oct 23, 2015
b3ad761
Remove println
JoshRosen Oct 23, 2015
a7e8320
Fix Scalastyle.
JoshRosen Oct 23, 2015
e874a45
Fix remaining TODOs in UnsafeShuffleWriterSuite.
JoshRosen Oct 23, 2015
2ba6e51
Fix DeveloperAPI change
JoshRosen Oct 23, 2015
0c13723
Address comments in MemoryManager
JoshRosen Oct 23, 2015
04ec429
Release memory acquired after unsuccessful allocatePage() call
JoshRosen Oct 23, 2015
e56d039
Fix EAOM compilation.
JoshRosen Oct 23, 2015
aa14113
Port tests from ShuffleMemoryManagerSuite
JoshRosen Oct 23, 2015
7addf8b
Remove unused non-page-memory allocation methods.
JoshRosen Oct 23, 2015
5af0b17
Update Tungsten tests
JoshRosen Oct 23, 2015
a264703
Fix execution memory leaks in Spillable collections
JoshRosen Oct 24, 2015
f2ab708
Fix NPE in UnsafeRowSerializerSuite
JoshRosen Oct 24, 2015
0b5c72f
Update EAOM tests to reflect fact that iterator() is destructive.
JoshRosen Oct 24, 2015
f68fdb1
Fix streaming test compilation
JoshRosen Oct 26, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@
* limitations under the License.
*/

package org.apache.spark.unsafe.memory;
package org.apache.spark.memory;

import java.util.*;

import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.spark.unsafe.memory.MemoryBlock;

/**
* Manages the memory allocated by an individual task.
* <p>
Expand Down Expand Up @@ -87,13 +89,9 @@ public class TaskMemoryManager {
*/
private final BitSet allocatedPages = new BitSet(PAGE_TABLE_SIZE);

/**
* Tracks memory allocated with {@link TaskMemoryManager#allocate(long)}, used to detect / clean
* up leaked memory.
*/
private final HashSet<MemoryBlock> allocatedNonPageMemory = new HashSet<MemoryBlock>();
private final MemoryManager memoryManager;

private final ExecutorMemoryManager executorMemoryManager;
private final long taskAttemptId;

/**
* Tracks whether we're in-heap or off-heap. For off-heap, we short-circuit most of these methods
Expand All @@ -103,16 +101,38 @@ public class TaskMemoryManager {
private final boolean inHeap;

/**
* Construct a new MemoryManager.
* Construct a new TaskMemoryManager.
*/
public TaskMemoryManager(ExecutorMemoryManager executorMemoryManager) {
this.inHeap = executorMemoryManager.inHeap;
this.executorMemoryManager = executorMemoryManager;
public TaskMemoryManager(MemoryManager memoryManager, long taskAttemptId) {
this.inHeap = memoryManager.tungstenMemoryIsAllocatedInHeap();
this.memoryManager = memoryManager;
this.taskAttemptId = taskAttemptId;
}

/**
* Acquire N bytes of memory for execution, evicting cached blocks if necessary.
*
* @param size the number of bytes (N) requested on behalf of this task.
* @return number of bytes successfully granted (<= N).
*/
public long acquireExecutionMemory(long size) {
// Delegate to the MemoryManager, passing this task's attempt id along with the request.
return memoryManager.acquireExecutionMemory(size, taskAttemptId);
}

/**
* Release N bytes of execution memory.
*
* @param size the number of bytes (N) to release for this task.
*/
public void releaseExecutionMemory(long size) {
// Delegate to the MemoryManager, passing this task's attempt id along with the request.
memoryManager.releaseExecutionMemory(size, taskAttemptId);
}

/**
* Returns the page size, in bytes, as reported by the underlying {@link MemoryManager}.
*/
public long pageSizeBytes() {
return memoryManager.pageSizeBytes();
}

/**
* Allocate a block of memory that will be tracked in the MemoryManager's page table; this is
* intended for allocating large blocks of memory that will be shared between operators.
* intended for allocating large blocks of Tungsten memory that will be shared between operators.
*
* @return the allocated page, or `null` if there was not enough memory to allocate the page.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit @return

*/
public MemoryBlock allocatePage(long size) {
if (size > MAXIMUM_PAGE_SIZE_BYTES) {
Expand All @@ -129,7 +149,15 @@ public MemoryBlock allocatePage(long size) {
}
allocatedPages.set(pageNumber);
}
final MemoryBlock page = executorMemoryManager.allocate(size);
final long acquiredExecutionMemory = acquireExecutionMemory(size);
if (acquiredExecutionMemory != size) {
releaseExecutionMemory(acquiredExecutionMemory);
synchronized (this) {
allocatedPages.clear(pageNumber);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to release the memory we acquired here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whoops, good catch. Yes.

}
return null;
}
final MemoryBlock page = memoryManager.tungstenMemoryAllocator().allocate(size);
page.pageNumber = pageNumber;
pageTable[pageNumber] = page;
if (logger.isTraceEnabled()) {
Expand All @@ -152,45 +180,16 @@ public void freePage(MemoryBlock page) {
if (logger.isTraceEnabled()) {
logger.trace("Freed page number {} ({} bytes)", page.pageNumber, page.size());
}
// Cannot access a page once it's freed.
executorMemoryManager.free(page);
}

/**
* Allocates a contiguous block of memory. Note that the allocated memory is not guaranteed
* to be zeroed out (call `zero()` on the result if this is necessary). This method is intended
* to be used for allocating operators' internal data structures. For data pages that you want to
* exchange between operators, consider using {@link TaskMemoryManager#allocatePage(long)}, since
* that will enable intra-memory pointers (see
* {@link TaskMemoryManager#encodePageNumberAndOffset(MemoryBlock, long)} and this class's
* top-level Javadoc for more details).
*/
public MemoryBlock allocate(long size) throws OutOfMemoryError {
assert(size > 0) : "Size must be positive, but got " + size;
final MemoryBlock memory = executorMemoryManager.allocate(size);
synchronized(allocatedNonPageMemory) {
allocatedNonPageMemory.add(memory);
}
return memory;
}

/**
* Free memory allocated by {@link TaskMemoryManager#allocate(long)}.
*/
public void free(MemoryBlock memory) {
assert (memory.pageNumber == -1) : "Should call freePage() for pages, not free()";
executorMemoryManager.free(memory);
synchronized(allocatedNonPageMemory) {
final boolean wasAlreadyRemoved = !allocatedNonPageMemory.remove(memory);
assert (!wasAlreadyRemoved) : "Called free() on memory that was already freed!";
}
long pageSize = page.size();
memoryManager.tungstenMemoryAllocator().free(page);
releaseExecutionMemory(pageSize);
}

/**
* Given a memory page and offset within that page, encode this address into a 64-bit long.
* This address will remain valid as long as the corresponding page has not been freed.
*
* @param page a data page allocated by {@link TaskMemoryManager#allocate(long)}.
* @param page a data page allocated by {@link TaskMemoryManager#allocatePage(long)}.
* @param offsetInPage an offset in this page which incorporates the base offset. In other words,
* this should be the value that you would pass as the base offset into an
* UNSAFE call (e.g. page.baseOffset() + something).
Expand Down Expand Up @@ -270,17 +269,15 @@ public long cleanUpAllAllocatedMemory() {
}
}

synchronized (allocatedNonPageMemory) {
final Iterator<MemoryBlock> iter = allocatedNonPageMemory.iterator();
while (iter.hasNext()) {
final MemoryBlock memory = iter.next();
freedBytes += memory.size();
// We don't call free() here because that calls Set.remove, which would lead to a
// ConcurrentModificationException here.
executorMemoryManager.free(memory);
iter.remove();
}
}
freedBytes += memoryManager.releaseAllExecutionMemoryForTask(taskAttemptId);

return freedBytes;
}

/**
* Returns the memory consumption, in bytes, for the current task.
*
* @return the execution memory usage that the MemoryManager reports for this task's attempt id.
*/
public long getMemoryConsumptionForThisTask() {
return memoryManager.getExecutionMemoryUsageForTask(taskAttemptId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.shuffle.sort;

import org.apache.spark.memory.TaskMemoryManager;

/**
* Wrapper around an 8-byte word that holds a 24-bit partition number and 40-bit record pointer.
* <p>
Expand All @@ -26,7 +28,7 @@
* </pre>
* This implies that the maximum addressable page size is 2^27 bytes = 128 megabytes, assuming that
* our offsets in pages are not 8-byte-word-aligned. Since we have 2^13 pages (based off the
* 13-bit page numbers assigned by {@link org.apache.spark.unsafe.memory.TaskMemoryManager}), this
* 13-bit page numbers assigned by {@link TaskMemoryManager}), this
* implies that we can address 2^13 * 128 megabytes = 1 terabyte of RAM per task.
* <p>
* Assuming word-alignment would allow for a 1 gigabyte maximum page size, but we leave this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,13 @@
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.serializer.DummySerializerInstance;
import org.apache.spark.serializer.SerializerInstance;
import org.apache.spark.shuffle.ShuffleMemoryManager;
import org.apache.spark.storage.BlockManager;
import org.apache.spark.storage.DiskBlockObjectWriter;
import org.apache.spark.storage.TempShuffleBlockId;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.array.ByteArrayMethods;
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.unsafe.memory.TaskMemoryManager;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.util.Utils;

/**
Expand Down Expand Up @@ -72,7 +71,6 @@ final class ShuffleExternalSorter {
@VisibleForTesting
final int maxRecordSizeBytes;
private final TaskMemoryManager taskMemoryManager;
private final ShuffleMemoryManager shuffleMemoryManager;
private final BlockManager blockManager;
private final TaskContext taskContext;
private final ShuffleWriteMetrics writeMetrics;
Expand Down Expand Up @@ -105,15 +103,13 @@ final class ShuffleExternalSorter {

public ShuffleExternalSorter(
TaskMemoryManager memoryManager,
ShuffleMemoryManager shuffleMemoryManager,
BlockManager blockManager,
TaskContext taskContext,
int initialSize,
int numPartitions,
SparkConf conf,
ShuffleWriteMetrics writeMetrics) throws IOException {
this.taskMemoryManager = memoryManager;
this.shuffleMemoryManager = shuffleMemoryManager;
this.blockManager = blockManager;
this.taskContext = taskContext;
this.initialSize = initialSize;
Expand All @@ -124,7 +120,7 @@ public ShuffleExternalSorter(
this.numElementsForSpillThreshold =
conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold", Long.MAX_VALUE);
this.pageSizeBytes = (int) Math.min(
PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES, shuffleMemoryManager.pageSizeBytes());
PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES, taskMemoryManager.pageSizeBytes());
this.maxRecordSizeBytes = pageSizeBytes - 4;
this.writeMetrics = writeMetrics;
initializeForWriting();
Expand All @@ -140,9 +136,9 @@ public ShuffleExternalSorter(
private void initializeForWriting() throws IOException {
// TODO: move this sizing calculation logic into a static method of sorter:
final long memoryRequested = initialSize * 8L;
final long memoryAcquired = shuffleMemoryManager.tryToAcquire(memoryRequested);
final long memoryAcquired = taskMemoryManager.acquireExecutionMemory(memoryRequested);
if (memoryAcquired != memoryRequested) {
shuffleMemoryManager.release(memoryAcquired);
taskMemoryManager.releaseExecutionMemory(memoryAcquired);
throw new IOException("Could not acquire " + memoryRequested + " bytes of memory");
}

Expand Down Expand Up @@ -272,6 +268,7 @@ private void writeSortedFile(boolean isLastFile) throws IOException {
*/
@VisibleForTesting
void spill() throws IOException {
assert(inMemSorter != null);
logger.info("Thread {} spilling sort data of {} to disk ({} {} so far)",
Thread.currentThread().getId(),
Utils.bytesToString(getMemoryUsage()),
Expand All @@ -281,7 +278,7 @@ void spill() throws IOException {
writeSortedFile(false);
final long inMemSorterMemoryUsage = inMemSorter.getMemoryUsage();
inMemSorter = null;
shuffleMemoryManager.release(inMemSorterMemoryUsage);
taskMemoryManager.releaseExecutionMemory(inMemSorterMemoryUsage);
final long spillSize = freeMemory();
taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);

Expand Down Expand Up @@ -316,9 +313,13 @@ private long freeMemory() {
long memoryFreed = 0;
for (MemoryBlock block : allocatedPages) {
taskMemoryManager.freePage(block);
shuffleMemoryManager.release(block.size());
memoryFreed += block.size();
}
if (inMemSorter != null) {
long sorterMemoryUsage = inMemSorter.getMemoryUsage();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when do we release this now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This memory is now released as part of taskMemoryManager.freePage().

inMemSorter = null;
taskMemoryManager.releaseExecutionMemory(sorterMemoryUsage);
}
allocatedPages.clear();
currentPage = null;
currentPagePosition = -1;
Expand All @@ -337,8 +338,9 @@ public void cleanupResources() {
}
}
if (inMemSorter != null) {
shuffleMemoryManager.release(inMemSorter.getMemoryUsage());
long sorterMemoryUsage = inMemSorter.getMemoryUsage();
inMemSorter = null;
taskMemoryManager.releaseExecutionMemory(sorterMemoryUsage);
}
}

Expand All @@ -353,21 +355,20 @@ private void growPointerArrayIfNecessary() throws IOException {
logger.debug("Attempting to expand sort pointer array");
final long oldPointerArrayMemoryUsage = inMemSorter.getMemoryUsage();
final long memoryToGrowPointerArray = oldPointerArrayMemoryUsage * 2;
final long memoryAcquired = shuffleMemoryManager.tryToAcquire(memoryToGrowPointerArray);
final long memoryAcquired = taskMemoryManager.acquireExecutionMemory(memoryToGrowPointerArray);
if (memoryAcquired < memoryToGrowPointerArray) {
shuffleMemoryManager.release(memoryAcquired);
taskMemoryManager.releaseExecutionMemory(memoryAcquired);
spill();
} else {
inMemSorter.expandPointerArray();
shuffleMemoryManager.release(oldPointerArrayMemoryUsage);
taskMemoryManager.releaseExecutionMemory(oldPointerArrayMemoryUsage);
}
}
}

/**
* Allocates more memory in order to insert an additional record. This will request additional
* memory from the {@link ShuffleMemoryManager} and spill if the requested memory can not be
* obtained.
* memory from the memory manager and spill if the requested memory can not be obtained.
*
* @param requiredSpace the required space in the data page, in bytes, including space for storing
* the record size. This must be less than or equal to the page size (records
Expand All @@ -386,17 +387,14 @@ private void acquireNewPageIfNecessary(int requiredSpace) throws IOException {
throw new IOException("Required space " + requiredSpace + " is greater than page size (" +
pageSizeBytes + ")");
} else {
final long memoryAcquired = shuffleMemoryManager.tryToAcquire(pageSizeBytes);
if (memoryAcquired < pageSizeBytes) {
shuffleMemoryManager.release(memoryAcquired);
currentPage = taskMemoryManager.allocatePage(pageSizeBytes);
if (currentPage == null) {
spill();
final long memoryAcquiredAfterSpilling = shuffleMemoryManager.tryToAcquire(pageSizeBytes);
if (memoryAcquiredAfterSpilling != pageSizeBytes) {
shuffleMemoryManager.release(memoryAcquiredAfterSpilling);
currentPage = taskMemoryManager.allocatePage(pageSizeBytes);
if (currentPage == null) {
throw new IOException("Unable to acquire " + pageSizeBytes + " bytes of memory");
}
}
currentPage = taskMemoryManager.allocatePage(pageSizeBytes);
currentPagePosition = currentPage.getBaseOffset();
freeSpaceInCurrentPage = pageSizeBytes;
allocatedPages.add(currentPage);
Expand Down Expand Up @@ -430,17 +428,14 @@ public void insertRecord(
long overflowPageSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(totalSpaceRequired);
// The record is larger than the page size, so allocate a special overflow page just to hold
// that record.
final long memoryGranted = shuffleMemoryManager.tryToAcquire(overflowPageSize);
if (memoryGranted != overflowPageSize) {
shuffleMemoryManager.release(memoryGranted);
MemoryBlock overflowPage = taskMemoryManager.allocatePage(overflowPageSize);
if (overflowPage == null) {
spill();
final long memoryGrantedAfterSpill = shuffleMemoryManager.tryToAcquire(overflowPageSize);
if (memoryGrantedAfterSpill != overflowPageSize) {
shuffleMemoryManager.release(memoryGrantedAfterSpill);
overflowPage = taskMemoryManager.allocatePage(overflowPageSize);
if (overflowPage == null) {
throw new IOException("Unable to acquire " + overflowPageSize + " bytes of memory");
}
}
MemoryBlock overflowPage = taskMemoryManager.allocatePage(overflowPageSize);
allocatedPages.add(overflowPage);
dataPage = overflowPage;
dataPagePosition = overflowPage.getBaseOffset();
Expand Down
Loading