diff --git a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolHandler.java b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolHandler.java index 837fb1c3..28f8b8a4 100644 --- a/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolHandler.java +++ b/pbj-core/pbj-grpc-helidon/src/main/java/com/hedera/pbj/grpc/helidon/PbjProtocolHandler.java @@ -137,6 +137,30 @@ final class PbjProtocolHandler implements Http2SubProtocolSelector.SubProtocolHa */ private int entityBytesIndex = 0; + /** States for currentReadState state ,machine */ + enum ReadState { + /** + * Start state, when we are looking for first byte that says if data is compressed or not + */ + START, + /** + * State were we are reading length, can be partial length of final point when we have all + * length bytes + */ + READ_LENGTH, + /** State where we are reading the protobuf entity bytes */ + READ_ENTITY_BYTES + } + + /** State machine as we read bytes from incoming data */ + private ReadState currentReadState = ReadState.START; + + /** Number of read bytes between 0 and {@code Integer.BYTES} = 4 */ + private int numOfPartReadBytes = 0; + + /** Byte array to store bytes as we build up to a full 4 byte integer */ + private final byte[] partReadLengthBytes = new byte[Integer.BYTES]; + /** * The communication pipeline between server and client * @@ -324,54 +348,92 @@ public void data(@NonNull final Http2FrameHeader header, @NonNull final BufferDa // There is some asynchronous behavior here, but in the worst case, we handle a few more // bytes before the stream is closed. while (data.available() > 0) { - // First chunk of data contains the compression flag and the length of the message - if (entityBytes == null) { - // Read whether this message is compressed. We do not currently support - // compression. - final var isCompressed = (data.read() == 1); - if (isCompressed) { - // The error will eventually result in the stream being closed - throw new GrpcException( - GrpcStatus.UNIMPLEMENTED, "Compression is not supported"); - } - // Read the length of the message. As per the grpc protocol specification, each - // message on the wire is prefixed with the number of bytes for the message. - // However, to prevent a DOS attack where the attacker sends us a very large - // length and exhausts our memory, we have a maximum message size configuration - // setting. Using that, we can detect attempts to exhaust our memory. - final long length = data.readUnsignedInt32(); - if (length > config.maxMessageSizeBytes()) { - throw new GrpcException( - GrpcStatus.INVALID_ARGUMENT, - "Message size exceeds maximum allowed size"); - } - // Create a buffer to hold the message. We sadly cannot reuse this buffer - // because once we have filled it and wrapped it in Bytes and sent it to the - // handler, some user code may grab and hold that Bytes object for an arbitrary - // amount of time, and if we were to scribble into the same byte array, we - // would break the application. So we need a new buffer each time :-( - entityBytes = new byte[(int) length]; - entityBytesIndex = 0; - } - - // By the time we get here, entityBytes is no longer null. It may be empty, or it - // may already have been partially populated from a previous iteration. It may be - // that the number of bytes available to be read is larger than just this one - // message. So we need to be careful to read, from what is available, only up to - // the message length, and to leave the rest for the next iteration. - final int available = data.available(); - final int numBytesToRead = - Math.min(entityBytes.length - entityBytesIndex, available); - data.read(entityBytes, entityBytesIndex, numBytesToRead); - entityBytesIndex += numBytesToRead; - - // If we have completed reading the message, then we can proceed. - if (entityBytesIndex == entityBytes.length) { - // Grab and wrap the bytes and reset to being reading the next message - final var bytes = Bytes.wrap(entityBytes); - pipeline.onNext(bytes); - entityBytesIndex = 0; - entityBytes = null; + switch (currentReadState) { + case START: + { + // Read whether this message is compressed. We do not currently support + // compression. + final var isCompressed = (data.read() == 1); + if (isCompressed) { + // The error will eventually result in the stream being closed + throw new GrpcException( + GrpcStatus.UNIMPLEMENTED, "Compression is not supported"); + } + currentReadState = ReadState.READ_LENGTH; + numOfPartReadBytes = 0; + break; + } + case READ_LENGTH: + { + // if I have not read a full int yet then read more from available bytes + if (numOfPartReadBytes < Integer.BYTES) { + // we do not have enough bytes yet to read a 4 byte int + // read the bytes we do have and store them for next time + final int bytesToRead = + Math.min( + data.available(), + Integer.BYTES - numOfPartReadBytes); + data.read(partReadLengthBytes, numOfPartReadBytes, bytesToRead); + numOfPartReadBytes += bytesToRead; + } + // check if we have read all the 4 bytes of the length int32 + if (numOfPartReadBytes == Integer.BYTES) { + final long length = + ((long) partReadLengthBytes[0] & 0xFF) << 24 + | ((long) partReadLengthBytes[1] & 0xFF) << 16 + | ((long) partReadLengthBytes[2] & 0xFF) << 8 + | ((long) partReadLengthBytes[3] & 0xFF); + if (length > config.maxMessageSizeBytes()) { + throw new GrpcException( + GrpcStatus.INVALID_ARGUMENT, + "Message size exceeds maximum allowed size"); + } + // Create a buffer to hold the message. We sadly cannot reuse this + // buffer + // because once we have filled it and wrapped it in Bytes and sent + // it to the + // handler, some user code may grab and hold that Bytes object for + // an arbitrary + // amount of time, and if we were to scribble into the same byte + // array, we + // would break the application. So we need a new buffer each time + // :-( + entityBytes = new byte[(int) length]; + entityBytesIndex = 0; + // done with length now, so move on to next state + currentReadState = ReadState.READ_ENTITY_BYTES; + } + break; + } + case READ_ENTITY_BYTES: + { + // By the time we get here, entityBytes is no longer null. It may be + // empty, or it + // may already have been partially populated from a previous iteration. + // It may be + // that the number of bytes available to be read is larger than just + // this one + // message. So we need to be careful to read, from what is available, + // only up to + // the message length, and to leave the rest for the next iteration. + final int available = data.available(); + final int numBytesToRead = + Math.min(entityBytes.length - entityBytesIndex, available); + data.read(entityBytes, entityBytesIndex, numBytesToRead); + entityBytesIndex += numBytesToRead; + + // If we have completed reading the message, then we can proceed. + if (entityBytesIndex == entityBytes.length) { + currentReadState = ReadState.START; + // Grab and wrap the bytes and reset to being reading the next + // message + final var bytes = Bytes.wrap(entityBytes); + pipeline.onNext(bytes); + entityBytesIndex = 0; + entityBytes = null; + } + break; + } } } @@ -393,9 +455,10 @@ public void data(@NonNull final Http2FrameHeader header, @NonNull final BufferDa } /** - * An error has occurred. Cancel the deadline future if it's still active, and set the stream state accordingly. - *

- * May be called by different threads concurrently. + * An error has occurred. Cancel the deadline future if it's still active, and set the stream + * state accordingly. + * + *

May be called by different threads concurrently. */ private void error() { // Canceling a future that has already completed has no effect. So by canceling here, we are