Skip to content

Commit 128b480

Browse files
langdamaosaintstack
authored andcommitted
HBASE-22887 Fix HFileOutputFormat2 writer roll (#554)
Signed-off-by: langdamao <lang--lang--lang@163.com>
1 parent 92bf07a commit 128b480

File tree

2 files changed

+27
-10
lines changed

2 files changed

+27
-10
lines changed

hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -235,9 +235,9 @@ protected static byte[] getTableNameSuffixedWithFamily(byte[] tableName, byte[]
235235
// Map of families to writers and how much has been output on the writer.
236236
private final Map<byte[], WriterLength> writers =
237237
new TreeMap<>(Bytes.BYTES_COMPARATOR);
238-
private byte[] previousRow = HConstants.EMPTY_BYTE_ARRAY;
238+
private final Map<byte[], byte[]> previousRows =
239+
new TreeMap<>(Bytes.BYTES_COMPARATOR);
239240
private final long now = EnvironmentEdgeManager.currentTime();
240-
private boolean rollRequested = false;
241241

242242
@Override
243243
public void write(ImmutableBytesWritable row, V cell)
@@ -286,12 +286,9 @@ public void write(ImmutableBytesWritable row, V cell)
286286
configureStoragePolicy(conf, fs, tableAndFamily, writerPath);
287287
}
288288

289-
if (wl != null && wl.written + length >= maxsize) {
290-
this.rollRequested = true;
291-
}
292-
293289
// This can only happen once a row is finished though
294-
if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
290+
if (wl != null && wl.written + length >= maxsize
291+
&& Bytes.compareTo(this.previousRows.get(family), rowKey) != 0) {
295292
rollWriters(wl);
296293
}
297294

@@ -348,7 +345,7 @@ public void write(ImmutableBytesWritable row, V cell)
348345
wl.written += length;
349346

350347
// Copy the row so we know when a row transition.
351-
this.previousRow = rowKey;
348+
this.previousRows.put(family, rowKey);
352349
}
353350

354351
private Path getTableRelativePath(byte[] tableNameBytes) {
@@ -368,7 +365,6 @@ private void rollWriters(WriterLength writerLength) throws IOException {
368365
closeWriter(wl);
369366
}
370367
}
371-
this.rollRequested = false;
372368
}
373369

374370
private void closeWriter(WriterLength wl) throws IOException {

hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -430,7 +430,8 @@ public void testWritingPEData() throws Exception {
430430
// Set down this value or we OOME in eclipse.
431431
conf.setInt("mapreduce.task.io.sort.mb", 20);
432432
// Write a few files.
433-
conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024);
433+
long hregionMaxFilesize = 10 * 1024;
434+
conf.setLong(HConstants.HREGION_MAX_FILESIZE, hregionMaxFilesize);
434435

435436
Job job = new Job(conf, "testWritingPEData");
436437
setupRandomGeneratorMapper(job, false);
@@ -457,6 +458,26 @@ public void testWritingPEData() throws Exception {
457458
assertTrue(job.waitForCompletion(false));
458459
FileStatus [] files = fs.listStatus(testDir);
459460
assertTrue(files.length > 0);
461+
462+
//check output file num and size.
463+
for (byte[] family : FAMILIES) {
464+
long kvCount= 0;
465+
RemoteIterator<LocatedFileStatus> iterator =
466+
fs.listFiles(testDir.suffix("/" + new String(family)), true);
467+
while (iterator.hasNext()) {
468+
LocatedFileStatus keyFileStatus = iterator.next();
469+
HFile.Reader reader =
470+
HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
471+
HFileScanner scanner = reader.getScanner(false, false, false);
472+
473+
kvCount += reader.getEntries();
474+
scanner.seekTo();
475+
long perKVSize = scanner.getCell().getSerializedSize();
476+
assertTrue("Data size of each file should not be too large.",
477+
perKVSize * reader.getEntries() <= hregionMaxFilesize);
478+
}
479+
assertEquals("Should write expected data in output file.", ROWSPERSPLIT, kvCount);
480+
}
460481
}
461482

462483
/**

0 commit comments

Comments
 (0)