Skip to content

Commit

Permalink
Fixed issue of ArrayIndexOutOfBoundsException when trying to read 4 byte …
Browse files Browse the repository at this point in the history
…integer for entity length from buffer with less than 4 bytes available.

Signed-off-by: jasperpotts <jasperpotts@users.noreply.github.com>
  • Loading branch information
jasperpotts committed Nov 7, 2024
1 parent 5b49b66 commit 1b456ed
Showing 1 changed file with 114 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,30 @@ final class PbjProtocolHandler implements Http2SubProtocolSelector.SubProtocolHa
*/
private int entityBytesIndex = 0;

/**
 * States for the {@code currentReadState} state machine that parses incoming gRPC
 * length-prefixed message frames (1 compression-flag byte, 4 length bytes, then the
 * entity bytes). Frame boundaries of the transport do not align with message
 * boundaries, so each state must tolerate partial reads across data frames.
 */
enum ReadState {
    /**
     * Start state, when we are looking for the first byte of a message, which says
     * whether the message data is compressed or not
     */
    START,
    /**
     * State where we are reading the 4-byte message length. The length may arrive
     * split across data frames, so this state can hold a partially read length until
     * all 4 bytes are available.
     */
    READ_LENGTH,
    /** State where we are reading the protobuf entity bytes */
    READ_ENTITY_BYTES
}

/** State machine as we read bytes from incoming data */
private ReadState currentReadState = ReadState.START;

/** Number of read bytes between 0 and {@code Integer.BYTES} = 4 */
private int numOfPartReadBytes = 0;

/** Byte array to store bytes as we build up to a full 4 byte integer */
private final byte[] partReadLengthBytes = new byte[Integer.BYTES];

/**
* The communication pipeline between server and client
*
Expand Down Expand Up @@ -324,54 +348,92 @@ public void data(@NonNull final Http2FrameHeader header, @NonNull final BufferDa
// There is some asynchronous behavior here, but in the worst case, we handle a few more
// bytes before the stream is closed.
while (data.available() > 0) {
// First chunk of data contains the compression flag and the length of the message
if (entityBytes == null) {
// Read whether this message is compressed. We do not currently support
// compression.
final var isCompressed = (data.read() == 1);
if (isCompressed) {
// The error will eventually result in the stream being closed
throw new GrpcException(
GrpcStatus.UNIMPLEMENTED, "Compression is not supported");
}
// Read the length of the message. As per the grpc protocol specification, each
// message on the wire is prefixed with the number of bytes for the message.
// However, to prevent a DOS attack where the attacker sends us a very large
// length and exhausts our memory, we have a maximum message size configuration
// setting. Using that, we can detect attempts to exhaust our memory.
final long length = data.readUnsignedInt32();
if (length > config.maxMessageSizeBytes()) {
throw new GrpcException(
GrpcStatus.INVALID_ARGUMENT,
"Message size exceeds maximum allowed size");
}
// Create a buffer to hold the message. We sadly cannot reuse this buffer
// because once we have filled it and wrapped it in Bytes and sent it to the
// handler, some user code may grab and hold that Bytes object for an arbitrary
// amount of time, and if we were to scribble into the same byte array, we
// would break the application. So we need a new buffer each time :-(
entityBytes = new byte[(int) length];
entityBytesIndex = 0;
}

// By the time we get here, entityBytes is no longer null. It may be empty, or it
// may already have been partially populated from a previous iteration. It may be
// that the number of bytes available to be read is larger than just this one
// message. So we need to be careful to read, from what is available, only up to
// the message length, and to leave the rest for the next iteration.
final int available = data.available();
final int numBytesToRead =
Math.min(entityBytes.length - entityBytesIndex, available);
data.read(entityBytes, entityBytesIndex, numBytesToRead);
entityBytesIndex += numBytesToRead;

// If we have completed reading the message, then we can proceed.
if (entityBytesIndex == entityBytes.length) {
// Grab and wrap the bytes and reset to being reading the next message
final var bytes = Bytes.wrap(entityBytes);
pipeline.onNext(bytes);
entityBytesIndex = 0;
entityBytes = null;
switch (currentReadState) {
case START:
{
// Read whether this message is compressed. We do not currently support
// compression.
final var isCompressed = (data.read() == 1);
if (isCompressed) {
// The error will eventually result in the stream being closed
throw new GrpcException(
GrpcStatus.UNIMPLEMENTED, "Compression is not supported");
}
currentReadState = ReadState.READ_LENGTH;
numOfPartReadBytes = 0;
break;
}
case READ_LENGTH:
{
// if I have not read a full int yet then read more from available bytes
if (numOfPartReadBytes < Integer.BYTES) {
// we do not have enough bytes yet to read a 4 byte int
// read the bytes we do have and store them for next time
final int bytesToRead =
Math.min(
data.available(),
Integer.BYTES - numOfPartReadBytes);
data.read(partReadLengthBytes, numOfPartReadBytes, bytesToRead);
numOfPartReadBytes += bytesToRead;
}
// check if we have read all the 4 bytes of the length int32
if (numOfPartReadBytes == Integer.BYTES) {
final long length =
((long) partReadLengthBytes[0] & 0xFF) << 24
| ((long) partReadLengthBytes[1] & 0xFF) << 16
| ((long) partReadLengthBytes[2] & 0xFF) << 8
| ((long) partReadLengthBytes[3] & 0xFF);
if (length > config.maxMessageSizeBytes()) {
throw new GrpcException(
GrpcStatus.INVALID_ARGUMENT,
"Message size exceeds maximum allowed size");
}
// Create a buffer to hold the message. We sadly cannot reuse this
// buffer
// because once we have filled it and wrapped it in Bytes and sent
// it to the
// handler, some user code may grab and hold that Bytes object for
// an arbitrary
// amount of time, and if we were to scribble into the same byte
// array, we
// would break the application. So we need a new buffer each time
// :-(
entityBytes = new byte[(int) length];
entityBytesIndex = 0;
// done with length now, so move on to next state
currentReadState = ReadState.READ_ENTITY_BYTES;
}
break;
}
case READ_ENTITY_BYTES:
{
// By the time we get here, entityBytes is no longer null. It may be
// empty, or it
// may already have been partially populated from a previous iteration.
// It may be
// that the number of bytes available to be read is larger than just
// this one
// message. So we need to be careful to read, from what is available,
// only up to
// the message length, and to leave the rest for the next iteration.
final int available = data.available();
final int numBytesToRead =
Math.min(entityBytes.length - entityBytesIndex, available);
data.read(entityBytes, entityBytesIndex, numBytesToRead);
entityBytesIndex += numBytesToRead;

// If we have completed reading the message, then we can proceed.
if (entityBytesIndex == entityBytes.length) {
currentReadState = ReadState.START;
// Grab and wrap the bytes and reset to being reading the next
// message
final var bytes = Bytes.wrap(entityBytes);
pipeline.onNext(bytes);
entityBytesIndex = 0;
entityBytes = null;
}
break;
}
}
}

Expand All @@ -393,9 +455,10 @@ public void data(@NonNull final Http2FrameHeader header, @NonNull final BufferDa
}

/**
* An error has occurred. Cancel the deadline future if it's still active, and set the stream state accordingly.
* <p>
* May be called by different threads concurrently.
* An error has occurred. Cancel the deadline future if it's still active, and set the stream
* state accordingly.
*
* <p>May be called by different threads concurrently.
*/
private void error() {
// Canceling a future that has already completed has no effect. So by canceling here, we are
Expand Down

0 comments on commit 1b456ed

Please sign in to comment.