-
Notifications
You must be signed in to change notification settings - Fork 1.5k
PARQUET-869 Configurable min/max record counts for block size check #447
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -42,14 +42,11 @@ | |
| class InternalParquetRecordWriter<T> { | ||
| private static final Logger LOG = LoggerFactory.getLogger(InternalParquetRecordWriter.class); | ||
|
|
||
| private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 100; | ||
| private static final int MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000; | ||
|
|
||
| private final ParquetFileWriter parquetFileWriter; | ||
| private final WriteSupport<T> writeSupport; | ||
| private final MessageType schema; | ||
| private final Map<String, String> extraMetaData; | ||
| private final long rowGroupSize; | ||
| private final boolean estimateNextBlockSizeCheck; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Was rowGroupSize not used? Why was it removed?
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I wrote this code over a year ago... I don't remember, but I'll check. I think it was an unused variable. |
||
| private long rowGroupSizeThreshold; | ||
| private long nextRowGroupSize; | ||
| private final BytesCompressor compressor; | ||
|
|
@@ -59,12 +56,14 @@ class InternalParquetRecordWriter<T> { | |
| private boolean closed; | ||
|
|
||
| private long recordCount = 0; | ||
| private long recordCountForNextMemCheck = MINIMUM_RECORD_COUNT_FOR_CHECK; | ||
| private long recordCountForNextMemCheck; | ||
| private long lastRowGroupEndPos = 0; | ||
|
|
||
| private ColumnWriteStore columnStore; | ||
| private ColumnChunkPageWriteStore pageStore; | ||
| private RecordConsumer recordConsumer; | ||
| private int minRecordCountForBlockSizeCheck; | ||
| private int maxRecordCountForBlockSizeCheck; | ||
|
|
||
| /** | ||
| * @param parquetFileWriter the file to write to | ||
|
|
@@ -87,12 +86,14 @@ public InternalParquetRecordWriter( | |
| this.writeSupport = checkNotNull(writeSupport, "writeSupport"); | ||
| this.schema = schema; | ||
| this.extraMetaData = extraMetaData; | ||
| this.rowGroupSize = rowGroupSize; | ||
| this.rowGroupSizeThreshold = rowGroupSize; | ||
| this.nextRowGroupSize = rowGroupSizeThreshold; | ||
| this.compressor = compressor; | ||
| this.validating = validating; | ||
| this.props = props; | ||
| this.minRecordCountForBlockSizeCheck = props.getMinRowCountForPageSizeCheck(); | ||
| this.maxRecordCountForBlockSizeCheck = props.getMaxRowCountForPageSizeCheck(); | ||
| this.estimateNextBlockSizeCheck = props.estimateNextBlockSizeCheck(); | ||
| initStore(); | ||
| } | ||
|
|
||
|
|
@@ -102,6 +103,8 @@ private void initStore() { | |
| MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(schema); | ||
| this.recordConsumer = columnIO.getRecordWriter(columnStore); | ||
| writeSupport.prepareForWrite(recordConsumer); | ||
| System.out.println(String.format("Created ParquetWriter with [%d, %d] for block size checks. Estimation(%s). BlockSize(%d)", | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Please use SLF4J for logging. You can see examples in the rest of the library.
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Oops... missed removing this statement. |
||
| minRecordCountForBlockSizeCheck, maxRecordCountForBlockSizeCheck, estimateNextBlockSizeCheck, rowGroupSizeThreshold)); | ||
| } | ||
|
|
||
| public void close() throws IOException, InterruptedException { | ||
|
|
@@ -142,13 +145,23 @@ private void checkBlockSizeReached() throws IOException { | |
| LOG.info("mem size {} > {}: flushing {} records to disk.", memSize, nextRowGroupSize, recordCount); | ||
| flushRowGroupToStore(); | ||
| initStore(); | ||
| recordCountForNextMemCheck = min(max(MINIMUM_RECORD_COUNT_FOR_CHECK, recordCount / 2), MAXIMUM_RECORD_COUNT_FOR_CHECK); | ||
| if (estimateNextBlockSizeCheck) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Do you need this setting, or is it included to match the page size checking? I don't think we would ever want to set it to true because it is important to be under the max row group size.
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Will do.
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Working on the comments @rdblue... do you think instead of a min/max record count for row group size checks, we should check every n rows?
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I've taken a closer look at this code. The only change with the […] If […] So, I added the […] (the inline code spans and the remainder of this comment were lost in extraction).
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Consistency here is fine. I'm not sure we will use it, but I can certainly think of valid use cases for it. |
||
| recordCountForNextMemCheck = min(max(minRecordCountForBlockSizeCheck, recordCount / 2), | ||
| maxRecordCountForBlockSizeCheck); | ||
| } else { | ||
| recordCountForNextMemCheck = minRecordCountForBlockSizeCheck; | ||
| } | ||
| this.lastRowGroupEndPos = parquetFileWriter.getPos(); | ||
| } else { | ||
| recordCountForNextMemCheck = min( | ||
| max(MINIMUM_RECORD_COUNT_FOR_CHECK, (recordCount + (long)(nextRowGroupSize / ((float)recordSize))) / 2), // will check halfway | ||
| recordCount + MAXIMUM_RECORD_COUNT_FOR_CHECK // will not look more than max records ahead | ||
| ); | ||
| if (estimateNextBlockSizeCheck) { | ||
| recordCountForNextMemCheck = min( | ||
| max(minRecordCountForBlockSizeCheck, (recordCount + (long) (nextRowGroupSize / ((float) recordSize))) / 2), | ||
| // will check halfway | ||
| recordCount + maxRecordCountForBlockSizeCheck // will not look more than max records ahead | ||
| ); | ||
| } else { | ||
| recordCountForNextMemCheck += minRecordCountForBlockSizeCheck; | ||
| } | ||
| LOG.debug("Checked mem at {} will check again at: {}", recordCount, recordCountForNextMemCheck); | ||
| } | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,14 +1,14 @@ | ||
| /* | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
|
|
@@ -142,9 +142,12 @@ public static enum JobSummaryLevel { | |
| public static final String MEMORY_POOL_RATIO = "parquet.memory.pool.ratio"; | ||
| public static final String MIN_MEMORY_ALLOCATION = "parquet.memory.min.chunk.size"; | ||
| public static final String MAX_PADDING_BYTES = "parquet.writer.max-padding"; | ||
| public static final String MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK = "parquet.page.size.row.check.min"; | ||
| public static final String MAX_ROW_COUNT_FOR_PAGE_SIZE_CHECK = "parquet.page.size.row.check.max"; | ||
| public static final String ESTIMATE_PAGE_SIZE_CHECK = "parquet.page.size.check.estimate"; | ||
| public static final String MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK = "parquet.page.size_row_check_min"; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The page properties should not change, and the row group properties should match the naming convention for pages.
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Oops... it was changed for internal reasons... forgot to change it back. |
||
| public static final String MAX_ROW_COUNT_FOR_PAGE_SIZE_CHECK = "parquet.page.size_row_check_max"; | ||
| public static final String MIN_ROW_COUNT_FOR_BLOCK_SIZE_CHECK = "parquet.block.size_row_check_min"; | ||
| public static final String MAX_ROW_COUNT_FOR_BLOCK_SIZE_CHECK = "parquet.block.size_row_check_max"; | ||
| public static final String ESTIMATE_PAGE_SIZE_CHECK = "parquet.page.size_check_estimate"; | ||
| public static final String ESTIMATE_BLOCK_SIZE_CHECK = "parquet.block.size_check_estimate"; | ||
|
|
||
| public static JobSummaryLevel getJobSummaryLevel(Configuration conf) { | ||
| String level = conf.get(JOB_SUMMARY_LEVEL); | ||
|
|
@@ -245,12 +248,27 @@ public static boolean getEnableDictionary(Configuration configuration) { | |
|
|
||
| public static int getMinRowCountForPageSizeCheck(Configuration configuration) { | ||
| return configuration.getInt(MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK, | ||
| ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK); | ||
| ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_PAGE_SIZE_CHECK); | ||
| } | ||
|
|
||
| public static int getMaxRowCountForPageSizeCheck(Configuration configuration) { | ||
| return configuration.getInt(MAX_ROW_COUNT_FOR_PAGE_SIZE_CHECK, | ||
| ParquetProperties.DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK); | ||
| ParquetProperties.DEFAULT_MAXIMUM_RECORD_COUNT_FOR_PAGE_SIZE_CHECK); | ||
| } | ||
|
|
||
| public static int getMinRowCountForBlockSizeCheck(Configuration configuration) { | ||
| return configuration.getInt(MIN_ROW_COUNT_FOR_BLOCK_SIZE_CHECK, | ||
| ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_BLOCK_SIZE_CHECK); | ||
| } | ||
|
|
||
| public static int getMaxRowCountForBlockSizeCheck(Configuration configuration) { | ||
| return configuration.getInt(MAX_ROW_COUNT_FOR_BLOCK_SIZE_CHECK, | ||
| ParquetProperties.DEFAULT_MAXIMUM_RECORD_COUNT_FOR_BLOCK_SIZE_CHECK); | ||
| } | ||
|
|
||
| public static boolean getEstimateBlockSizeCheck(Configuration configuration) { | ||
| return configuration.getBoolean(ESTIMATE_BLOCK_SIZE_CHECK, | ||
| ParquetProperties.DEFAULT_ESTIMATE_ROW_COUNT_FOR_BLOCK_SIZE_CHECK); | ||
| } | ||
|
|
||
| public static boolean getEstimatePageSizeCheck(Configuration configuration) { | ||
|
|
@@ -362,8 +380,11 @@ public RecordWriter<Void, T> getRecordWriter(Configuration conf, Path file, Comp | |
| .withDictionaryEncoding(getEnableDictionary(conf)) | ||
| .withWriterVersion(getWriterVersion(conf)) | ||
| .estimateRowCountForPageSizeCheck(getEstimatePageSizeCheck(conf)) | ||
| .estimateRowCountForBlockSizeCheck(getEstimateBlockSizeCheck(conf)) | ||
| .withMinRowCountForPageSizeCheck(getMinRowCountForPageSizeCheck(conf)) | ||
| .withMaxRowCountForPageSizeCheck(getMaxRowCountForPageSizeCheck(conf)) | ||
| .withMinRowCountForBlockSizeCheck(getMinRowCountForBlockSizeCheck(conf)) | ||
| .withMaxRowCountForBlockSizeCheck(getMaxRowCountForBlockSizeCheck(conf)) | ||
| .build(); | ||
|
|
||
| long blockSize = getLongBlockSize(conf); | ||
|
|
@@ -378,9 +399,11 @@ public RecordWriter<Void, T> getRecordWriter(Configuration conf, Path file, Comp | |
| LOG.info("Validation is {}", (validating ? "on" : "off")); | ||
| LOG.info("Writer version is: {}", props.getWriterVersion()); | ||
| LOG.info("Maximum row group padding size is {} bytes", maxPaddingSize); | ||
| LOG.info("Page size checking is: {}", (props.estimateNextSizeCheck() ? "estimated" : "constant")); | ||
| LOG.info("Page size checking is: {}", (props.estimateNextPageSizeCheck() ? "estimated" : "constant")); | ||
| LOG.info("Min row count for page size check is: {}", props.getMinRowCountForPageSizeCheck()); | ||
| LOG.info("Max row count for page size check is: {}", props.getMaxRowCountForPageSizeCheck()); | ||
| LOG.info("Min row count for block size check is: {}", props.getMinRowCountForBlockSizeCheck()); | ||
| LOG.info("Max row count for block size check is: {}", props.getMaxRowCountForBlockSizeCheck()); | ||
| } | ||
|
|
||
| WriteContext init = writeSupport.init(conf); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of block, please use "row group". Block is an overused term that we are avoiding in the public API, except for those places where it already appears.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will do.