Skip to content
46 changes: 33 additions & 13 deletions core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;

Expand Down Expand Up @@ -638,7 +639,11 @@ public boolean putNewKey(Object keyBase, long keyOffset, int keyLength,
assert (valueLength % 8 == 0);
assert(longArray != null);

if (numElements == MAX_CAPACITY || !canGrowArray) {

if (numElements == MAX_CAPACITY
// The map could be reused from last spill (because of no enough memory to grow),
// then we don't try to grow again if hit the `growthThreshold`.
|| !canGrowArray && numElements > growthThreshold) {
return false;
}

Expand Down Expand Up @@ -730,25 +735,18 @@ private void allocate(int capacity) {
}

/**
* Free the memory used by longArray.
* Free all allocated memory associated with this map, including the storage for keys and values
* as well as the hash map array itself.
*
* This method is idempotent and can be called multiple times.
*/
public void freeArray() {
public void free() {
updatePeakMemoryUsed();
if (longArray != null) {
long used = longArray.memoryBlock().size();
longArray = null;
releaseMemory(used);
}
}

/**
* Free all allocated memory associated with this map, including the storage for keys and values
* as well as the hash map array itself.
*
* This method is idempotent and can be called multiple times.
*/
public void free() {
freeArray();
Iterator<MemoryBlock> dataPagesIterator = dataPages.iterator();
while (dataPagesIterator.hasNext()) {
MemoryBlock dataPage = dataPagesIterator.next();
Expand Down Expand Up @@ -833,6 +831,28 @@ public int getNumDataPages() {
return dataPages.size();
}

/**
* Returns the underline long[] of longArray.
*/
public long[] getArray() {
assert(longArray != null);
return (long[]) longArray.memoryBlock().getBaseObject();
}

/**
* Reset this map to initialized state.
*/
public void reset() {
numElements = 0;
Arrays.fill(getArray(), 0);
while (dataPages.size() > 0) {
MemoryBlock dataPage = dataPages.removeLast();
freePage(dataPage);
}
currentPage = null;
pageCursor = 0;
}

/**
* Grows the size of the hash table and re-hash everything.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,13 @@ public static UnsafeExternalSorter createWithExistingInMemorySorter(
PrefixComparator prefixComparator,
int initialSize,
long pageSizeBytes,
UnsafeInMemorySorter inMemorySorter) {
return new UnsafeExternalSorter(taskMemoryManager, blockManager,
UnsafeInMemorySorter inMemorySorter) throws IOException {
UnsafeExternalSorter sorter = new UnsafeExternalSorter(taskMemoryManager, blockManager,
taskContext, recordComparator, prefixComparator, initialSize, pageSizeBytes, inMemorySorter);
sorter.spill(Long.MAX_VALUE, sorter);
// The external sorter will be used to insert records, in-memory sorter is not needed.
sorter.inMemSorter = null;
return sorter;
}

public static UnsafeExternalSorter create(
Expand Down Expand Up @@ -124,7 +128,6 @@ private UnsafeExternalSorter(
acquireMemory(inMemSorter.getMemoryUsage());
} else {
this.inMemSorter = existingInMemorySorter;
// will acquire after free the map
}
this.peakMemoryUsedBytes = getMemoryUsage();

Expand Down Expand Up @@ -157,12 +160,9 @@ public void closeCurrentPage() {
*/
@Override
public long spill(long size, MemoryConsumer trigger) throws IOException {
assert(inMemSorter != null);
if (trigger != this) {
if (readingIterator != null) {
return readingIterator.spill();
} else {

}
return 0L; // this should throw exception
}
Expand Down Expand Up @@ -388,25 +388,38 @@ public void insertKVRecord(Object keyBase, long keyOffset, int keyLen,
inMemSorter.insertRecord(recordAddress, prefix);
}

/**
* Merges another UnsafeExternalSorters into this one, the other one will be emptied.
*
* @throws IOException
*/
public void merge(UnsafeExternalSorter other) throws IOException {
other.spill();
spillWriters.addAll(other.spillWriters);
// remove them from `spillWriters`, or the files will be deleted in `cleanupResources`.
other.spillWriters.clear();
other.cleanupResources();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ordinarily, this would end up deleting the spill files, but it doesn't because of the spillWriters.clear() call above. If you end up updating this patch, mind adding a one-line comment to explain this (since it's a subtle point)?

}

/**
* Returns a sorted iterator. It is the caller's responsibility to call `cleanupResources()`
* after consuming this iterator.
*/
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JoshRosen Do you remember why we need to clear this? Once cleared, how to delete the spilled files?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ping @JoshRosen

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Chatted with @JoshRosen offline, we should not clear spillWriters here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just a note, we had a quick discussion. Seems we should not call spillWriters.clear(). Otherwise those spilled files will not be deleted.

public UnsafeSorterIterator getSortedIterator() throws IOException {
assert(inMemSorter != null);
readingIterator = new SpillableIterator(inMemSorter.getSortedIterator());
int numIteratorsToMerge = spillWriters.size() + (readingIterator.hasNext() ? 1 : 0);
if (spillWriters.isEmpty()) {
assert(inMemSorter != null);
readingIterator = new SpillableIterator(inMemSorter.getSortedIterator());
return readingIterator;
} else {
final UnsafeSorterSpillMerger spillMerger =
new UnsafeSorterSpillMerger(recordComparator, prefixComparator, numIteratorsToMerge);
new UnsafeSorterSpillMerger(recordComparator, prefixComparator, spillWriters.size());
for (UnsafeSorterSpillWriter spillWriter : spillWriters) {
spillMerger.addSpillIfNotEmpty(spillWriter.getReader(blockManager));
}
spillWriters.clear();
spillMerger.addSpillIfNotEmpty(readingIterator);

if (inMemSorter != null) {
readingIterator = new SpillableIterator(inMemSorter.getSortedIterator());
spillMerger.addSpillIfNotEmpty(readingIterator);
}
return spillMerger.getSortedIterator();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@

import java.util.Comparator;

import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.util.collection.Sorter;
import org.apache.spark.memory.TaskMemoryManager;

/**
* Sorts records using an AlphaSort-style key-prefix sort. This sort stores pointers to records
Expand Down Expand Up @@ -77,13 +77,20 @@ public int compare(RecordPointerAndKeyPrefix r1, RecordPointerAndKeyPrefix r2) {
*/
private int pos = 0;

public UnsafeInMemorySorter(
final TaskMemoryManager memoryManager,
final RecordComparator recordComparator,
final PrefixComparator prefixComparator,
int initialSize) {
this(memoryManager, recordComparator, prefixComparator, new long[initialSize * 2]);
}

public UnsafeInMemorySorter(
final TaskMemoryManager memoryManager,
final RecordComparator recordComparator,
final PrefixComparator prefixComparator,
int initialSize) {
assert (initialSize > 0);
this.array = new long[initialSize * 2];
long[] array) {
this.array = array;
this.memoryManager = memoryManager;
this.sorter = new Sorter<>(UnsafeSortDataFormat.INSTANCE);
this.sortComparator = new SortComparator(recordComparator, prefixComparator, memoryManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,16 +236,13 @@ public void printPerfMetrics() {

/**
* Sorts the map's records in place, spill them to disk, and returns an [[UnsafeKVExternalSorter]]
* that can be used to insert more records to do external sorting.
*
* The only memory that is allocated is the address/prefix array, 16 bytes per record.
*
* Note that this destroys the map, and as a result, the map cannot be used anymore after this.
* Note that the map will be reset for inserting new records, and the returned sorter can NOT be used
* to insert records.
*/
public UnsafeKVExternalSorter destructAndCreateExternalSorter() throws IOException {
UnsafeKVExternalSorter sorter = new UnsafeKVExternalSorter(
return new UnsafeKVExternalSorter(
groupingKeySchema, aggregationBufferSchema,
SparkEnv.get().blockManager(), map.getPageSizeBytes(), map);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this UnsafeFixedWidthAggregationMap usable after this call?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, I think we need to make it clear that map will not be freed at here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok. I see, we cannot directly insert records to the external sorter returned by destructAndCreateExternalSorter. We need to use merge.

return sorter;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,10 @@ public UnsafeKVExternalSorter(
/* initialSize */ 4096,
pageSizeBytes);
} else {
// The memory needed for UnsafeInMemorySorter should be less than longArray in map.
map.freeArray();
// The memory used by UnsafeInMemorySorter will be counted later (end of this block)
// During spilling, the array in map will not be used, so we can borrow that and use it
// as the underline array for in-memory sorter (it's always large enough).
final UnsafeInMemorySorter inMemSorter = new UnsafeInMemorySorter(
taskMemoryManager, recordComparator, prefixComparator, Math.max(1, map.numElements()));
taskMemoryManager, recordComparator, prefixComparator, map.getArray());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any chance that multiple objects may change the same array?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, the array from map is not thread safe, only accessible from current thread.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok. Then, this UnsafeInMemorySorter is the only object that can hold the reference other than the original bytes to bytes map, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes


// We cannot use the destructive iterator here because we are reusing the existing memory
// pages in BytesToBytesMap to hold records during sorting.
Expand Down Expand Up @@ -123,10 +122,9 @@ public UnsafeKVExternalSorter(
pageSizeBytes,
inMemSorter);

sorter.spill();
map.free();
// counting the memory used UnsafeInMemorySorter
taskMemoryManager.acquireExecutionMemory(inMemSorter.getMemoryUsage(), sorter);
// reset the map, so we can re-use it to insert new records. the inMemSorter will not used
// anymore, so the underline array could be used by map again.
map.reset();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need comments at here. Basically, we need to explain why we reset. Also, once sorter is created, inMemSorter will not be needed, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes.

}
}

Expand All @@ -142,6 +140,15 @@ public void insertKV(UnsafeRow key, UnsafeRow value) throws IOException {
value.getBaseObject(), value.getBaseOffset(), value.getSizeInBytes(), prefix);
}

/**
* Merges another UnsafeKVExternalSorter into `this`, the other one will be emptied.
*
* @throws IOException
*/
public void merge(UnsafeKVExternalSorter other) throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add comment?

sorter.merge(other.sorter);
}

/**
* Returns a sorted iterator. It is the caller's responsibility to call `cleanupResources()`
* after consuming this iterator.
Expand Down
Loading