diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/MemoryManager.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/MemoryManager.java index ec70b87622..f997d08456 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/MemoryManager.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/MemoryManager.java @@ -20,8 +20,10 @@ import org.apache.parquet.Log; import org.apache.parquet.ParquetRuntimeException; +import org.apache.parquet.Preconditions; import java.lang.management.ManagementFactory; +import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -47,6 +49,8 @@ public class MemoryManager { private final long minMemoryAllocation; private final Map writerList = new HashMap(); + private final Map callBacks = new HashMap(); + private double scale = 1.0; public MemoryManager(float ratio, long minAllocation) { checkRatio(ratio); @@ -100,7 +104,6 @@ synchronized void removeWriter(InternalParquetRecordWriter writer) { */ private void updateAllocation() { long totalAllocations = 0; - double scale; for (Long allocation : writerList.values()) { totalAllocations += allocation; } @@ -112,6 +115,10 @@ private void updateAllocation() { "Total allocation exceeds %.2f%% (%,d bytes) of heap memory\n" + "Scaling row group sizes to %.2f%% for %d writers", 100*memoryPoolRatio, totalMemoryPool, 100*scale, writerList.size())); + for (Runnable callBack : callBacks.values()) { + // we do not really want to start a new thread here. + callBack.run(); + } } int maxColCount = 0; @@ -155,4 +162,37 @@ Map getWriterList() { float getMemoryPoolRatio() { return memoryPoolRatio; } + + /** + * Register a callback, rejecting duplicate registrations. + * @param callBackName the name of the callback. It should be unique. + * @param callBack a callback passed in from an upper layer, such as Hive. 
+ */ + public void registerScaleCallBack(String callBackName, Runnable callBack) { + Preconditions.checkNotNull(callBackName, "callBackName"); + Preconditions.checkNotNull(callBack, "callBack"); + + if (callBacks.containsKey(callBackName)) { + throw new IllegalArgumentException("The callBackName " + callBackName + + " is duplicated and has been registered already."); + } else { + callBacks.put(callBackName, callBack); + } + } + + /** + * Get the registered callbacks. + * @return an unmodifiable view of the registered callbacks, keyed by name + */ + Map getScaleCallBacks() { + return Collections.unmodifiableMap(callBacks); + } + + /** + * Get the internal scale value of MemoryManager. + * @return the current scale factor applied to row group sizes + */ + double getScale() { + return scale; + } } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java index d849843b19..ea3101a2b5 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java @@ -347,7 +347,7 @@ public OutputCommitter getOutputCommitter(TaskAttemptContext context) */ private static MemoryManager memoryManager; - static MemoryManager getMemoryManager() { + public static MemoryManager getMemoryManager() { return memoryManager; } } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestMemoryManager.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestMemoryManager.java index fd56957a2a..bbe74430ae 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestMemoryManager.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestMemoryManager.java @@ -48,6 +48,8 @@ public class TestMemoryManager { int rowGroupSize; ParquetOutputFormat parquetOutputFormat; CompressionCodecName codec; + int counter = 0; + boolean firstRegister = true; @Before public void setUp() { @@ -87,12 +89,36 @@ public void testMemoryManager() throws Exception { 
//Verify the memory pool Assert.assertEquals("memory pool size is incorrect.", expectPoolSize, parquetOutputFormat.getMemoryManager().getTotalMemoryPool()); + + //Verify Callback mechanism + Assert.assertEquals("counter calculated by callback is incorrect.", 1, counter); + Assert.assertEquals("CallBack is duplicated.", 1, parquetOutputFormat.getMemoryManager() + .getScaleCallBacks().size()); } private RecordWriter createWriter(int index) throws Exception{ Path file = new Path("target/test/", "parquet" + index); parquetOutputFormat = new ParquetOutputFormat(new GroupWriteSupport()); - return parquetOutputFormat.getRecordWriter(conf, file, codec); + RecordWriter writer = parquetOutputFormat.getRecordWriter(conf, file, codec); + try { + parquetOutputFormat.getMemoryManager().registerScaleCallBack("increment-test-counter", + new Runnable() { + @Override + public void run() { + counter++; + } + }); + if (!firstRegister) { + Assert.fail("Duplicated registering callback should throw duplicates exception."); + } + firstRegister = false; + } catch (IllegalArgumentException e) { + if (firstRegister) { + Assert.fail("Registering the same callback first time should succeed."); + } + } + + return writer; } private void verifyRowGroupSize(int expectRowGroupSize) {