Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-27668 PB's parseDelimitedFrom can successfully return when ther… #5059

Merged
merged 2 commits into from
Feb 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.apache.hadoop.hbase.protobuf.ProtobufMagic.PB_MAGIC;

import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Constructor;
Expand Down Expand Up @@ -137,6 +138,7 @@
import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream;
import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.Parser;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.Service;
Expand Down Expand Up @@ -3700,4 +3702,52 @@ public static ClusterStatusProtos.ServerTask toServerTask(ServerTask task) {
.setStartTime(task.getStartTime()).setCompletionTime(task.getCompletionTime()).build();
}

/**
* Check whether this IPBE indicates EOF or not.
* <p/>
* We will check the exception message, if it is likely the one of
* InvalidProtocolBufferException.truncatedMessage, we will consider it as EOF, otherwise not.
*/
public static boolean isEOF(InvalidProtocolBufferException e) {
return e.getMessage().contains("input has been truncated");
Apache9 marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* This is a wrapper of the PB message's parseDelimitedFrom. The difference is, if we can not
* determine whether there are enough bytes in stream, i.e, the available method does not have a
* valid return value, we will try to read all the bytes to a byte array first, and then parse the
* pb message with {@link Parser#parseFrom(byte[])} instead of call
* {@link Parser#parseDelimitedFrom(InputStream)} directly. This is because even if the bytes are
* not enough bytes, {@link Parser#parseDelimitedFrom(InputStream)} could still return without any
* errors but just leave us a partial PB message.
* @return The PB message if we can parse it successfully, otherwise there will always be an
* exception thrown, will never return {@code null}.
*/
public static <T extends Message> T parseDelimitedFrom(InputStream in, Parser<T> parser)
throws IOException {
int firstByte = in.read();
if (firstByte < 0) {
throw new EOFException("EOF while reading message size");
}
int size = CodedInputStream.readRawVarint32(firstByte, in);
int available = in.available();
if (available > 0) {
if (available < size) {
throw new EOFException("Available bytes not enough for parsing PB message, expect at least "
+ size + " bytes, but only " + available + " bytes available");
}
// this piece of code is copied from GeneratedMessageV3.parseFrom
try {
return parser.parseFrom(ByteStreams.limit(in, size));
} catch (InvalidProtocolBufferException e) {
throw e.unwrapIOException();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we also log InvalidProtocolBufferException before throwing here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This piece of code is just copied from the protobuf code base, I think it is OK to not logging here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, no worries

}
} else {
// this usually means the stream does not have a proper available implementation, let's read
// the content to an byte array before parsing.
byte[] bytes = new byte[size];
ByteStreams.readFully(in, bytes);
return parser.parseFrom(bytes);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -53,6 +54,7 @@
import org.apache.hbase.thirdparty.com.google.protobuf.Any;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.BytesValue;
import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;

import org.apache.hadoop.hbase.shaded.protobuf.generated.CellProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
Expand Down Expand Up @@ -574,4 +576,21 @@ public void testTagEncodeTrueDecodeFalse() {
List<Tag> decodedTags = PrivateCellUtil.getTags(decodedCell);
assertEquals(0, decodedTags.size());
}

/**
* Used to confirm that we only consider truncatedMessage as EOF
*/
@Test
public void testIsEOF() throws Exception {
for (Method method : InvalidProtocolBufferException.class.getDeclaredMethods()) {
if (
method.getParameterCount() == 0
&& method.getReturnType() == InvalidProtocolBufferException.class
) {
method.setAccessible(true);
InvalidProtocolBufferException e = (InvalidProtocolBufferException) method.invoke(null);
assertEquals(method.getName().equals("truncatedMessage"), ProtobufUtil.isEOF(e));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams;
import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream;
import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
Expand Down Expand Up @@ -358,60 +356,46 @@ protected Compression.Algorithm getValueCompressionAlgorithm() {

@Override
protected boolean readNext(Entry entry) throws IOException {
resetCompression = false;
// OriginalPosition might be < 0 on local fs; if so, it is useless to us.
long originalPosition = this.inputStream.getPos();
if (trailerPresent && originalPosition > 0 && originalPosition == this.walEditsStopOffset) {
LOG.trace("Reached end of expected edits area at offset {}", originalPosition);
return false;
}
WALKey.Builder builder = WALKey.newBuilder();
long size = 0;
boolean resetPosition = false;
// by default, we should reset the compression when seeking back after reading something
resetCompression = true;
try {
long available = -1;
WALKey walKey;
try {
int firstByte = this.inputStream.read();
if (firstByte == -1) {
throw new EOFException();
}
size = CodedInputStream.readRawVarint32(firstByte, this.inputStream);
// available may be < 0 on local fs for instance. If so, can't depend on it.
available = this.inputStream.available();
if (available > 0 && available < size) {
// if we quit here, we have just read the length, no actual data yet, which means we
// haven't put anything into the compression dictionary yet, so when seeking back to the
// last good position, we do not need to reset compression context.
// This is very useful for saving the extra effort for reconstructing the compression
// dictionary, where we need to read from the beginning instead of just seek to the
// position, as DFSInputStream implement the available method, so in most cases we will
// reach here if there are not enough data.
resetCompression = false;
throw new EOFException("Available stream not enough for edit, "
+ "inputStream.available()= " + this.inputStream.available() + ", " + "entry size= "
+ size + " at offset = " + this.inputStream.getPos());
walKey = ProtobufUtil.parseDelimitedFrom(inputStream, WALKey.parser());
} catch (InvalidProtocolBufferException e) {
if (ProtobufUtil.isEOF(e) || isWALTrailer(originalPosition)) {
// only rethrow EOF if it indicates an EOF, or we have reached the partial WALTrailer
resetPosition = true;
throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition="
+ originalPosition + ", currentPosition=" + this.inputStream.getPos()).initCause(e);
} else {
throw e;
}
ProtobufUtil.mergeFrom(builder, ByteStreams.limit(this.inputStream, size), (int) size);
} catch (InvalidProtocolBufferException ipbe) {
resetPosition = true;
throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition="
+ originalPosition + ", currentPosition=" + this.inputStream.getPos() + ", messageSize="
+ size + ", currentAvailable=" + available).initCause(ipbe);
} catch (EOFException e) {
// append more detailed information
throw (EOFException) new EOFException("EOF while reading WAL key; originalPosition="
+ originalPosition + ", currentPosition=" + this.inputStream.getPos()).initCause(e);
}
if (!builder.isInitialized()) {
// TODO: not clear if we should try to recover from corrupt PB that looks semi-legit.
// If we can get the KV count, we could, theoretically, try to get next record.
throw new EOFException("Partial PB while reading WAL, "
+ "probably an unexpected EOF, ignoring. current offset=" + this.inputStream.getPos());
}
WALKey walKey = builder.build();
entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);
if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) {
LOG.debug("WALKey has no KVs that follow it; trying the next one. current offset={}",
this.inputStream.getPos());
return true;
}
// Starting from here, we will start to read cells, which will change the content in
// compression dictionary, so if we fail in the below operations, when resetting, we also need
// to clear the compression context, and read from the beginning to reconstruct the
// compression dictionary, instead of seeking to the position directly.
// This is very useful for saving the extra effort for reconstructing the compression
// dictionary, as DFSInputStream implement the available method, so in most cases we will
// not reach here if there are not enough data.
resetCompression = true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting. So this was introduced by HBASE-27621 and looks like has not made it to live release yet, correct?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just reversed the condition judgement here, no actual logic change. In HBASE-27621, it defaults to true and we set it to false when parsing WALKey fails. And when implementing HBASE-27632, I found a better way is to set it default to false, and when we begin to parse WALEdit, i.e, Cells, we set it to true.

int expectedCells = walKey.getFollowingKvCount();
long posBefore = this.inputStream.getPos();
try {
Expand Down Expand Up @@ -490,6 +474,54 @@ private IOException extractHiddenEof(Exception ex) {
return null;
}

/**
* This is used to determine whether we have already reached the WALTrailer. As the size and magic
* are at the end of the WAL file, it is possible that these two options are missing while
* writing, so we will consider there is no trailer. And when we actually reach the WALTrailer, we
* will try to decode it as WALKey and we will fail but the error could vary as it is parsing
* WALTrailer actually.
* @return whether this is a WALTrailer and we should throw EOF to upper layer the file is done
*/
private boolean isWALTrailer(long startPosition) throws IOException {
// We have nothing in the WALTrailer PB message now so its size is just a int length size and a
// magic at the end
int trailerSize = PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT;
if (fileLength - startPosition >= trailerSize) {
// We still have more than trailerSize bytes before reaching the EOF so this is not a trailer.
// We also test for == here because if this is a valid trailer, we can read it while opening
// the reader so we should not reach here
return false;
}
inputStream.seek(startPosition);
for (int i = 0; i < 4; i++) {
int r = inputStream.read();
if (r == -1) {
// we have reached EOF while reading the length, and all bytes read are 0, so we assume this
// is a partial trailer
return true;
}
if (r != 0) {
// the length is not 0, should not be a trailer
return false;
}
}
for (int i = 0; i < PB_WAL_COMPLETE_MAGIC.length; i++) {
int r = inputStream.read();
if (r == -1) {
// we have reached EOF while reading the magic, and all bytes read are matched, so we assume
// this is a partial trailer
return true;
}
if (r != (PB_WAL_COMPLETE_MAGIC[i] & 0xFF)) {
// does not match magic, should not be a trailer
return false;
}
}
// in fact we should not reach here, as this means the trailer bytes are all matched and
// complete, then we should not call this method...
return true;
}

@Override
protected void seekOnFs(long pos) throws IOException {
this.inputStream.seek(pos);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.regionserver.wal;

import static org.junit.Assert.assertEquals;

import java.io.IOException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
Expand All @@ -35,6 +37,8 @@
import org.junit.Test;
import org.junit.rules.TestName;

import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;

/**
* WAL tests that can be reused across providers.
*/
Expand Down Expand Up @@ -89,6 +93,9 @@ public static void tearDownAfterClass() throws Exception {
*/
@Test
public void testWALTrailer() throws IOException {
// make sure that the size for WALTrailer is 0, we need this assumption when reading partial
// WALTrailer
assertEquals(0, WALTrailer.newBuilder().build().getSerializedSize());
// read With trailer.
doRead(true);
// read without trailer
Expand Down