From 943dcb5522061f5b1e24606b9784fc2391ccad84 Mon Sep 17 00:00:00 2001 From: Pradeep Gollakota Date: Tue, 9 Jan 2018 18:38:53 -0800 Subject: [PATCH 1/2] PARQUET-869 Configurable record counts for block size checks --- .../parquet/column/ParquetProperties.java | 80 +++++++++++++++---- .../column/impl/ColumnWriteStoreV2.java | 8 +- .../parquet/column/impl/ColumnWriterV1.java | 10 +-- .../hadoop/InternalParquetRecordWriter.java | 35 +++++--- .../parquet/hadoop/ParquetOutputFormat.java | 41 +++++++--- 5 files changed, 129 insertions(+), 45 deletions(-) diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java index 39b65da9fa..0de7fe88f8 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -45,8 +45,11 @@ public class ParquetProperties { public static final boolean DEFAULT_IS_DICTIONARY_ENABLED = true; public static final WriterVersion DEFAULT_WRITER_VERSION = WriterVersion.PARQUET_1_0; public static final boolean DEFAULT_ESTIMATE_ROW_COUNT_FOR_PAGE_SIZE_CHECK = true; - public static final int DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK = 100; - public static final int DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000; + public static final boolean DEFAULT_ESTIMATE_ROW_COUNT_FOR_BLOCK_SIZE_CHECK = true; + public static final int DEFAULT_MINIMUM_RECORD_COUNT_FOR_PAGE_SIZE_CHECK = 100; + public static final int DEFAULT_MAXIMUM_RECORD_COUNT_FOR_PAGE_SIZE_CHECK = 10000; + public static final int DEFAULT_MINIMUM_RECORD_COUNT_FOR_BLOCK_SIZE_CHECK = 100; + public static final int DEFAULT_MAXIMUM_RECORD_COUNT_FOR_BLOCK_SIZE_CHECK = 10000; public static final ValuesWriterFactory DEFAULT_VALUES_WRITER_FACTORY = new DefaultValuesWriterFactory(); @@ -80,12 +83,15 @@ public static WriterVersion fromString(String name) { private final boolean enableDictionary; private final int minRowCountForPageSizeCheck; private final int maxRowCountForPageSizeCheck; - private final boolean estimateNextSizeCheck; + private final int minRowCountForBlockSizeCheck; + private final int maxRowCountForBlockSizeCheck; + private final boolean estimateNextPageSizeCheck; + private final boolean estimateNextBlockSizeCheck; private final ByteBufferAllocator allocator; private final ValuesWriterFactory valuesWriterFactory; private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPageSize, boolean enableDict, int minRowCountForPageSizeCheck, - int maxRowCountForPageSizeCheck, boolean estimateNextSizeCheck, ByteBufferAllocator allocator, + int maxRowCountForPageSizeCheck, int minRowCountForBlockSizeCheck, int maxRowCountForBlockSizeCheck, boolean estimateNextPageSizeCheck, boolean estimateNextBlockSizeCheck, ByteBufferAllocator allocator, ValuesWriterFactory writerFactory) { this.pageSizeThreshold = pageSize; this.initialSlabSize = CapacityByteArrayOutputStream @@ -95,7 +101,10 @@ private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPag this.enableDictionary = enableDict; this.minRowCountForPageSizeCheck = minRowCountForPageSizeCheck; this.maxRowCountForPageSizeCheck = maxRowCountForPageSizeCheck; - this.estimateNextSizeCheck = estimateNextSizeCheck; + this.minRowCountForBlockSizeCheck = minRowCountForBlockSizeCheck; + this.maxRowCountForBlockSizeCheck = maxRowCountForBlockSizeCheck; + this.estimateNextPageSizeCheck = estimateNextPageSizeCheck; + this.estimateNextBlockSizeCheck = estimateNextBlockSizeCheck; this.allocator = allocator; this.valuesWriterFactory = writerFactory; @@ -179,12 +188,24 @@ public int getMaxRowCountForPageSizeCheck() { return maxRowCountForPageSizeCheck; } + public int getMinRowCountForBlockSizeCheck() { + return minRowCountForBlockSizeCheck; + } + + public int getMaxRowCountForBlockSizeCheck() { + return maxRowCountForBlockSizeCheck; + } + public ValuesWriterFactory getValuesWriterFactory() { return valuesWriterFactory; } - public boolean estimateNextSizeCheck() { - return estimateNextSizeCheck; + public boolean estimateNextPageSizeCheck() { + return estimateNextPageSizeCheck; + } + + public boolean estimateNextBlockSizeCheck() { + return estimateNextBlockSizeCheck; } public static Builder builder() { @@ -200,9 +221,12 @@ public static class Builder { private int dictPageSize = DEFAULT_DICTIONARY_PAGE_SIZE; private boolean enableDict = DEFAULT_IS_DICTIONARY_ENABLED; private WriterVersion writerVersion = DEFAULT_WRITER_VERSION; - private int minRowCountForPageSizeCheck = DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK; - private int maxRowCountForPageSizeCheck = DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK; - private boolean estimateNextSizeCheck = DEFAULT_ESTIMATE_ROW_COUNT_FOR_PAGE_SIZE_CHECK; + private int minRowCountForPageSizeCheck = DEFAULT_MINIMUM_RECORD_COUNT_FOR_PAGE_SIZE_CHECK; + private int maxRowCountForPageSizeCheck = DEFAULT_MAXIMUM_RECORD_COUNT_FOR_PAGE_SIZE_CHECK; + private boolean estimateNextPageSizeCheck = DEFAULT_ESTIMATE_ROW_COUNT_FOR_PAGE_SIZE_CHECK; + private int minRowCountForBlockSizeCheck = DEFAULT_MINIMUM_RECORD_COUNT_FOR_BLOCK_SIZE_CHECK; + private int maxRowCountForBlockSizeCheck = DEFAULT_MAXIMUM_RECORD_COUNT_FOR_BLOCK_SIZE_CHECK; + private boolean estimateNextBlockSizeCheck = DEFAULT_ESTIMATE_ROW_COUNT_FOR_BLOCK_SIZE_CHECK; private ByteBufferAllocator allocator = new HeapByteBufferAllocator(); private ValuesWriterFactory valuesWriterFactory = DEFAULT_VALUES_WRITER_FACTORY; @@ -215,7 +239,10 @@ private Builder(ParquetProperties toCopy) { this.writerVersion = toCopy.writerVersion; this.minRowCountForPageSizeCheck = toCopy.minRowCountForPageSizeCheck; this.maxRowCountForPageSizeCheck = toCopy.maxRowCountForPageSizeCheck; - this.estimateNextSizeCheck = toCopy.estimateNextSizeCheck; + this.estimateNextPageSizeCheck = toCopy.estimateNextPageSizeCheck; + this.minRowCountForBlockSizeCheck = toCopy.minRowCountForBlockSizeCheck; + this.maxRowCountForBlockSizeCheck = toCopy.maxRowCountForBlockSizeCheck; + this.estimateNextBlockSizeCheck = toCopy.estimateNextBlockSizeCheck; this.allocator = toCopy.allocator; } @@ -281,9 +308,28 @@ public Builder withMaxRowCountForPageSizeCheck(int max) { return this; } + public Builder withMinRowCountForBlockSizeCheck(int min) { + Preconditions.checkArgument(min > 0, + "Invalid row count for block size check (negative): %s", min); + this.minRowCountForBlockSizeCheck = min; + return this; + } + + public Builder withMaxRowCountForBlockSizeCheck(int max) { + Preconditions.checkArgument(max > 0, + "Invalid row count for block size check (negative): %s", max); + this.maxRowCountForBlockSizeCheck = max; + return this; + } + // Do not attempt to predict next size check. Prevents issues with rows that vary significantly in size. public Builder estimateRowCountForPageSizeCheck(boolean estimateNextSizeCheck) { - this.estimateNextSizeCheck = estimateNextSizeCheck; + this.estimateNextPageSizeCheck = estimateNextSizeCheck; + return this; + } + + public Builder estimateRowCountForBlockSizeCheck(boolean estimateBlockSizeCheck) { + this.estimateNextBlockSizeCheck = estimateBlockSizeCheck; return this; } @@ -303,7 +349,9 @@ public ParquetProperties build() { ParquetProperties properties = new ParquetProperties(writerVersion, pageSize, dictPageSize, enableDict, minRowCountForPageSizeCheck, maxRowCountForPageSizeCheck, - estimateNextSizeCheck, allocator, valuesWriterFactory); + minRowCountForBlockSizeCheck, maxRowCountForBlockSizeCheck, + estimateNextPageSizeCheck, estimateNextBlockSizeCheck, + allocator, valuesWriterFactory); // we pass a constructed but uninitialized factory to ParquetProperties above as currently // creation of ValuesWriters is invoked from within ParquetProperties. In the future // we'd like to decouple that and won't need to pass an object to properties and then pass the diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java index 7574cedf75..e13eacde77 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -161,7 +161,7 @@ private void sizeCheck() { minRecordToWait = props.getMinRowCountForPageSizeCheck(); } - if(props.estimateNextSizeCheck()) { + if(props.estimateNextPageSizeCheck()) { // will check again halfway if between min and max rowCountForNextSizeCheck = rowCount + min( diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java index c1f5d67b01..7e8538b8ed 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -98,13 +98,13 @@ private void accountForValueWritten() { + dataColumn.getBufferedSize(); if (memSize > props.getPageSizeThreshold()) { // we will write the current page and check again the size at the predicted middle of next page - if (props.estimateNextSizeCheck()) { + if (props.estimateNextPageSizeCheck()) { valueCountForNextSizeCheck = valueCount / 2; } else { valueCountForNextSizeCheck = props.getMinRowCountForPageSizeCheck(); } writePage(); - } else if (props.estimateNextSizeCheck()) { + } else if (props.estimateNextPageSizeCheck()) { // not reached the threshold, will check again midway valueCountForNextSizeCheck = (int)(valueCount + ((float)valueCount * props.getPageSizeThreshold() / memSize)) / 2 + 1; } else { diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java index d9e9b5e15e..0f4ff05882 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java @@ -43,14 +43,11 @@ class InternalParquetRecordWriter { private static final Logger LOG = LoggerFactory.getLogger(InternalParquetRecordWriter.class); - private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 100; - private static final int MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000; - private final ParquetFileWriter parquetFileWriter; private final WriteSupport writeSupport; private final MessageType schema; private final Map extraMetaData; - private final long rowGroupSize; + private final boolean estimateNextBlockSizeCheck; private long rowGroupSizeThreshold; private long nextRowGroupSize; private final BytesCompressor compressor; @@ -60,12 +57,14 @@ class InternalParquetRecordWriter { private boolean closed; private long recordCount = 0; - private long recordCountForNextMemCheck = MINIMUM_RECORD_COUNT_FOR_CHECK; + private long recordCountForNextMemCheck; private long lastRowGroupEndPos = 0; private ColumnWriteStore columnStore; private ColumnChunkPageWriteStore pageStore; private RecordConsumer recordConsumer; + private int minRecordCountForBlockSizeCheck; + private int maxRecordCountForBlockSizeCheck; /** * @param parquetFileWriter the file to write to @@ -88,12 +87,14 @@ public InternalParquetRecordWriter( this.writeSupport = checkNotNull(writeSupport, "writeSupport"); this.schema = schema; this.extraMetaData = extraMetaData; - this.rowGroupSize = rowGroupSize; this.rowGroupSizeThreshold = rowGroupSize; this.nextRowGroupSize = rowGroupSizeThreshold; this.compressor = compressor; this.validating = validating; this.props = props; + this.minRecordCountForBlockSizeCheck = props.getMinRowCountForPageSizeCheck(); + this.maxRecordCountForBlockSizeCheck = props.getMaxRowCountForPageSizeCheck(); + this.estimateNextBlockSizeCheck = props.estimateNextBlockSizeCheck(); initStore(); } @@ -107,6 +108,8 @@ private void initStore() { MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(schema); this.recordConsumer = columnIO.getRecordWriter(columnStore); writeSupport.prepareForWrite(recordConsumer); + System.out.println(String.format("Created ParquetWriter with [%d, %d] for block size checks. Estimation(%s). BlockSize(%d)", + minRecordCountForBlockSizeCheck, maxRecordCountForBlockSizeCheck, estimateNextBlockSizeCheck, rowGroupSizeThreshold)); } public void close() throws IOException, InterruptedException { @@ -147,13 +150,23 @@ private void checkBlockSizeReached() throws IOException { LOG.info("mem size {} > {}: flushing {} records to disk.", memSize, nextRowGroupSize, recordCount); flushRowGroupToStore(); initStore(); - recordCountForNextMemCheck = min(max(MINIMUM_RECORD_COUNT_FOR_CHECK, recordCount / 2), MAXIMUM_RECORD_COUNT_FOR_CHECK); + if (estimateNextBlockSizeCheck) { + recordCountForNextMemCheck = min(max(minRecordCountForBlockSizeCheck, recordCount / 2), + maxRecordCountForBlockSizeCheck); + } else { + recordCountForNextMemCheck = minRecordCountForBlockSizeCheck; + } this.lastRowGroupEndPos = parquetFileWriter.getPos(); } else { - recordCountForNextMemCheck = min( - max(MINIMUM_RECORD_COUNT_FOR_CHECK, (recordCount + (long)(nextRowGroupSize / ((float)recordSize))) / 2), // will check halfway - recordCount + MAXIMUM_RECORD_COUNT_FOR_CHECK // will not look more than max records ahead - ); + if (estimateNextBlockSizeCheck) { + recordCountForNextMemCheck = min( + max(minRecordCountForBlockSizeCheck, (recordCount + (long) (nextRowGroupSize / ((float) recordSize))) / 2), + // will check halfway + recordCount + maxRecordCountForBlockSizeCheck // will not look more than max records ahead + ); + } else { + recordCountForNextMemCheck += minRecordCountForBlockSizeCheck; + } LOG.debug("Checked mem at {} will check again at: {}", recordCount, recordCountForNextMemCheck); } } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java index ff5bab397d..17f4ebba72 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -140,9 +140,12 @@ public static enum JobSummaryLevel { public static final String MEMORY_POOL_RATIO = "parquet.memory.pool.ratio"; public static final String MIN_MEMORY_ALLOCATION = "parquet.memory.min.chunk.size"; public static final String MAX_PADDING_BYTES = "parquet.writer.max-padding"; - public static final String MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK = "parquet.page.size.row.check.min"; - public static final String MAX_ROW_COUNT_FOR_PAGE_SIZE_CHECK = "parquet.page.size.row.check.max"; - public static final String ESTIMATE_PAGE_SIZE_CHECK = "parquet.page.size.check.estimate"; + public static final String MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK = "parquet.page.size_row_check_min"; + public static final String MAX_ROW_COUNT_FOR_PAGE_SIZE_CHECK = "parquet.page.size_row_check_max"; + public static final String MIN_ROW_COUNT_FOR_BLOCK_SIZE_CHECK = "parquet.block.size_row_check_min"; + public static final String MAX_ROW_COUNT_FOR_BLOCK_SIZE_CHECK = "parquet.block.size_row_check_max"; + public static final String ESTIMATE_PAGE_SIZE_CHECK = "parquet.page.size_check_estimate"; + public static final String ESTIMATE_BLOCK_SIZE_CHECK = "parquet.block.size_check_estimate"; public static JobSummaryLevel getJobSummaryLevel(Configuration conf) { String level = conf.get(JOB_SUMMARY_LEVEL); @@ -243,12 +246,27 @@ public static boolean getEnableDictionary(Configuration configuration) { public static int getMinRowCountForPageSizeCheck(Configuration configuration) { return configuration.getInt(MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK, - ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK); + ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_PAGE_SIZE_CHECK); } public static int getMaxRowCountForPageSizeCheck(Configuration configuration) { return configuration.getInt(MAX_ROW_COUNT_FOR_PAGE_SIZE_CHECK, - ParquetProperties.DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK); + ParquetProperties.DEFAULT_MAXIMUM_RECORD_COUNT_FOR_PAGE_SIZE_CHECK); + } + + public static int getMinRowCountForBlockSizeCheck(Configuration configuration) { + return configuration.getInt(MIN_ROW_COUNT_FOR_BLOCK_SIZE_CHECK, + ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_BLOCK_SIZE_CHECK); + } + + public static int getMaxRowCountForBlockSizeCheck(Configuration configuration) { + return configuration.getInt(MAX_ROW_COUNT_FOR_BLOCK_SIZE_CHECK, + ParquetProperties.DEFAULT_MAXIMUM_RECORD_COUNT_FOR_BLOCK_SIZE_CHECK); + } + + public static boolean getEstimateBlockSizeCheck(Configuration configuration) { + return configuration.getBoolean(ESTIMATE_BLOCK_SIZE_CHECK, + ParquetProperties.DEFAULT_ESTIMATE_ROW_COUNT_FOR_BLOCK_SIZE_CHECK); } public static boolean getEstimatePageSizeCheck(Configuration configuration) { @@ -364,8 +382,11 @@ public RecordWriter getRecordWriter(Configuration conf, Path file, Comp .withDictionaryEncoding(getEnableDictionary(conf)) .withWriterVersion(getWriterVersion(conf)) .estimateRowCountForPageSizeCheck(getEstimatePageSizeCheck(conf)) + .estimateRowCountForBlockSizeCheck(getEstimateBlockSizeCheck(conf)) .withMinRowCountForPageSizeCheck(getMinRowCountForPageSizeCheck(conf)) .withMaxRowCountForPageSizeCheck(getMaxRowCountForPageSizeCheck(conf)) + .withMinRowCountForBlockSizeCheck(getMinRowCountForBlockSizeCheck(conf)) + .withMaxRowCountForBlockSizeCheck(getMaxRowCountForBlockSizeCheck(conf)) .build(); long blockSize = getLongBlockSize(conf); @@ -380,9 +401,11 @@ public RecordWriter getRecordWriter(Configuration conf, Path file, Comp LOG.info("Validation is {}", (validating ? "on" : "off")); LOG.info("Writer version is: {}", props.getWriterVersion()); LOG.info("Maximum row group padding size is {} bytes", maxPaddingSize); - LOG.info("Page size checking is: {}", (props.estimateNextSizeCheck() ? "estimated" : "constant")); + LOG.info("Page size checking is: {}", (props.estimateNextPageSizeCheck() ? "estimated" : "constant")); LOG.info("Min row count for page size check is: {}", props.getMinRowCountForPageSizeCheck()); LOG.info("Max row count for page size check is: {}", props.getMaxRowCountForPageSizeCheck()); + LOG.info("Min row count for block size check is: {}", props.getMinRowCountForBlockSizeCheck()); + LOG.info("Max row count for block size check is: {}", props.getMaxRowCountForBlockSizeCheck()); } WriteContext init = writeSupport.init(conf); From 4d5dd4e568d88173f62a16b8a0da915058911736 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Mon, 30 Apr 2018 17:15:31 -0700 Subject: [PATCH 2/2] PARQUET-869: Use "row group" instead of "block". --- .../parquet/column/ParquetProperties.java | 60 +++++++++---------- .../hadoop/InternalParquetRecordWriter.java | 29 ++++----- .../parquet/hadoop/ParquetOutputFormat.java | 38 ++++++------ 3 files changed, 62 insertions(+), 65 deletions(-) diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java index 0de7fe88f8..13c75a9e92 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java @@ -45,11 +45,11 @@ public class ParquetProperties { public static final boolean DEFAULT_IS_DICTIONARY_ENABLED = true; public static final WriterVersion DEFAULT_WRITER_VERSION = WriterVersion.PARQUET_1_0; public static final boolean DEFAULT_ESTIMATE_ROW_COUNT_FOR_PAGE_SIZE_CHECK = true; - public static final boolean DEFAULT_ESTIMATE_ROW_COUNT_FOR_BLOCK_SIZE_CHECK = true; + public static final boolean DEFAULT_ESTIMATE_ROW_COUNT_FOR_ROW_GROUP_SIZE_CHECK = true; public static final int DEFAULT_MINIMUM_RECORD_COUNT_FOR_PAGE_SIZE_CHECK = 100; public static final int DEFAULT_MAXIMUM_RECORD_COUNT_FOR_PAGE_SIZE_CHECK = 10000; - public static final int DEFAULT_MINIMUM_RECORD_COUNT_FOR_BLOCK_SIZE_CHECK = 100; - public static final int DEFAULT_MAXIMUM_RECORD_COUNT_FOR_BLOCK_SIZE_CHECK = 10000; + public static final int DEFAULT_MINIMUM_RECORD_COUNT_FOR_ROW_GROUP_SIZE_CHECK = 100; + public static final int DEFAULT_MAXIMUM_RECORD_COUNT_FOR_ROW_GROUP_SIZE_CHECK = 10000; public static final ValuesWriterFactory DEFAULT_VALUES_WRITER_FACTORY = new DefaultValuesWriterFactory(); @@ -83,15 +83,15 @@ public static WriterVersion fromString(String name) { private final boolean enableDictionary; private final int minRowCountForPageSizeCheck; private final int maxRowCountForPageSizeCheck; - private final int minRowCountForBlockSizeCheck; - private final int maxRowCountForBlockSizeCheck; + private final int minRowCountForRowGroupSizeCheck; + private final int maxRowCountForRowGroupSizeCheck; private final boolean estimateNextPageSizeCheck; - private final boolean estimateNextBlockSizeCheck; + private final boolean estimateNextRowGroupSizeCheck; private final ByteBufferAllocator allocator; private final ValuesWriterFactory valuesWriterFactory; private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPageSize, boolean enableDict, int minRowCountForPageSizeCheck, - int maxRowCountForPageSizeCheck, int minRowCountForBlockSizeCheck, int maxRowCountForBlockSizeCheck, boolean estimateNextPageSizeCheck, boolean estimateNextBlockSizeCheck, ByteBufferAllocator allocator, + int maxRowCountForPageSizeCheck, int minRowCountForRowGroupSizeCheck, int maxRowCountForRowGroupSizeCheck, boolean estimateNextPageSizeCheck, boolean estimateNextRowGroupSizeCheck, ByteBufferAllocator allocator, ValuesWriterFactory writerFactory) { this.pageSizeThreshold = pageSize; this.initialSlabSize = CapacityByteArrayOutputStream @@ -101,10 +101,10 @@ private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPag this.enableDictionary = enableDict; this.minRowCountForPageSizeCheck = minRowCountForPageSizeCheck; this.maxRowCountForPageSizeCheck = maxRowCountForPageSizeCheck; - this.minRowCountForBlockSizeCheck = minRowCountForBlockSizeCheck; - this.maxRowCountForBlockSizeCheck = maxRowCountForBlockSizeCheck; + this.minRowCountForRowGroupSizeCheck = minRowCountForRowGroupSizeCheck; + this.maxRowCountForRowGroupSizeCheck = maxRowCountForRowGroupSizeCheck; this.estimateNextPageSizeCheck = estimateNextPageSizeCheck; - this.estimateNextBlockSizeCheck = estimateNextBlockSizeCheck; + this.estimateNextRowGroupSizeCheck = estimateNextRowGroupSizeCheck; this.allocator = allocator; this.valuesWriterFactory = writerFactory; @@ -188,12 +188,12 @@ public int getMaxRowCountForPageSizeCheck() { return maxRowCountForPageSizeCheck; } - public int getMinRowCountForBlockSizeCheck() { - return minRowCountForBlockSizeCheck; + public int getMinRowCountForRowGroupSizeCheck() { + return minRowCountForRowGroupSizeCheck; } - public int getMaxRowCountForBlockSizeCheck() { - return maxRowCountForBlockSizeCheck; + public int getMaxRowCountForRowGroupSizeCheck() { + return maxRowCountForRowGroupSizeCheck; } public ValuesWriterFactory getValuesWriterFactory() { @@ -204,8 +204,8 @@ public boolean estimateNextPageSizeCheck() { return estimateNextPageSizeCheck; } - public boolean estimateNextBlockSizeCheck() { - return estimateNextBlockSizeCheck; + public boolean estimateNextRowGroupSizeCheck() { + return estimateNextRowGroupSizeCheck; } public static Builder builder() { @@ -224,9 +224,9 @@ public static class Builder { private int minRowCountForPageSizeCheck = DEFAULT_MINIMUM_RECORD_COUNT_FOR_PAGE_SIZE_CHECK; private int maxRowCountForPageSizeCheck = DEFAULT_MAXIMUM_RECORD_COUNT_FOR_PAGE_SIZE_CHECK; private boolean estimateNextPageSizeCheck = DEFAULT_ESTIMATE_ROW_COUNT_FOR_PAGE_SIZE_CHECK; - private int minRowCountForBlockSizeCheck = DEFAULT_MINIMUM_RECORD_COUNT_FOR_BLOCK_SIZE_CHECK; - private int maxRowCountForBlockSizeCheck = DEFAULT_MAXIMUM_RECORD_COUNT_FOR_BLOCK_SIZE_CHECK; - private boolean estimateNextBlockSizeCheck = DEFAULT_ESTIMATE_ROW_COUNT_FOR_BLOCK_SIZE_CHECK; + private int minRowCountForRowGroupSizeCheck = DEFAULT_MINIMUM_RECORD_COUNT_FOR_ROW_GROUP_SIZE_CHECK; + private int maxRowCountForRowGroupSizeCheck = DEFAULT_MAXIMUM_RECORD_COUNT_FOR_ROW_GROUP_SIZE_CHECK; + private boolean estimateNextRowGroupSizeCheck = DEFAULT_ESTIMATE_ROW_COUNT_FOR_ROW_GROUP_SIZE_CHECK; private ByteBufferAllocator allocator = new HeapByteBufferAllocator(); private ValuesWriterFactory valuesWriterFactory = DEFAULT_VALUES_WRITER_FACTORY; @@ -240,9 +240,9 @@ private Builder(ParquetProperties toCopy) { this.minRowCountForPageSizeCheck = toCopy.minRowCountForPageSizeCheck; this.maxRowCountForPageSizeCheck = toCopy.maxRowCountForPageSizeCheck; this.estimateNextPageSizeCheck = toCopy.estimateNextPageSizeCheck; - this.minRowCountForBlockSizeCheck = toCopy.minRowCountForBlockSizeCheck; - this.maxRowCountForBlockSizeCheck = toCopy.maxRowCountForBlockSizeCheck; - this.estimateNextBlockSizeCheck = toCopy.estimateNextBlockSizeCheck; + this.minRowCountForRowGroupSizeCheck = toCopy.minRowCountForRowGroupSizeCheck; + this.maxRowCountForRowGroupSizeCheck = toCopy.maxRowCountForRowGroupSizeCheck; + this.estimateNextRowGroupSizeCheck = toCopy.estimateNextRowGroupSizeCheck; this.allocator = toCopy.allocator; } @@ -308,17 +308,17 @@ public Builder withMaxRowCountForPageSizeCheck(int max) { return this; } - public Builder withMinRowCountForBlockSizeCheck(int min) { + public Builder withMinRowCountForRowGroupSizeCheck(int min) { Preconditions.checkArgument(min > 0, "Invalid row count for block size check (negative): %s", min); - this.minRowCountForBlockSizeCheck = min; + this.minRowCountForRowGroupSizeCheck = min; return this; } - public Builder withMaxRowCountForBlockSizeCheck(int max) { + public Builder withMaxRowCountForRowGroupSizeCheck(int max) { Preconditions.checkArgument(max > 0, "Invalid row count for block size check (negative): %s", max); - this.maxRowCountForBlockSizeCheck = max; + this.maxRowCountForRowGroupSizeCheck = max; return this; } @@ -328,8 +328,8 @@ public Builder estimateRowCountForPageSizeCheck(boolean estimateNextSizeCheck) { return this; } - public Builder estimateRowCountForBlockSizeCheck(boolean estimateBlockSizeCheck) { - this.estimateNextBlockSizeCheck = estimateBlockSizeCheck; + public Builder estimateRowCountForRowGroupSizeCheck(boolean estimateRowGroupSizeCheck) { + this.estimateNextRowGroupSizeCheck = estimateRowGroupSizeCheck; return this; } @@ -349,8 +349,8 @@ public ParquetProperties build() { ParquetProperties properties = new ParquetProperties(writerVersion, pageSize, dictPageSize, enableDict, minRowCountForPageSizeCheck, maxRowCountForPageSizeCheck, - minRowCountForBlockSizeCheck, maxRowCountForBlockSizeCheck, - estimateNextPageSizeCheck, estimateNextBlockSizeCheck, + minRowCountForRowGroupSizeCheck, maxRowCountForRowGroupSizeCheck, + estimateNextPageSizeCheck, estimateNextRowGroupSizeCheck, allocator, valuesWriterFactory); // we pass a constructed but uninitialized factory to ParquetProperties above as currently // creation of ValuesWriters is invoked from within ParquetProperties. In the future diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java index 0f4ff05882..0dd6906136 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java @@ -47,7 +47,7 @@ class InternalParquetRecordWriter { private final WriteSupport writeSupport; private final MessageType schema; private final Map extraMetaData; - private final boolean estimateNextBlockSizeCheck; + private final boolean estimateSizeCheckRows; private long rowGroupSizeThreshold; private long nextRowGroupSize; private final BytesCompressor compressor; @@ -63,8 +63,8 @@ class InternalParquetRecordWriter { private ColumnWriteStore columnStore; private ColumnChunkPageWriteStore pageStore; private RecordConsumer recordConsumer; - private int minRecordCountForBlockSizeCheck; - private int maxRecordCountForBlockSizeCheck; + private int sizeCheckMinRows; + private int sizeCheckMaxRows; /** * @param parquetFileWriter the file to write to @@ -92,9 +92,9 @@ public InternalParquetRecordWriter( this.compressor = compressor; this.validating = validating; this.props = props; - this.minRecordCountForBlockSizeCheck = props.getMinRowCountForPageSizeCheck(); - this.maxRecordCountForBlockSizeCheck = props.getMaxRowCountForPageSizeCheck(); - this.estimateNextBlockSizeCheck = props.estimateNextBlockSizeCheck(); + this.sizeCheckMinRows = props.getMinRowCountForRowGroupSizeCheck(); + this.sizeCheckMaxRows = props.getMaxRowCountForRowGroupSizeCheck(); + this.estimateSizeCheckRows = props.estimateNextRowGroupSizeCheck(); initStore(); } @@ -108,8 +108,6 @@ private void initStore() { MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(schema); this.recordConsumer = columnIO.getRecordWriter(columnStore); writeSupport.prepareForWrite(recordConsumer); - System.out.println(String.format("Created ParquetWriter with [%d, %d] for block size checks. Estimation(%s). BlockSize(%d)", - minRecordCountForBlockSizeCheck, maxRecordCountForBlockSizeCheck, estimateNextBlockSizeCheck, rowGroupSizeThreshold)); } public void close() throws IOException, InterruptedException { @@ -150,22 +148,21 @@ private void checkBlockSizeReached() throws IOException { LOG.info("mem size {} > {}: flushing {} records to disk.", memSize, nextRowGroupSize, recordCount); flushRowGroupToStore(); initStore(); - if (estimateNextBlockSizeCheck) { - recordCountForNextMemCheck = min(max(minRecordCountForBlockSizeCheck, recordCount / 2), - maxRecordCountForBlockSizeCheck); + if (estimateSizeCheckRows) { + recordCountForNextMemCheck = min(max(sizeCheckMinRows, recordCount / 2), sizeCheckMaxRows); } else { - recordCountForNextMemCheck = minRecordCountForBlockSizeCheck; + recordCountForNextMemCheck = sizeCheckMinRows; } this.lastRowGroupEndPos = parquetFileWriter.getPos(); } else { - if (estimateNextBlockSizeCheck) { + if (estimateSizeCheckRows) { recordCountForNextMemCheck = min( - max(minRecordCountForBlockSizeCheck, (recordCount + (long) (nextRowGroupSize / ((float) recordSize))) / 2), + max(sizeCheckMinRows, (recordCount + (long) (nextRowGroupSize / ((float) recordSize))) / 2), // will check halfway - recordCount + maxRecordCountForBlockSizeCheck // will not look more than max records ahead + recordCount + sizeCheckMaxRows // will not look more than max records ahead ); } else { - recordCountForNextMemCheck += minRecordCountForBlockSizeCheck; + recordCountForNextMemCheck += sizeCheckMinRows; } LOG.debug("Checked mem at {} will check again at: {}", recordCount, recordCountForNextMemCheck); } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java index 17f4ebba72..095e79d8b4 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java @@ -140,12 +140,12 @@ public static enum JobSummaryLevel { public static final String MEMORY_POOL_RATIO = "parquet.memory.pool.ratio"; public static final String MIN_MEMORY_ALLOCATION = "parquet.memory.min.chunk.size"; public static final String MAX_PADDING_BYTES = "parquet.writer.max-padding"; - public static final String MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK = "parquet.page.size_row_check_min"; - public static final String MAX_ROW_COUNT_FOR_PAGE_SIZE_CHECK = "parquet.page.size_row_check_max"; - public static final String MIN_ROW_COUNT_FOR_BLOCK_SIZE_CHECK = "parquet.block.size_row_check_min"; - public static final String MAX_ROW_COUNT_FOR_BLOCK_SIZE_CHECK = "parquet.block.size_row_check_max"; - public static final String ESTIMATE_PAGE_SIZE_CHECK = "parquet.page.size_check_estimate"; - public static final String ESTIMATE_BLOCK_SIZE_CHECK = "parquet.block.size_check_estimate"; + public static final String MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK = "parquet.page.size.row.check.min"; + public static final String MAX_ROW_COUNT_FOR_PAGE_SIZE_CHECK = "parquet.page.size.row.check.max"; + public static final String MIN_ROW_COUNT_FOR_ROW_GROUP_SIZE_CHECK = "parquet.row-group.size.row.check.min"; + public static final String MAX_ROW_COUNT_FOR_ROW_GROUP_SIZE_CHECK = "parquet.row-group.size.row.check.max"; + public static final String ESTIMATE_PAGE_SIZE_CHECK = "parquet.page.size.check.estimate"; + public static final String ESTIMATE_ROW_GROUP_SIZE_CHECK = "parquet.row-group.size.check.estimate"; public static JobSummaryLevel getJobSummaryLevel(Configuration conf) { String level = conf.get(JOB_SUMMARY_LEVEL); @@ -254,19 +254,19 @@ public static int getMaxRowCountForPageSizeCheck(Configuration configuration) { ParquetProperties.DEFAULT_MAXIMUM_RECORD_COUNT_FOR_PAGE_SIZE_CHECK); } - public static int getMinRowCountForBlockSizeCheck(Configuration configuration) { - return configuration.getInt(MIN_ROW_COUNT_FOR_BLOCK_SIZE_CHECK, - ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_BLOCK_SIZE_CHECK); + public static int getMinRowCountForRowGroupSizeCheck(Configuration configuration) { + return configuration.getInt(MIN_ROW_COUNT_FOR_ROW_GROUP_SIZE_CHECK, + ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_ROW_GROUP_SIZE_CHECK); } - public static int getMaxRowCountForBlockSizeCheck(Configuration configuration) { - return configuration.getInt(MAX_ROW_COUNT_FOR_BLOCK_SIZE_CHECK, - ParquetProperties.DEFAULT_MAXIMUM_RECORD_COUNT_FOR_BLOCK_SIZE_CHECK); + public static int getMaxRowCountForRowGroupSizeCheck(Configuration configuration) { + return configuration.getInt(MAX_ROW_COUNT_FOR_ROW_GROUP_SIZE_CHECK, + ParquetProperties.DEFAULT_MAXIMUM_RECORD_COUNT_FOR_ROW_GROUP_SIZE_CHECK); } public static boolean getEstimateBlockSizeCheck(Configuration configuration) { - return configuration.getBoolean(ESTIMATE_BLOCK_SIZE_CHECK, - ParquetProperties.DEFAULT_ESTIMATE_ROW_COUNT_FOR_BLOCK_SIZE_CHECK); + return configuration.getBoolean(ESTIMATE_ROW_GROUP_SIZE_CHECK, + ParquetProperties.DEFAULT_ESTIMATE_ROW_COUNT_FOR_ROW_GROUP_SIZE_CHECK); } public static boolean getEstimatePageSizeCheck(Configuration configuration) { @@ -382,11 +382,11 @@ public RecordWriter getRecordWriter(Configuration conf, Path file, Comp .withDictionaryEncoding(getEnableDictionary(conf)) .withWriterVersion(getWriterVersion(conf)) .estimateRowCountForPageSizeCheck(getEstimatePageSizeCheck(conf)) - .estimateRowCountForBlockSizeCheck(getEstimateBlockSizeCheck(conf)) + .estimateRowCountForRowGroupSizeCheck(getEstimateBlockSizeCheck(conf)) .withMinRowCountForPageSizeCheck(getMinRowCountForPageSizeCheck(conf)) .withMaxRowCountForPageSizeCheck(getMaxRowCountForPageSizeCheck(conf)) - .withMinRowCountForBlockSizeCheck(getMinRowCountForBlockSizeCheck(conf)) - .withMaxRowCountForBlockSizeCheck(getMaxRowCountForBlockSizeCheck(conf)) + .withMinRowCountForRowGroupSizeCheck(getMinRowCountForRowGroupSizeCheck(conf)) + .withMaxRowCountForRowGroupSizeCheck(getMaxRowCountForRowGroupSizeCheck(conf)) .build(); long blockSize = getLongBlockSize(conf); @@ -404,8 +404,8 @@ public RecordWriter getRecordWriter(Configuration conf, Path file, Comp LOG.info("Page size checking is: {}", (props.estimateNextPageSizeCheck() ? "estimated" : "constant")); LOG.info("Min row count for page size check is: {}", props.getMinRowCountForPageSizeCheck()); LOG.info("Max row count for page size check is: {}", props.getMaxRowCountForPageSizeCheck()); - LOG.info("Min row count for block size check is: {}", props.getMinRowCountForBlockSizeCheck()); - LOG.info("Max row count for block size check is: {}", props.getMaxRowCountForBlockSizeCheck()); + LOG.info("Min row count for row group size check is: {}", props.getMinRowCountForRowGroupSizeCheck()); + LOG.info("Max row count for row group size check is: {}", props.getMaxRowCountForRowGroupSizeCheck()); } WriteContext init = writeSupport.init(conf);