diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java index 39b65da9fa..13c75a9e92 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -45,8 +45,11 @@ public class ParquetProperties { public static final boolean DEFAULT_IS_DICTIONARY_ENABLED = true; public static final WriterVersion DEFAULT_WRITER_VERSION = WriterVersion.PARQUET_1_0; public static final boolean DEFAULT_ESTIMATE_ROW_COUNT_FOR_PAGE_SIZE_CHECK = true; - public static final int DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK = 100; - public static final int DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000; + public static final boolean DEFAULT_ESTIMATE_ROW_COUNT_FOR_ROW_GROUP_SIZE_CHECK = true; + public static final int DEFAULT_MINIMUM_RECORD_COUNT_FOR_PAGE_SIZE_CHECK = 100; + public static final int DEFAULT_MAXIMUM_RECORD_COUNT_FOR_PAGE_SIZE_CHECK = 10000; + public static final int DEFAULT_MINIMUM_RECORD_COUNT_FOR_ROW_GROUP_SIZE_CHECK = 100; + public static final int DEFAULT_MAXIMUM_RECORD_COUNT_FOR_ROW_GROUP_SIZE_CHECK = 10000; public static final ValuesWriterFactory DEFAULT_VALUES_WRITER_FACTORY = new DefaultValuesWriterFactory(); @@ -80,12 +83,15 @@ public static WriterVersion fromString(String name) { private final boolean enableDictionary; private final int minRowCountForPageSizeCheck; private final int maxRowCountForPageSizeCheck; - private final boolean estimateNextSizeCheck; + private final int minRowCountForRowGroupSizeCheck; + private final int maxRowCountForRowGroupSizeCheck; + private final boolean estimateNextPageSizeCheck; + private final boolean estimateNextRowGroupSizeCheck; private final ByteBufferAllocator allocator; private final ValuesWriterFactory valuesWriterFactory; private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPageSize, boolean enableDict, int minRowCountForPageSizeCheck, - int maxRowCountForPageSizeCheck, boolean estimateNextSizeCheck, ByteBufferAllocator allocator, + int maxRowCountForPageSizeCheck, int minRowCountForRowGroupSizeCheck, int maxRowCountForRowGroupSizeCheck, boolean estimateNextPageSizeCheck, boolean estimateNextRowGroupSizeCheck, ByteBufferAllocator allocator, ValuesWriterFactory writerFactory) { this.pageSizeThreshold = pageSize; this.initialSlabSize = CapacityByteArrayOutputStream @@ -95,7 +101,10 @@ private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPag this.enableDictionary = enableDict; this.minRowCountForPageSizeCheck = minRowCountForPageSizeCheck; this.maxRowCountForPageSizeCheck = maxRowCountForPageSizeCheck; - this.estimateNextSizeCheck = estimateNextSizeCheck; + this.minRowCountForRowGroupSizeCheck = minRowCountForRowGroupSizeCheck; + this.maxRowCountForRowGroupSizeCheck = maxRowCountForRowGroupSizeCheck; + this.estimateNextPageSizeCheck = estimateNextPageSizeCheck; + this.estimateNextRowGroupSizeCheck = estimateNextRowGroupSizeCheck; this.allocator = allocator; this.valuesWriterFactory = writerFactory; @@ -179,12 +188,24 @@ public int getMaxRowCountForPageSizeCheck() { return maxRowCountForPageSizeCheck; } + public int getMinRowCountForRowGroupSizeCheck() { + return minRowCountForRowGroupSizeCheck; + } + + public int getMaxRowCountForRowGroupSizeCheck() { + return maxRowCountForRowGroupSizeCheck; + } + public ValuesWriterFactory getValuesWriterFactory() { return valuesWriterFactory; } - public boolean estimateNextSizeCheck() { - return estimateNextSizeCheck; + public boolean estimateNextPageSizeCheck() { + return estimateNextPageSizeCheck; + } + + public boolean estimateNextRowGroupSizeCheck() { + return estimateNextRowGroupSizeCheck; } public static Builder builder() { @@ -200,9 +221,12 @@ public static class Builder { private int dictPageSize = DEFAULT_DICTIONARY_PAGE_SIZE; private boolean enableDict = DEFAULT_IS_DICTIONARY_ENABLED; private WriterVersion writerVersion = DEFAULT_WRITER_VERSION; - private int minRowCountForPageSizeCheck = DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK; - private int maxRowCountForPageSizeCheck = DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK; - private boolean estimateNextSizeCheck = DEFAULT_ESTIMATE_ROW_COUNT_FOR_PAGE_SIZE_CHECK; + private int minRowCountForPageSizeCheck = DEFAULT_MINIMUM_RECORD_COUNT_FOR_PAGE_SIZE_CHECK; + private int maxRowCountForPageSizeCheck = DEFAULT_MAXIMUM_RECORD_COUNT_FOR_PAGE_SIZE_CHECK; + private boolean estimateNextPageSizeCheck = DEFAULT_ESTIMATE_ROW_COUNT_FOR_PAGE_SIZE_CHECK; + private int minRowCountForRowGroupSizeCheck = DEFAULT_MINIMUM_RECORD_COUNT_FOR_ROW_GROUP_SIZE_CHECK; + private int maxRowCountForRowGroupSizeCheck = DEFAULT_MAXIMUM_RECORD_COUNT_FOR_ROW_GROUP_SIZE_CHECK; + private boolean estimateNextRowGroupSizeCheck = DEFAULT_ESTIMATE_ROW_COUNT_FOR_ROW_GROUP_SIZE_CHECK; private ByteBufferAllocator allocator = new HeapByteBufferAllocator(); private ValuesWriterFactory valuesWriterFactory = DEFAULT_VALUES_WRITER_FACTORY; @@ -215,7 +239,10 @@ private Builder(ParquetProperties toCopy) { this.writerVersion = toCopy.writerVersion; this.minRowCountForPageSizeCheck = toCopy.minRowCountForPageSizeCheck; this.maxRowCountForPageSizeCheck = toCopy.maxRowCountForPageSizeCheck; - this.estimateNextSizeCheck = toCopy.estimateNextSizeCheck; + this.estimateNextPageSizeCheck = toCopy.estimateNextPageSizeCheck; + this.minRowCountForRowGroupSizeCheck = toCopy.minRowCountForRowGroupSizeCheck; + this.maxRowCountForRowGroupSizeCheck = toCopy.maxRowCountForRowGroupSizeCheck; + this.estimateNextRowGroupSizeCheck = toCopy.estimateNextRowGroupSizeCheck; this.allocator = toCopy.allocator; } @@ -281,9 +308,28 @@ public Builder withMaxRowCountForPageSizeCheck(int max) { return this; } + public Builder withMinRowCountForRowGroupSizeCheck(int min) { + Preconditions.checkArgument(min > 0, + "Invalid row count for block size check (negative): %s", min); + this.minRowCountForRowGroupSizeCheck = min; + return this; + } + + public Builder withMaxRowCountForRowGroupSizeCheck(int max) { + Preconditions.checkArgument(max > 0, + "Invalid row count for block size check (negative): %s", max); + this.maxRowCountForRowGroupSizeCheck = max; + return this; + } + // Do not attempt to predict next size check. Prevents issues with rows that vary significantly in size. public Builder estimateRowCountForPageSizeCheck(boolean estimateNextSizeCheck) { - this.estimateNextSizeCheck = estimateNextSizeCheck; + this.estimateNextPageSizeCheck = estimateNextSizeCheck; + return this; + } + + public Builder estimateRowCountForRowGroupSizeCheck(boolean estimateRowGroupSizeCheck) { + this.estimateNextRowGroupSizeCheck = estimateRowGroupSizeCheck; return this; } @@ -303,7 +349,9 @@ public ParquetProperties build() { ParquetProperties properties = new ParquetProperties(writerVersion, pageSize, dictPageSize, enableDict, minRowCountForPageSizeCheck, maxRowCountForPageSizeCheck, - estimateNextSizeCheck, allocator, valuesWriterFactory); + minRowCountForRowGroupSizeCheck, maxRowCountForRowGroupSizeCheck, + estimateNextPageSizeCheck, estimateNextRowGroupSizeCheck, + allocator, valuesWriterFactory); // we pass a constructed but uninitialized factory to ParquetProperties above as currently // creation of ValuesWriters is invoked from within ParquetProperties. In the future // we'd like to decouple that and won't need to pass an object to properties and then pass the diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java index 7574cedf75..e13eacde77 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -161,7 +161,7 @@ private void sizeCheck() { minRecordToWait = props.getMinRowCountForPageSizeCheck(); } - if(props.estimateNextSizeCheck()) { + if(props.estimateNextPageSizeCheck()) { // will check again halfway if between min and max rowCountForNextSizeCheck = rowCount + min( diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java index c1f5d67b01..7e8538b8ed 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -98,13 +98,13 @@ private void accountForValueWritten() { + dataColumn.getBufferedSize(); if (memSize > props.getPageSizeThreshold()) { // we will write the current page and check again the size at the predicted middle of next page - if (props.estimateNextSizeCheck()) { + if (props.estimateNextPageSizeCheck()) { valueCountForNextSizeCheck = valueCount / 2; } else { valueCountForNextSizeCheck = props.getMinRowCountForPageSizeCheck(); } writePage(); - } else if (props.estimateNextSizeCheck()) { + } else if (props.estimateNextPageSizeCheck()) { // not reached the threshold, will check again midway valueCountForNextSizeCheck = (int)(valueCount + ((float)valueCount * props.getPageSizeThreshold() / memSize)) / 2 + 1; } else { diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java index d9e9b5e15e..0dd6906136 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java @@ -43,14 +43,11 @@ class InternalParquetRecordWriter { private static final Logger LOG = LoggerFactory.getLogger(InternalParquetRecordWriter.class); - private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 100; - private static final int MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000; - private final ParquetFileWriter parquetFileWriter; private final WriteSupport writeSupport; private final MessageType schema; private final Map extraMetaData; - private final long rowGroupSize; + private final boolean estimateSizeCheckRows; private long rowGroupSizeThreshold; private long nextRowGroupSize; private final BytesCompressor compressor; @@ -60,12 +57,14 @@ class InternalParquetRecordWriter { private boolean closed; private long recordCount = 0; - private long recordCountForNextMemCheck = MINIMUM_RECORD_COUNT_FOR_CHECK; + private long recordCountForNextMemCheck; private long lastRowGroupEndPos = 0; private ColumnWriteStore columnStore; private ColumnChunkPageWriteStore pageStore; private RecordConsumer recordConsumer; + private int sizeCheckMinRows; + private int sizeCheckMaxRows; /** * @param parquetFileWriter the file to write to @@ -88,12 +87,14 @@ public InternalParquetRecordWriter( this.writeSupport = checkNotNull(writeSupport, "writeSupport"); this.schema = schema; this.extraMetaData = extraMetaData; - this.rowGroupSize = rowGroupSize; this.rowGroupSizeThreshold = rowGroupSize; this.nextRowGroupSize = rowGroupSizeThreshold; this.compressor = compressor; this.validating = validating; this.props = props; + this.sizeCheckMinRows = props.getMinRowCountForRowGroupSizeCheck(); + this.sizeCheckMaxRows = props.getMaxRowCountForRowGroupSizeCheck(); + this.estimateSizeCheckRows = props.estimateNextRowGroupSizeCheck(); initStore(); } @@ -147,13 +148,22 @@ private void checkBlockSizeReached() throws IOException { LOG.info("mem size {} > {}: flushing {} records to disk.", memSize, nextRowGroupSize, recordCount); flushRowGroupToStore(); initStore(); - recordCountForNextMemCheck = min(max(MINIMUM_RECORD_COUNT_FOR_CHECK, recordCount / 2), MAXIMUM_RECORD_COUNT_FOR_CHECK); + if (estimateSizeCheckRows) { + recordCountForNextMemCheck = min(max(sizeCheckMinRows, recordCount / 2), sizeCheckMaxRows); + } else { + recordCountForNextMemCheck = sizeCheckMinRows; + } this.lastRowGroupEndPos = parquetFileWriter.getPos(); } else { - recordCountForNextMemCheck = min( - max(MINIMUM_RECORD_COUNT_FOR_CHECK, (recordCount + (long)(nextRowGroupSize / ((float)recordSize))) / 2), // will check halfway - recordCount + MAXIMUM_RECORD_COUNT_FOR_CHECK // will not look more than max records ahead - ); + if (estimateSizeCheckRows) { + recordCountForNextMemCheck = min( + max(sizeCheckMinRows, (recordCount + (long) (nextRowGroupSize / ((float) recordSize))) / 2), + // will check halfway + recordCount + sizeCheckMaxRows // will not look more than max records ahead + ); + } else { + recordCountForNextMemCheck += sizeCheckMinRows; + } LOG.debug("Checked mem at {} will check again at: {}", recordCount, recordCountForNextMemCheck); } } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java index ff5bab397d..095e79d8b4 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -142,7 +142,10 @@ public static enum JobSummaryLevel { public static final String MAX_PADDING_BYTES = "parquet.writer.max-padding"; public static final String MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK = "parquet.page.size.row.check.min"; public static final String MAX_ROW_COUNT_FOR_PAGE_SIZE_CHECK = "parquet.page.size.row.check.max"; + public static final String MIN_ROW_COUNT_FOR_ROW_GROUP_SIZE_CHECK = "parquet.row-group.size.row.check.min"; + public static final String MAX_ROW_COUNT_FOR_ROW_GROUP_SIZE_CHECK = "parquet.row-group.size.row.check.max"; public static final String ESTIMATE_PAGE_SIZE_CHECK = "parquet.page.size.check.estimate"; + public static final String ESTIMATE_ROW_GROUP_SIZE_CHECK = "parquet.row-group.size.check.estimate"; public static JobSummaryLevel getJobSummaryLevel(Configuration conf) { String level = conf.get(JOB_SUMMARY_LEVEL); @@ -243,12 +246,27 @@ public static boolean getEnableDictionary(Configuration configuration) { public static int getMinRowCountForPageSizeCheck(Configuration configuration) { return configuration.getInt(MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK, - ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK); + ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_PAGE_SIZE_CHECK); } public static int getMaxRowCountForPageSizeCheck(Configuration configuration) { return configuration.getInt(MAX_ROW_COUNT_FOR_PAGE_SIZE_CHECK, - ParquetProperties.DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK); + ParquetProperties.DEFAULT_MAXIMUM_RECORD_COUNT_FOR_PAGE_SIZE_CHECK); + } + + public static int getMinRowCountForRowGroupSizeCheck(Configuration configuration) { + return configuration.getInt(MIN_ROW_COUNT_FOR_ROW_GROUP_SIZE_CHECK, + ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_ROW_GROUP_SIZE_CHECK); + } + + public static int getMaxRowCountForRowGroupSizeCheck(Configuration configuration) { + return configuration.getInt(MAX_ROW_COUNT_FOR_ROW_GROUP_SIZE_CHECK, + ParquetProperties.DEFAULT_MAXIMUM_RECORD_COUNT_FOR_ROW_GROUP_SIZE_CHECK); + } + + public static boolean getEstimateBlockSizeCheck(Configuration configuration) { + return configuration.getBoolean(ESTIMATE_ROW_GROUP_SIZE_CHECK, + ParquetProperties.DEFAULT_ESTIMATE_ROW_COUNT_FOR_ROW_GROUP_SIZE_CHECK); } public static boolean getEstimatePageSizeCheck(Configuration configuration) { @@ -364,8 +382,11 @@ public RecordWriter getRecordWriter(Configuration conf, Path file, Comp .withDictionaryEncoding(getEnableDictionary(conf)) .withWriterVersion(getWriterVersion(conf)) .estimateRowCountForPageSizeCheck(getEstimatePageSizeCheck(conf)) + .estimateRowCountForRowGroupSizeCheck(getEstimateBlockSizeCheck(conf)) .withMinRowCountForPageSizeCheck(getMinRowCountForPageSizeCheck(conf)) .withMaxRowCountForPageSizeCheck(getMaxRowCountForPageSizeCheck(conf)) + .withMinRowCountForRowGroupSizeCheck(getMinRowCountForRowGroupSizeCheck(conf)) + .withMaxRowCountForRowGroupSizeCheck(getMaxRowCountForRowGroupSizeCheck(conf)) .build(); long blockSize = getLongBlockSize(conf); @@ -380,9 +401,11 @@ public RecordWriter getRecordWriter(Configuration conf, Path file, Comp LOG.info("Validation is {}", (validating ? "on" : "off")); LOG.info("Writer version is: {}", props.getWriterVersion()); LOG.info("Maximum row group padding size is {} bytes", maxPaddingSize); - LOG.info("Page size checking is: {}", (props.estimateNextSizeCheck() ? "estimated" : "constant")); + LOG.info("Page size checking is: {}", (props.estimateNextPageSizeCheck() ? "estimated" : "constant")); LOG.info("Min row count for page size check is: {}", props.getMinRowCountForPageSizeCheck()); LOG.info("Max row count for page size check is: {}", props.getMaxRowCountForPageSizeCheck()); + LOG.info("Min row count for row group size check is: {}", props.getMinRowCountForRowGroupSizeCheck()); + LOG.info("Max row count for row group size check is: {}", props.getMaxRowCountForRowGroupSizeCheck()); } WriteContext init = writeSupport.init(conf);