Skip to content

Commit 4d5dd4e

Browse files
committed
PARQUET-869: Use "row group" instead of "block".
1 parent 943dcb5 commit 4d5dd4e

File tree

3 files changed

+62
-65
lines changed

3 files changed

+62
-65
lines changed

parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java

Lines changed: 30 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,11 @@ public class ParquetProperties {
4545
public static final boolean DEFAULT_IS_DICTIONARY_ENABLED = true;
4646
public static final WriterVersion DEFAULT_WRITER_VERSION = WriterVersion.PARQUET_1_0;
4747
public static final boolean DEFAULT_ESTIMATE_ROW_COUNT_FOR_PAGE_SIZE_CHECK = true;
48-
public static final boolean DEFAULT_ESTIMATE_ROW_COUNT_FOR_BLOCK_SIZE_CHECK = true;
48+
public static final boolean DEFAULT_ESTIMATE_ROW_COUNT_FOR_ROW_GROUP_SIZE_CHECK = true;
4949
public static final int DEFAULT_MINIMUM_RECORD_COUNT_FOR_PAGE_SIZE_CHECK = 100;
5050
public static final int DEFAULT_MAXIMUM_RECORD_COUNT_FOR_PAGE_SIZE_CHECK = 10000;
51-
public static final int DEFAULT_MINIMUM_RECORD_COUNT_FOR_BLOCK_SIZE_CHECK = 100;
52-
public static final int DEFAULT_MAXIMUM_RECORD_COUNT_FOR_BLOCK_SIZE_CHECK = 10000;
51+
public static final int DEFAULT_MINIMUM_RECORD_COUNT_FOR_ROW_GROUP_SIZE_CHECK = 100;
52+
public static final int DEFAULT_MAXIMUM_RECORD_COUNT_FOR_ROW_GROUP_SIZE_CHECK = 10000;
5353

5454
public static final ValuesWriterFactory DEFAULT_VALUES_WRITER_FACTORY = new DefaultValuesWriterFactory();
5555

@@ -83,15 +83,15 @@ public static WriterVersion fromString(String name) {
8383
private final boolean enableDictionary;
8484
private final int minRowCountForPageSizeCheck;
8585
private final int maxRowCountForPageSizeCheck;
86-
private final int minRowCountForBlockSizeCheck;
87-
private final int maxRowCountForBlockSizeCheck;
86+
private final int minRowCountForRowGroupSizeCheck;
87+
private final int maxRowCountForRowGroupSizeCheck;
8888
private final boolean estimateNextPageSizeCheck;
89-
private final boolean estimateNextBlockSizeCheck;
89+
private final boolean estimateNextRowGroupSizeCheck;
9090
private final ByteBufferAllocator allocator;
9191
private final ValuesWriterFactory valuesWriterFactory;
9292

9393
private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPageSize, boolean enableDict, int minRowCountForPageSizeCheck,
94-
int maxRowCountForPageSizeCheck, int minRowCountForBlockSizeCheck, int maxRowCountForBlockSizeCheck, boolean estimateNextPageSizeCheck, boolean estimateNextBlockSizeCheck, ByteBufferAllocator allocator,
94+
int maxRowCountForPageSizeCheck, int minRowCountForRowGroupSizeCheck, int maxRowCountForRowGroupSizeCheck, boolean estimateNextPageSizeCheck, boolean estimateNextRowGroupSizeCheck, ByteBufferAllocator allocator,
9595
ValuesWriterFactory writerFactory) {
9696
this.pageSizeThreshold = pageSize;
9797
this.initialSlabSize = CapacityByteArrayOutputStream
@@ -101,10 +101,10 @@ private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPag
101101
this.enableDictionary = enableDict;
102102
this.minRowCountForPageSizeCheck = minRowCountForPageSizeCheck;
103103
this.maxRowCountForPageSizeCheck = maxRowCountForPageSizeCheck;
104-
this.minRowCountForBlockSizeCheck = minRowCountForBlockSizeCheck;
105-
this.maxRowCountForBlockSizeCheck = maxRowCountForBlockSizeCheck;
104+
this.minRowCountForRowGroupSizeCheck = minRowCountForRowGroupSizeCheck;
105+
this.maxRowCountForRowGroupSizeCheck = maxRowCountForRowGroupSizeCheck;
106106
this.estimateNextPageSizeCheck = estimateNextPageSizeCheck;
107-
this.estimateNextBlockSizeCheck = estimateNextBlockSizeCheck;
107+
this.estimateNextRowGroupSizeCheck = estimateNextRowGroupSizeCheck;
108108
this.allocator = allocator;
109109

110110
this.valuesWriterFactory = writerFactory;
@@ -188,12 +188,12 @@ public int getMaxRowCountForPageSizeCheck() {
188188
return maxRowCountForPageSizeCheck;
189189
}
190190

191-
public int getMinRowCountForBlockSizeCheck() {
192-
return minRowCountForBlockSizeCheck;
191+
public int getMinRowCountForRowGroupSizeCheck() {
192+
return minRowCountForRowGroupSizeCheck;
193193
}
194194

195-
public int getMaxRowCountForBlockSizeCheck() {
196-
return maxRowCountForBlockSizeCheck;
195+
public int getMaxRowCountForRowGroupSizeCheck() {
196+
return maxRowCountForRowGroupSizeCheck;
197197
}
198198

199199
public ValuesWriterFactory getValuesWriterFactory() {
@@ -204,8 +204,8 @@ public boolean estimateNextPageSizeCheck() {
204204
return estimateNextPageSizeCheck;
205205
}
206206

207-
public boolean estimateNextBlockSizeCheck() {
208-
return estimateNextBlockSizeCheck;
207+
public boolean estimateNextRowGroupSizeCheck() {
208+
return estimateNextRowGroupSizeCheck;
209209
}
210210

211211
public static Builder builder() {
@@ -224,9 +224,9 @@ public static class Builder {
224224
private int minRowCountForPageSizeCheck = DEFAULT_MINIMUM_RECORD_COUNT_FOR_PAGE_SIZE_CHECK;
225225
private int maxRowCountForPageSizeCheck = DEFAULT_MAXIMUM_RECORD_COUNT_FOR_PAGE_SIZE_CHECK;
226226
private boolean estimateNextPageSizeCheck = DEFAULT_ESTIMATE_ROW_COUNT_FOR_PAGE_SIZE_CHECK;
227-
private int minRowCountForBlockSizeCheck = DEFAULT_MINIMUM_RECORD_COUNT_FOR_BLOCK_SIZE_CHECK;
228-
private int maxRowCountForBlockSizeCheck = DEFAULT_MAXIMUM_RECORD_COUNT_FOR_BLOCK_SIZE_CHECK;
229-
private boolean estimateNextBlockSizeCheck = DEFAULT_ESTIMATE_ROW_COUNT_FOR_BLOCK_SIZE_CHECK;
227+
private int minRowCountForRowGroupSizeCheck = DEFAULT_MINIMUM_RECORD_COUNT_FOR_ROW_GROUP_SIZE_CHECK;
228+
private int maxRowCountForRowGroupSizeCheck = DEFAULT_MAXIMUM_RECORD_COUNT_FOR_ROW_GROUP_SIZE_CHECK;
229+
private boolean estimateNextRowGroupSizeCheck = DEFAULT_ESTIMATE_ROW_COUNT_FOR_ROW_GROUP_SIZE_CHECK;
230230
private ByteBufferAllocator allocator = new HeapByteBufferAllocator();
231231
private ValuesWriterFactory valuesWriterFactory = DEFAULT_VALUES_WRITER_FACTORY;
232232

@@ -240,9 +240,9 @@ private Builder(ParquetProperties toCopy) {
240240
this.minRowCountForPageSizeCheck = toCopy.minRowCountForPageSizeCheck;
241241
this.maxRowCountForPageSizeCheck = toCopy.maxRowCountForPageSizeCheck;
242242
this.estimateNextPageSizeCheck = toCopy.estimateNextPageSizeCheck;
243-
this.minRowCountForBlockSizeCheck = toCopy.minRowCountForBlockSizeCheck;
244-
this.maxRowCountForBlockSizeCheck = toCopy.maxRowCountForBlockSizeCheck;
245-
this.estimateNextBlockSizeCheck = toCopy.estimateNextBlockSizeCheck;
243+
this.minRowCountForRowGroupSizeCheck = toCopy.minRowCountForRowGroupSizeCheck;
244+
this.maxRowCountForRowGroupSizeCheck = toCopy.maxRowCountForRowGroupSizeCheck;
245+
this.estimateNextRowGroupSizeCheck = toCopy.estimateNextRowGroupSizeCheck;
246246
this.allocator = toCopy.allocator;
247247
}
248248

@@ -308,17 +308,17 @@ public Builder withMaxRowCountForPageSizeCheck(int max) {
308308
return this;
309309
}
310310

311-
public Builder withMinRowCountForBlockSizeCheck(int min) {
311+
public Builder withMinRowCountForRowGroupSizeCheck(int min) {
312312
Preconditions.checkArgument(min > 0,
313313
"Invalid row count for block size check (negative): %s", min);
314-
this.minRowCountForBlockSizeCheck = min;
314+
this.minRowCountForRowGroupSizeCheck = min;
315315
return this;
316316
}
317317

318-
public Builder withMaxRowCountForBlockSizeCheck(int max) {
318+
public Builder withMaxRowCountForRowGroupSizeCheck(int max) {
319319
Preconditions.checkArgument(max > 0,
320320
"Invalid row count for block size check (negative): %s", max);
321-
this.maxRowCountForBlockSizeCheck = max;
321+
this.maxRowCountForRowGroupSizeCheck = max;
322322
return this;
323323
}
324324

@@ -328,8 +328,8 @@ public Builder estimateRowCountForPageSizeCheck(boolean estimateNextSizeCheck) {
328328
return this;
329329
}
330330

331-
public Builder estimateRowCountForBlockSizeCheck(boolean estimateBlockSizeCheck) {
332-
this.estimateNextBlockSizeCheck = estimateBlockSizeCheck;
331+
public Builder estimateRowCountForRowGroupSizeCheck(boolean estimateRowGroupSizeCheck) {
332+
this.estimateNextRowGroupSizeCheck = estimateRowGroupSizeCheck;
333333
return this;
334334
}
335335

@@ -349,8 +349,8 @@ public ParquetProperties build() {
349349
ParquetProperties properties =
350350
new ParquetProperties(writerVersion, pageSize, dictPageSize,
351351
enableDict, minRowCountForPageSizeCheck, maxRowCountForPageSizeCheck,
352-
minRowCountForBlockSizeCheck, maxRowCountForBlockSizeCheck,
353-
estimateNextPageSizeCheck, estimateNextBlockSizeCheck,
352+
minRowCountForRowGroupSizeCheck, maxRowCountForRowGroupSizeCheck,
353+
estimateNextPageSizeCheck, estimateNextRowGroupSizeCheck,
354354
allocator, valuesWriterFactory);
355355
// we pass a constructed but uninitialized factory to ParquetProperties above as currently
356356
// creation of ValuesWriters is invoked from within ParquetProperties. In the future

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ class InternalParquetRecordWriter<T> {
4747
private final WriteSupport<T> writeSupport;
4848
private final MessageType schema;
4949
private final Map<String, String> extraMetaData;
50-
private final boolean estimateNextBlockSizeCheck;
50+
private final boolean estimateSizeCheckRows;
5151
private long rowGroupSizeThreshold;
5252
private long nextRowGroupSize;
5353
private final BytesCompressor compressor;
@@ -63,8 +63,8 @@ class InternalParquetRecordWriter<T> {
6363
private ColumnWriteStore columnStore;
6464
private ColumnChunkPageWriteStore pageStore;
6565
private RecordConsumer recordConsumer;
66-
private int minRecordCountForBlockSizeCheck;
67-
private int maxRecordCountForBlockSizeCheck;
66+
private int sizeCheckMinRows;
67+
private int sizeCheckMaxRows;
6868

6969
/**
7070
* @param parquetFileWriter the file to write to
@@ -92,9 +92,9 @@ public InternalParquetRecordWriter(
9292
this.compressor = compressor;
9393
this.validating = validating;
9494
this.props = props;
95-
this.minRecordCountForBlockSizeCheck = props.getMinRowCountForPageSizeCheck();
96-
this.maxRecordCountForBlockSizeCheck = props.getMaxRowCountForPageSizeCheck();
97-
this.estimateNextBlockSizeCheck = props.estimateNextBlockSizeCheck();
95+
this.sizeCheckMinRows = props.getMinRowCountForRowGroupSizeCheck();
96+
this.sizeCheckMaxRows = props.getMaxRowCountForRowGroupSizeCheck();
97+
this.estimateSizeCheckRows = props.estimateNextRowGroupSizeCheck();
9898
initStore();
9999
}
100100

@@ -108,8 +108,6 @@ private void initStore() {
108108
MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(schema);
109109
this.recordConsumer = columnIO.getRecordWriter(columnStore);
110110
writeSupport.prepareForWrite(recordConsumer);
111-
System.out.println(String.format("Created ParquetWriter with [%d, %d] for block size checks. Estimation(%s). BlockSize(%d)",
112-
minRecordCountForBlockSizeCheck, maxRecordCountForBlockSizeCheck, estimateNextBlockSizeCheck, rowGroupSizeThreshold));
113111
}
114112

115113
public void close() throws IOException, InterruptedException {
@@ -150,22 +148,21 @@ private void checkBlockSizeReached() throws IOException {
150148
LOG.info("mem size {} > {}: flushing {} records to disk.", memSize, nextRowGroupSize, recordCount);
151149
flushRowGroupToStore();
152150
initStore();
153-
if (estimateNextBlockSizeCheck) {
154-
recordCountForNextMemCheck = min(max(minRecordCountForBlockSizeCheck, recordCount / 2),
155-
maxRecordCountForBlockSizeCheck);
151+
if (estimateSizeCheckRows) {
152+
recordCountForNextMemCheck = min(max(sizeCheckMinRows, recordCount / 2), sizeCheckMaxRows);
156153
} else {
157-
recordCountForNextMemCheck = minRecordCountForBlockSizeCheck;
154+
recordCountForNextMemCheck = sizeCheckMinRows;
158155
}
159156
this.lastRowGroupEndPos = parquetFileWriter.getPos();
160157
} else {
161-
if (estimateNextBlockSizeCheck) {
158+
if (estimateSizeCheckRows) {
162159
recordCountForNextMemCheck = min(
163-
max(minRecordCountForBlockSizeCheck, (recordCount + (long) (nextRowGroupSize / ((float) recordSize))) / 2),
160+
max(sizeCheckMinRows, (recordCount + (long) (nextRowGroupSize / ((float) recordSize))) / 2),
164161
// will check halfway
165-
recordCount + maxRecordCountForBlockSizeCheck // will not look more than max records ahead
162+
recordCount + sizeCheckMaxRows // will not look more than max records ahead
166163
);
167164
} else {
168-
recordCountForNextMemCheck += minRecordCountForBlockSizeCheck;
165+
recordCountForNextMemCheck += sizeCheckMinRows;
169166
}
170167
LOG.debug("Checked mem at {} will check again at: {}", recordCount, recordCountForNextMemCheck);
171168
}

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -140,12 +140,12 @@ public static enum JobSummaryLevel {
140140
public static final String MEMORY_POOL_RATIO = "parquet.memory.pool.ratio";
141141
public static final String MIN_MEMORY_ALLOCATION = "parquet.memory.min.chunk.size";
142142
public static final String MAX_PADDING_BYTES = "parquet.writer.max-padding";
143-
public static final String MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK = "parquet.page.size_row_check_min";
144-
public static final String MAX_ROW_COUNT_FOR_PAGE_SIZE_CHECK = "parquet.page.size_row_check_max";
145-
public static final String MIN_ROW_COUNT_FOR_BLOCK_SIZE_CHECK = "parquet.block.size_row_check_min";
146-
public static final String MAX_ROW_COUNT_FOR_BLOCK_SIZE_CHECK = "parquet.block.size_row_check_max";
147-
public static final String ESTIMATE_PAGE_SIZE_CHECK = "parquet.page.size_check_estimate";
148-
public static final String ESTIMATE_BLOCK_SIZE_CHECK = "parquet.block.size_check_estimate";
143+
public static final String MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK = "parquet.page.size.row.check.min";
144+
public static final String MAX_ROW_COUNT_FOR_PAGE_SIZE_CHECK = "parquet.page.size.row.check.max";
145+
public static final String MIN_ROW_COUNT_FOR_ROW_GROUP_SIZE_CHECK = "parquet.row-group.size.row.check.min";
146+
public static final String MAX_ROW_COUNT_FOR_ROW_GROUP_SIZE_CHECK = "parquet.row-group.size.row.check.max";
147+
public static final String ESTIMATE_PAGE_SIZE_CHECK = "parquet.page.size.check.estimate";
148+
public static final String ESTIMATE_ROW_GROUP_SIZE_CHECK = "parquet.row-group.size.check.estimate";
149149

150150
public static JobSummaryLevel getJobSummaryLevel(Configuration conf) {
151151
String level = conf.get(JOB_SUMMARY_LEVEL);
@@ -254,19 +254,19 @@ public static int getMaxRowCountForPageSizeCheck(Configuration configuration) {
254254
ParquetProperties.DEFAULT_MAXIMUM_RECORD_COUNT_FOR_PAGE_SIZE_CHECK);
255255
}
256256

257-
public static int getMinRowCountForBlockSizeCheck(Configuration configuration) {
258-
return configuration.getInt(MIN_ROW_COUNT_FOR_BLOCK_SIZE_CHECK,
259-
ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_BLOCK_SIZE_CHECK);
257+
public static int getMinRowCountForRowGroupSizeCheck(Configuration configuration) {
258+
return configuration.getInt(MIN_ROW_COUNT_FOR_ROW_GROUP_SIZE_CHECK,
259+
ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_ROW_GROUP_SIZE_CHECK);
260260
}
261261

262-
public static int getMaxRowCountForBlockSizeCheck(Configuration configuration) {
263-
return configuration.getInt(MAX_ROW_COUNT_FOR_BLOCK_SIZE_CHECK,
264-
ParquetProperties.DEFAULT_MAXIMUM_RECORD_COUNT_FOR_BLOCK_SIZE_CHECK);
262+
public static int getMaxRowCountForRowGroupSizeCheck(Configuration configuration) {
263+
return configuration.getInt(MAX_ROW_COUNT_FOR_ROW_GROUP_SIZE_CHECK,
264+
ParquetProperties.DEFAULT_MAXIMUM_RECORD_COUNT_FOR_ROW_GROUP_SIZE_CHECK);
265265
}
266266

267267
public static boolean getEstimateBlockSizeCheck(Configuration configuration) {
268-
return configuration.getBoolean(ESTIMATE_BLOCK_SIZE_CHECK,
269-
ParquetProperties.DEFAULT_ESTIMATE_ROW_COUNT_FOR_BLOCK_SIZE_CHECK);
268+
return configuration.getBoolean(ESTIMATE_ROW_GROUP_SIZE_CHECK,
269+
ParquetProperties.DEFAULT_ESTIMATE_ROW_COUNT_FOR_ROW_GROUP_SIZE_CHECK);
270270
}
271271

272272
public static boolean getEstimatePageSizeCheck(Configuration configuration) {
@@ -382,11 +382,11 @@ public RecordWriter<Void, T> getRecordWriter(Configuration conf, Path file, Comp
382382
.withDictionaryEncoding(getEnableDictionary(conf))
383383
.withWriterVersion(getWriterVersion(conf))
384384
.estimateRowCountForPageSizeCheck(getEstimatePageSizeCheck(conf))
385-
.estimateRowCountForBlockSizeCheck(getEstimateBlockSizeCheck(conf))
385+
.estimateRowCountForRowGroupSizeCheck(getEstimateBlockSizeCheck(conf))
386386
.withMinRowCountForPageSizeCheck(getMinRowCountForPageSizeCheck(conf))
387387
.withMaxRowCountForPageSizeCheck(getMaxRowCountForPageSizeCheck(conf))
388-
.withMinRowCountForBlockSizeCheck(getMinRowCountForBlockSizeCheck(conf))
389-
.withMaxRowCountForBlockSizeCheck(getMaxRowCountForBlockSizeCheck(conf))
388+
.withMinRowCountForRowGroupSizeCheck(getMinRowCountForRowGroupSizeCheck(conf))
389+
.withMaxRowCountForRowGroupSizeCheck(getMaxRowCountForRowGroupSizeCheck(conf))
390390
.build();
391391

392392
long blockSize = getLongBlockSize(conf);
@@ -404,8 +404,8 @@ public RecordWriter<Void, T> getRecordWriter(Configuration conf, Path file, Comp
404404
LOG.info("Page size checking is: {}", (props.estimateNextPageSizeCheck() ? "estimated" : "constant"));
405405
LOG.info("Min row count for page size check is: {}", props.getMinRowCountForPageSizeCheck());
406406
LOG.info("Max row count for page size check is: {}", props.getMaxRowCountForPageSizeCheck());
407-
LOG.info("Min row count for block size check is: {}", props.getMinRowCountForBlockSizeCheck());
408-
LOG.info("Max row count for block size check is: {}", props.getMaxRowCountForBlockSizeCheck());
407+
LOG.info("Min row count for row group size check is: {}", props.getMinRowCountForRowGroupSizeCheck());
408+
LOG.info("Max row count for row group size check is: {}", props.getMaxRowCountForRowGroupSizeCheck());
409409
}
410410

411411
WriteContext init = writeSupport.init(conf);

0 commit comments

Comments
 (0)