diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index 3dc01b9d4ccd..54880775cd39 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -20,6 +20,7 @@
import static org.apache.hadoop.hbase.protobuf.ProtobufMagic.PB_MAGIC;
import java.io.ByteArrayOutputStream;
+import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Constructor;
@@ -137,6 +138,7 @@
import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream;
import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
+import org.apache.hbase.thirdparty.com.google.protobuf.Parser;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.Service;
@@ -3700,4 +3702,52 @@ public static ClusterStatusProtos.ServerTask toServerTask(ServerTask task) {
.setStartTime(task.getStartTime()).setCompletionTime(task.getCompletionTime()).build();
}
+ /**
+ * Check whether this IPBE indicates EOF or not.
+ *
+ * We will check the exception message, if it is likely the one of
+ * InvalidProtocolBufferException.truncatedMessage, we will consider it as EOF, otherwise not.
+ */
+ public static boolean isEOF(InvalidProtocolBufferException e) {
+ return e.getMessage().contains("input has been truncated");
+ }
+
+ /**
+ * This is a wrapper of the PB message's parseDelimitedFrom. The difference is, if we can not
+ * determine whether there are enough bytes in stream, i.e, the available method does not have a
+ * valid return value, we will try to read all the bytes to a byte array first, and then parse the
+ * pb message with {@link Parser#parseFrom(byte[])} instead of call
+ * {@link Parser#parseDelimitedFrom(InputStream)} directly. This is because even if the bytes are
+ * not enough bytes, {@link Parser#parseDelimitedFrom(InputStream)} could still return without any
+ * errors but just leave us a partial PB message.
+ * @return The PB message if we can parse it successfully, otherwise there will always be an
+ * exception thrown, will never return {@code null}.
+ */
+ public static T parseDelimitedFrom(InputStream in, Parser parser)
+ throws IOException {
+ int firstByte = in.read();
+ if (firstByte < 0) {
+ throw new EOFException("EOF while reading message size");
+ }
+ int size = CodedInputStream.readRawVarint32(firstByte, in);
+ int available = in.available();
+ if (available > 0) {
+ if (available < size) {
+ throw new EOFException("Available bytes not enough for parsing PB message, expect at least "
+ + size + " bytes, but only " + available + " bytes available");
+ }
+ // this piece of code is copied from GeneratedMessageV3.parseFrom
+ try {
+ return parser.parseFrom(ByteStreams.limit(in, size));
+ } catch (InvalidProtocolBufferException e) {
+ throw e.unwrapIOException();
+ }
+ } else {
+ // this usually means the stream does not have a proper available implementation, let's read
+ // the content to an byte array before parsing.
+ byte[] bytes = new byte[size];
+ ByteStreams.readFully(in, bytes);
+ return parser.parseFrom(bytes);
+ }
+ }
}
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/shaded/protobuf/TestProtobufUtil.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/shaded/protobuf/TestProtobufUtil.java
index b27d832ee8c0..fc442b8998d9 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/shaded/protobuf/TestProtobufUtil.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/shaded/protobuf/TestProtobufUtil.java
@@ -22,6 +22,7 @@
import static org.junit.Assert.assertTrue;
import java.io.IOException;
+import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
@@ -53,6 +54,7 @@
import org.apache.hbase.thirdparty.com.google.protobuf.Any;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.BytesValue;
+import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.hbase.shaded.protobuf.generated.CellProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
@@ -574,4 +576,21 @@ public void testTagEncodeTrueDecodeFalse() {
List decodedTags = PrivateCellUtil.getTags(decodedCell);
assertEquals(0, decodedTags.size());
}
+
+ /**
+ * Used to confirm that we only consider truncatedMessage as EOF
+ */
+ @Test
+ public void testIsEOF() throws Exception {
+ for (Method method : InvalidProtocolBufferException.class.getDeclaredMethods()) {
+ if (
+ method.getParameterCount() == 0
+ && method.getReturnType() == InvalidProtocolBufferException.class
+ ) {
+ method.setAccessible(true);
+ InvalidProtocolBufferException e = (InvalidProtocolBufferException) method.invoke(null);
+ assertEquals(method.getName().equals("truncatedMessage"), ProtobufUtil.isEOF(e));
+ }
+ }
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
index d562fe705ced..a7ca18278453 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
@@ -36,8 +36,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams;
-import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream;
import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
@@ -358,60 +356,46 @@ protected Compression.Algorithm getValueCompressionAlgorithm() {
@Override
protected boolean readNext(Entry entry) throws IOException {
+ resetCompression = false;
// OriginalPosition might be < 0 on local fs; if so, it is useless to us.
long originalPosition = this.inputStream.getPos();
if (trailerPresent && originalPosition > 0 && originalPosition == this.walEditsStopOffset) {
LOG.trace("Reached end of expected edits area at offset {}", originalPosition);
return false;
}
- WALKey.Builder builder = WALKey.newBuilder();
- long size = 0;
boolean resetPosition = false;
- // by default, we should reset the compression when seeking back after reading something
- resetCompression = true;
try {
- long available = -1;
+ WALKey walKey;
try {
- int firstByte = this.inputStream.read();
- if (firstByte == -1) {
- throw new EOFException();
- }
- size = CodedInputStream.readRawVarint32(firstByte, this.inputStream);
- // available may be < 0 on local fs for instance. If so, can't depend on it.
- available = this.inputStream.available();
- if (available > 0 && available < size) {
- // if we quit here, we have just read the length, no actual data yet, which means we
- // haven't put anything into the compression dictionary yet, so when seeking back to the
- // last good position, we do not need to reset compression context.
- // This is very useful for saving the extra effort for reconstructing the compression
- // dictionary, where we need to read from the beginning instead of just seek to the
- // position, as DFSInputStream implement the available method, so in most cases we will
- // reach here if there are not enough data.
- resetCompression = false;
- throw new EOFException("Available stream not enough for edit, "
- + "inputStream.available()= " + this.inputStream.available() + ", " + "entry size= "
- + size + " at offset = " + this.inputStream.getPos());
+ walKey = ProtobufUtil.parseDelimitedFrom(inputStream, WALKey.parser());
+ } catch (InvalidProtocolBufferException e) {
+ if (ProtobufUtil.isEOF(e) || isWALTrailer(originalPosition)) {
+ // only rethrow EOF if it indicates an EOF, or we have reached the partial WALTrailer
+ resetPosition = true;
+ throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition="
+ + originalPosition + ", currentPosition=" + this.inputStream.getPos()).initCause(e);
+ } else {
+ throw e;
}
- ProtobufUtil.mergeFrom(builder, ByteStreams.limit(this.inputStream, size), (int) size);
- } catch (InvalidProtocolBufferException ipbe) {
- resetPosition = true;
- throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition="
- + originalPosition + ", currentPosition=" + this.inputStream.getPos() + ", messageSize="
- + size + ", currentAvailable=" + available).initCause(ipbe);
+ } catch (EOFException e) {
+ // append more detailed information
+ throw (EOFException) new EOFException("EOF while reading WAL key; originalPosition="
+ + originalPosition + ", currentPosition=" + this.inputStream.getPos()).initCause(e);
}
- if (!builder.isInitialized()) {
- // TODO: not clear if we should try to recover from corrupt PB that looks semi-legit.
- // If we can get the KV count, we could, theoretically, try to get next record.
- throw new EOFException("Partial PB while reading WAL, "
- + "probably an unexpected EOF, ignoring. current offset=" + this.inputStream.getPos());
- }
- WALKey walKey = builder.build();
entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);
if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) {
LOG.debug("WALKey has no KVs that follow it; trying the next one. current offset={}",
this.inputStream.getPos());
return true;
}
+ // Starting from here, we will start to read cells, which will change the content in
+ // compression dictionary, so if we fail in the below operations, when resetting, we also need
+ // to clear the compression context, and read from the beginning to reconstruct the
+ // compression dictionary, instead of seeking to the position directly.
+ // This is very useful for saving the extra effort for reconstructing the compression
+ // dictionary, as DFSInputStream implement the available method, so in most cases we will
+ // not reach here if there are not enough data.
+ resetCompression = true;
int expectedCells = walKey.getFollowingKvCount();
long posBefore = this.inputStream.getPos();
try {
@@ -490,6 +474,54 @@ private IOException extractHiddenEof(Exception ex) {
return null;
}
+ /**
+ * This is used to determine whether we have already reached the WALTrailer. As the size and magic
+ * are at the end of the WAL file, it is possible that these two options are missing while
+ * writing, so we will consider there is no trailer. And when we actually reach the WALTrailer, we
+ * will try to decode it as WALKey and we will fail but the error could vary as it is parsing
+ * WALTrailer actually.
+ * @return whether this is a WALTrailer and we should throw EOF to upper layer the file is done
+ */
+ private boolean isWALTrailer(long startPosition) throws IOException {
+ // We have nothing in the WALTrailer PB message now so its size is just a int length size and a
+ // magic at the end
+ int trailerSize = PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT;
+ if (fileLength - startPosition >= trailerSize) {
+ // We still have more than trailerSize bytes before reaching the EOF so this is not a trailer.
+ // We also test for == here because if this is a valid trailer, we can read it while opening
+ // the reader so we should not reach here
+ return false;
+ }
+ inputStream.seek(startPosition);
+ for (int i = 0; i < 4; i++) {
+ int r = inputStream.read();
+ if (r == -1) {
+ // we have reached EOF while reading the length, and all bytes read are 0, so we assume this
+ // is a partial trailer
+ return true;
+ }
+ if (r != 0) {
+ // the length is not 0, should not be a trailer
+ return false;
+ }
+ }
+ for (int i = 0; i < PB_WAL_COMPLETE_MAGIC.length; i++) {
+ int r = inputStream.read();
+ if (r == -1) {
+ // we have reached EOF while reading the magic, and all bytes read are matched, so we assume
+ // this is a partial trailer
+ return true;
+ }
+ if (r != (PB_WAL_COMPLETE_MAGIC[i] & 0xFF)) {
+ // does not match magic, should not be a trailer
+ return false;
+ }
+ }
+ // in fact we should not reach here, as this means the trailer bytes are all matched and
+ // complete, then we should not call this method...
+ return true;
+ }
+
@Override
protected void seekOnFs(long pos) throws IOException {
this.inputStream.seek(pos);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestProtobufLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestProtobufLog.java
index 8f7704fb3f6a..aac4a721e5fe 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestProtobufLog.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestProtobufLog.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.regionserver.wal;
+import static org.junit.Assert.assertEquals;
+
import java.io.IOException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -35,6 +37,8 @@
import org.junit.Test;
import org.junit.rules.TestName;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;
+
/**
* WAL tests that can be reused across providers.
*/
@@ -89,6 +93,9 @@ public static void tearDownAfterClass() throws Exception {
*/
@Test
public void testWALTrailer() throws IOException {
+ // make sure that the size for WALTrailer is 0, we need this assumption when reading partial
+ // WALTrailer
+ assertEquals(0, WALTrailer.newBuilder().build().getSerializedSize());
// read With trailer.
doRead(true);
// read without trailer