Skip to content
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@

import org.apache.parquet.Log;
import org.apache.parquet.ParquetRuntimeException;
import org.apache.parquet.Preconditions;

import java.lang.management.ManagementFactory;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

Expand All @@ -47,6 +49,8 @@ public class MemoryManager {
private final long minMemoryAllocation;
private final Map<InternalParquetRecordWriter, Long> writerList = new
HashMap<InternalParquetRecordWriter, Long>();
private final Map<String, Runnable> callBacks = new HashMap<String, Runnable>();
private double scale = 1.0;

public MemoryManager(float ratio, long minAllocation) {
checkRatio(ratio);
Expand Down Expand Up @@ -100,7 +104,6 @@ synchronized void removeWriter(InternalParquetRecordWriter writer) {
*/
private void updateAllocation() {
long totalAllocations = 0;
double scale;
for (Long allocation : writerList.values()) {
totalAllocations += allocation;
}
Expand All @@ -112,6 +115,10 @@ private void updateAllocation() {
"Total allocation exceeds %.2f%% (%,d bytes) of heap memory\n" +
"Scaling row group sizes to %.2f%% for %d writers",
100*memoryPoolRatio, totalMemoryPool, 100*scale, writerList.size()));
for (Runnable callBack : callBacks.values()) {
// we do not really want to start a new thread here.
callBack.run();
}
}

int maxColCount = 0;
Expand Down Expand Up @@ -155,4 +162,37 @@ Map<InternalParquetRecordWriter, Long> getWriterList() {
float getMemoryPoolRatio() {
return memoryPoolRatio;
}

/**
 * Registers a callback under a unique name.
 *
 * <p>The callback is stored in the {@code callBacks} map, whose values are run
 * synchronously (on the calling thread, no new thread is started) by
 * {@code updateAllocation()} when it reports that row group sizes are being scaled.
 *
 * @param callBackName the name of the callback; it must be unique among the
 *                     callbacks already registered, and must not be null
 * @param callBack the callback passed in from an upper layer, such as Hive;
 *                 must not be null
 * @throws IllegalArgumentException if a callback is already registered under
 *                                  {@code callBackName}
 */
public void registerScaleCallBack(String callBackName, Runnable callBack) {
  // Both arguments are validated before the map is touched.
  Preconditions.checkNotNull(callBackName, "callBackName");
  Preconditions.checkNotNull(callBack, "callBack");

  // Guard clause: a name can only be bound once; an existing entry is never replaced.
  final boolean alreadyRegistered = callBacks.containsKey(callBackName);
  if (alreadyRegistered) {
    throw new IllegalArgumentException("The callBackName " + callBackName +
        " is duplicated and has been registered already.");
  }

  callBacks.put(callBackName, callBack);
}

/**
 * Returns a read-only view of the registered scale callbacks.
 *
 * @return an unmodifiable map from callback name to callback, backed by the
 *         internal {@code callBacks} map
 */
Map<String, Runnable> getScaleCallBacks() {
  // Wrap rather than copy: callers see later registrations but cannot mutate the map.
  final Map<String, Runnable> readOnlyView = Collections.unmodifiableMap(callBacks);
  return readOnlyView;
}

/**
 * Returns the internal scale value of this MemoryManager.
 *
 * @return the current value of the {@code scale} field
 */
double getScale() {
  return this.scale;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ public OutputCommitter getOutputCommitter(TaskAttemptContext context)
*/
private static MemoryManager memoryManager;

static MemoryManager getMemoryManager() {
public static MemoryManager getMemoryManager() {
return memoryManager;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ public class TestMemoryManager {
int rowGroupSize;
ParquetOutputFormat parquetOutputFormat;
CompressionCodecName codec;
int counter = 0;
boolean firstRegister = true;

@Before
public void setUp() {
Expand Down Expand Up @@ -87,12 +89,36 @@ public void testMemoryManager() throws Exception {
//Verify the memory pool
Assert.assertEquals("memory pool size is incorrect.", expectPoolSize,
parquetOutputFormat.getMemoryManager().getTotalMemoryPool());

//Verify Callback mechanism
Assert.assertEquals("counter calculated by callback is incorrect.", 1, counter);
Assert.assertEquals("CallBack is duplicated.", 1, parquetOutputFormat.getMemoryManager()
.getScaleCallBacks().size());
}

/**
 * Creates a record writer for the test file with the given index and tries to
 * register the counter-incrementing scale callback on the memory manager.
 *
 * <p>The callback is always registered under the same name, so only the first
 * registration is expected to succeed; every later call is expected to be
 * rejected with an {@link IllegalArgumentException}. Either expectation being
 * violated fails the test.
 *
 * @param index suffix used to build the output file name under {@code target/test/}
 * @return the writer obtained from a freshly created {@code ParquetOutputFormat}
 * @throws Exception if the writer cannot be created
 */
private RecordWriter createWriter(int index) throws Exception {
  Path file = new Path("target/test/", "parquet" + index);
  parquetOutputFormat = new ParquetOutputFormat(new GroupWriteSupport());
  // The writer is created first and returned only after the callback registration
  // check below; an early return here would make that check unreachable.
  RecordWriter writer = parquetOutputFormat.getRecordWriter(conf, file, codec);

  try {
    parquetOutputFormat.getMemoryManager().registerScaleCallBack("increment-test-counter",
        new Runnable() {
          @Override
          public void run() {
            counter++;
          }
        });
    // Reaching this point means the registration was accepted, which is only
    // legitimate the first time.
    if (!firstRegister) {
      Assert.fail("Duplicated registering callback should throw duplicates exception.");
    }
    firstRegister = false;
  } catch (IllegalArgumentException duplicate) {
    // A rejection is the expected outcome for every registration after the first.
    if (firstRegister) {
      Assert.fail("Registering the same callback first time should succeed.");
    }
  }

  return writer;
}

private void verifyRowGroupSize(int expectRowGroupSize) {
Expand Down