Commit f40c745

langdamao authored and saintstack committed
HBASE-22887 Fix HFileOutputFormat2 writer roll (#554)
Signed-off-by: langdamao <lang--lang--lang@163.com>
1 parent fff0f33 commit f40c745

File tree: 2 files changed, +27 -10 lines

hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java

Lines changed: 5 additions & 9 deletions

@@ -247,9 +247,9 @@ protected static byte[] getTableNameSuffixedWithFamily(byte[] tableName, byte[]
   // Map of families to writers and how much has been output on the writer.
   private final Map<byte[], WriterLength> writers =
       new TreeMap<>(Bytes.BYTES_COMPARATOR);
-  private byte[] previousRow = HConstants.EMPTY_BYTE_ARRAY;
+  private final Map<byte[], byte[]> previousRows =
+      new TreeMap<>(Bytes.BYTES_COMPARATOR);
   private final long now = EnvironmentEdgeManager.currentTime();
-  private boolean rollRequested = false;

   @Override
   public void write(ImmutableBytesWritable row, V cell)
@@ -291,12 +291,9 @@ public void write(ImmutableBytesWritable row, V cell)
       configureStoragePolicy(conf, fs, tableAndFamily, writerPath);
     }

-    if (wl != null && wl.written + length >= maxsize) {
-      this.rollRequested = true;
-    }
-
     // This can only happen once a row is finished though
-    if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
+    if (wl != null && wl.written + length >= maxsize
+        && Bytes.compareTo(this.previousRows.get(family), rowKey) != 0) {
       rollWriters(wl);
     }

@@ -354,7 +351,7 @@ public void write(ImmutableBytesWritable row, V cell)
     wl.written += length;

     // Copy the row so we know when a row transition.
-    this.previousRow = rowKey;
+    this.previousRows.put(family, rowKey);
   }

   private void rollWriters(WriterLength writerLength) throws IOException {
@@ -365,7 +362,6 @@ private void rollWriters(WriterLength writerLength) throws IOException {
         closeWriter(wl);
       }
     }
-    this.rollRequested = false;
   }

   private void closeWriter(WriterLength wl) throws IOException {
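
Taken together, the hunks above drop the shared previousRow/rollRequested pair in favor of a per-family previousRows map. With the shared flag, any family crossing maxsize set rollRequested, but the roll fired on the next global row transition against whichever family's writer handled that cell, so a still-small writer could be rolled (and the flag cleared) on behalf of an oversized one. The fixed condition checks size and row transition for the same family in one expression. A minimal, JDK-only sketch of the resulting decision logic follows; RollSketch, its String family keys, and the counter reset are simplified stand-ins, not the real implementation (which keys byte[] maps with Bytes.BYTES_COMPARATOR and calls rollWriters(wl)):

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Simplified model of the patched roll decision in HFileOutputFormat2.
// Illustrative only: a "roll" here just resets a byte counter.
public class RollSketch {
  static final long MAX_SIZE = 10 * 1024; // stands in for maxsize

  static final class WriterLength {
    long written; // bytes written to the current file for one family
  }

  private final Map<String, WriterLength> writers = new HashMap<>();
  // One previous-row marker per family, replacing the shared previousRow.
  private final Map<String, byte[]> previousRows = new HashMap<>();

  void write(String family, byte[] rowKey, long cellLength) {
    WriterLength wl = writers.computeIfAbsent(family, f -> new WriterLength());
    byte[] prev = previousRows.get(family);
    // Roll only when this family's file is over size AND its row key changed,
    // so all cells of one row stay in the same file.
    if (wl.written + cellLength >= MAX_SIZE && prev != null
        && !Arrays.equals(prev, rowKey)) {
      wl.written = 0; // stands in for rollWriters(wl)
    }
    wl.written += cellLength;
    previousRows.put(family, rowKey);
  }
}

In the real patch the null case cannot NPE: on the first cell of a family wl is still null, so the wl != null check short-circuits before previousRows.get(family) is compared.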

hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java

Lines changed: 22 additions & 1 deletion

@@ -424,7 +424,8 @@ public void testWritingPEData() throws Exception {
     // Set down this value or we OOME in eclipse.
     conf.setInt("mapreduce.task.io.sort.mb", 20);
     // Write a few files.
-    conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024);
+    long hregionMaxFilesize = 10 * 1024;
+    conf.setLong(HConstants.HREGION_MAX_FILESIZE, hregionMaxFilesize);

     Job job = new Job(conf, "testWritingPEData");
     setupRandomGeneratorMapper(job, false);
@@ -451,6 +452,26 @@
     assertTrue(job.waitForCompletion(false));
     FileStatus [] files = fs.listStatus(testDir);
     assertTrue(files.length > 0);
+
+    // Check output file num and size.
+    for (byte[] family : FAMILIES) {
+      long kvCount = 0;
+      RemoteIterator<LocatedFileStatus> iterator =
+          fs.listFiles(testDir.suffix("/" + new String(family)), true);
+      while (iterator.hasNext()) {
+        LocatedFileStatus keyFileStatus = iterator.next();
+        HFile.Reader reader =
+            HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
+        HFileScanner scanner = reader.getScanner(false, false, false);
+
+        kvCount += reader.getEntries();
+        scanner.seekTo();
+        long perKVSize = scanner.getCell().getSerializedSize();
+        assertTrue("Data size of each file should not be too large.",
+            perKVSize * reader.getEntries() <= hregionMaxFilesize);
+      }
+      assertEquals("Should write expected data in output file.", ROWSPERSPLIT, kvCount);
+    }
   }

   /**
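
The added assertions bound each output HFile by hregionMaxFilesize via perKVSize * reader.getEntries(), a valid proxy only because the test's random PE-style generator emits uniformly sized cells (the test itself relies on this by sizing from the first cell of each file); summing getEntries() across one family's files and comparing with ROWSPERSPLIT then verifies the per-family roll loses no cells. To run just this test, Surefire's single-test selector should work from a built checkout, e.g. mvn test -pl hbase-mapreduce -Dtest=TestHFileOutputFormat2#testWritingPEData (the exact invocation is an assumption about the local setup, not part of the commit).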
