Skip to content

Commit

Permalink
HBASE-28748 Replication blocking: InvalidProtocolBufferException: Pro…
Browse files Browse the repository at this point in the history
…tocol message tag had invalid wire type.
  • Loading branch information
Apache9 committed Jul 23, 2024
1 parent 743e8d6 commit 6b2630f
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
public class ProtobufWALTailingReader extends AbstractProtobufWALReader
implements WALTailingReader {

private static final Logger LOG = LoggerFactory.getLogger(ProtobufWALStreamReader.class);
private static final Logger LOG = LoggerFactory.getLogger(ProtobufWALTailingReader.class);

private DelegatingInputStream delegatingInput;

Expand Down Expand Up @@ -117,8 +117,7 @@ private ReadWALKeyResult readWALKey(long originalPosition) {
return KEY_ERROR_AND_RESET;
}
if (available > 0 && available < size) {
LOG.info(
"Available stream not enough for edit, available={}, " + "entry size={} at offset={}",
LOG.info("Available stream not enough for edit, available={}, entry size={} at offset={}",
available, size, getPositionQuietly());
return KEY_EOF_AND_RESET;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,15 @@ private void setCurrentPath(Path path) {
this.currentPath = path;
}

/**
 * Repositions the underlying WAL reader to where reading should resume.
 * <p>
 * If at least one entry has already been consumed ({@code currentPositionOfEntry > 0}), seek
 * back to that position and let the current read {@code state} decide whether the compression
 * context must be reset. Otherwise we are about to read from the very beginning of the file, so
 * pass {@code -1} and unconditionally clear the compression context — stale dictionary state
 * from a previous attempt must never be reused when re-reading from the start.
 * @throws IOException if the underlying reader fails to seek/reset
 */
private void resetReader() throws IOException {
  if (currentPositionOfEntry > 0) {
    reader.resetTo(currentPositionOfEntry, state.resetCompression());
  } else {
    // we will read from the beginning so we should always clear the compression context
    reader.resetTo(-1, true);
  }
}

@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DCN_NULLPOINTER_EXCEPTION",
justification = "HDFS-4380")
private HasNext prepareReader() {
Expand All @@ -213,12 +222,7 @@ private HasNext prepareReader() {
LOG.debug("Reset reader {} to pos {}, reset compression={}", currentPath,
currentPositionOfEntry, state.resetCompression());
try {
if (currentPositionOfEntry > 0) {
reader.resetTo(currentPositionOfEntry, state.resetCompression());
} else {
// we will read from the beginning so we should always clear the compression context
reader.resetTo(-1, true);
}
resetReader();
return HasNext.YES;
} catch (IOException e) {
LOG.warn("Failed to reset reader {} to pos {}, reset compression={}", currentPath,
Expand Down Expand Up @@ -289,7 +293,7 @@ private HasNext lastAttempt() {
LOG.debug("Reset reader {} for the last time to pos {}, reset compression={}", currentPath,
currentPositionOfEntry, state.resetCompression());
try {
reader.resetTo(currentPositionOfEntry, state.resetCompression());
resetReader();
} catch (IOException e) {
LOG.warn("Failed to reset reader {} to pos {}, reset compression={}", currentPath,
currentPositionOfEntry, state.resetCompression(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -46,6 +47,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
Expand All @@ -56,14 +58,14 @@
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream.HasNext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
Expand All @@ -76,6 +78,7 @@
import org.mockito.Mockito;

import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;

public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase {

Expand All @@ -93,7 +96,7 @@ public void setUp() throws Exception {
initWAL();
}

private Entry next(WALEntryStream entryStream) {
private WAL.Entry next(WALEntryStream entryStream) {
assertEquals(HasNext.YES, entryStream.hasNext());
return entryStream.next();
}
Expand Down Expand Up @@ -562,7 +565,7 @@ private WALEntryFilter getDummyFilter() {
return new WALEntryFilter() {

@Override
public Entry filter(Entry entry) {
public WAL.Entry filter(WAL.Entry entry) {
return entry;
}
};
Expand All @@ -581,7 +584,7 @@ public FailingWALEntryFilter(int numFailuresInFilter) {
}

@Override
public Entry filter(Entry entry) {
public WAL.Entry filter(WAL.Entry entry) {
if (countFailures == numFailures) {
return entry;
}
Expand Down Expand Up @@ -839,6 +842,44 @@ public void testReplicationSourceWALReaderWithPartialWALEntryFailingFilter() thr
assertNull(reader.poll(10));
}

// testcase for HBASE-28748: a WAL file that ends immediately after its header (EOF right where
// the first entry would start) must not permanently block the entry stream with an
// InvalidProtocolBufferException; the stream should retry and move on to the next log.
@Test
public void testWALEntryStreamEOFRightAfterHeader() throws Exception {
  // start with exactly one (still empty) WAL in the replication queue
  assertEquals(1, logQueue.getQueueSize(fakeWalGroupId));
  AbstractFSWAL<?> abstractWAL = (AbstractFSWAL<?>) log;
  Path emptyLogFile = abstractWAL.getCurrentFileName();
  log.rollWriter(true);

  // AsyncFSWAL and FSHLog both move the log from WALs to oldWALs directory asynchronously.
  // Wait for in flight wal close count to become 0. This makes sure that empty wal is moved to
  // oldWALs directory.
  Waiter.waitFor(CONF, 5000,
    (Waiter.Predicate<Exception>) () -> abstractWAL.getInflightWALCloseCount() == 0);
  // There will be 2 logs in the queue: the rolled (empty) one and the new active one.
  assertEquals(2, logQueue.getQueueSize(fakeWalGroupId));
  appendToLogAndSync();

  // the empty log has been archived; locate it in the oldWALs directory
  Path archivedEmptyLogFile = AbstractFSWALProvider.findArchivedLog(emptyLogFile, CONF);

  // read the wal header: capture the magic bytes plus the delimited WALHeader message in memory
  ByteArrayOutputStream bos = new ByteArrayOutputStream();
  bos.write(AbstractProtobufWALReader.PB_WAL_MAGIC);
  try (FSDataInputStream in = fs.open(archivedEmptyLogFile)) {
    in.skipNBytes(AbstractProtobufWALReader.PB_WAL_MAGIC.length);
    WALHeader header = WALHeader.parseDelimitedFrom(in);
    header.writeDelimitedTo(bos);
  }
  // overwrite the archived log with only magic + header, so the file hits EOF right after the
  // header — the truncated-file condition HBASE-28748 describes
  try (FSDataOutputStream out = fs.create(archivedEmptyLogFile, true)) {
    bos.writeTo(out);
  }
  // the stream should signal an immediate retry on the truncated file rather than failing hard,
  // and subsequently serve the entry appended to the second (active) log
  try (WALEntryStream entryStream =
    new WALEntryStream(logQueue, fs, CONF, 0, log, new MetricsSource("1"), fakeWalGroupId)) {
    assertEquals(HasNext.RETRY_IMMEDIATELY, entryStream.hasNext());
    assertNotNull(next(entryStream));
  }
}

private static class PartialWALEntryFailingWALEntryFilter implements WALEntryFilter {
private int filteredWALEntryCount = -1;
private int walEntryCount = 0;
Expand All @@ -851,7 +892,7 @@ public PartialWALEntryFailingWALEntryFilter(int throwExceptionLimit, int walEntr
}

@Override
public Entry filter(Entry entry) {
public WAL.Entry filter(WAL.Entry entry) {
filteredWALEntryCount++;
if (filteredWALEntryCount < walEntryCount - 1) {
return entry;
Expand Down

0 comments on commit 6b2630f

Please sign in to comment.