diff --git a/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/io/buffer/BufferedData.java b/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/io/buffer/BufferedData.java index e6436305..5b685d24 100644 --- a/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/io/buffer/BufferedData.java +++ b/pbj-core/pbj-runtime/src/main/java/com/hedera/pbj/runtime/io/buffer/BufferedData.java @@ -1,13 +1,11 @@ package com.hedera.pbj.runtime.io.buffer; import com.hedera.pbj.runtime.io.DataAccessException; -import com.hedera.pbj.runtime.io.DataEncodingException; import com.hedera.pbj.runtime.io.ReadableSequentialData; import com.hedera.pbj.runtime.io.WritableSequentialData; import edu.umd.cs.findbugs.annotations.NonNull; import java.io.IOException; import java.io.InputStream; -import java.io.UncheckedIOException; import java.nio.BufferOverflowException; import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; @@ -29,23 +27,29 @@ public class BufferedData implements BufferedSequentialData, ReadableSequentialD @SuppressWarnings("unused") public static final BufferedData EMPTY_BUFFER = wrap(ByteBuffer.allocate(0)); - /** {@link ByteBuffer} used as backing buffer for this instance */ + /** + * {@link ByteBuffer} used as backing buffer for this instance. + * + *
The buffer may be direct, or may be on the heap. It may also be a "view" of another buffer. The ByteBuffer has + * an inner array, which can be accessed directly. If it is, you MUST BE VERY CAREFUL to take the array offset into + * account, otherwise you will read out of bounds of the view. + */ private final ByteBuffer buffer; + /** Locally cached value of {@link ByteBuffer#isDirect()}. This value is queried by the varInt methods. */ private final boolean direct; /** * Wrap an existing allocated {@link ByteBuffer}. No copy is made. * * @param buffer the {@link ByteBuffer} to wrap - * @param direct If this is a dirrect buffer allocation. */ private BufferedData(@NonNull final ByteBuffer buffer) { this.buffer = buffer; this.direct = buffer.isDirect(); - // I switch the buffer to BIG_ENDIAN so that all our normal "get/read" methods can assume they are in + // We switch the buffer to BIG_ENDIAN so that all our normal "get/read" methods can assume they are in // BIG_ENDIAN mode, reducing the boilerplate around those methods. This necessarily means the LITTLE_ENDIAN - // methods will be slower. I'm assuming BIG_ENDIAN is what we want to optimize for. + // methods will be slower. We're assuming BIG_ENDIAN is what we want to optimize for. this.buffer.order(BIG_ENDIAN); } @@ -53,7 +57,8 @@ private BufferedData(@NonNull final ByteBuffer buffer) { // Static Builder Methods /** - * Wrap an existing allocated {@link ByteBuffer}. No copy is made. + * Wrap an existing allocated {@link ByteBuffer}. No copy is made. DO NOT modify this buffer after having wrapped + * it. * * @param buffer the {@link ByteBuffer} to wrap * @return new instance using {@code buffer} as its data buffer @@ -65,7 +70,7 @@ public static BufferedData wrap(@NonNull final ByteBuffer buffer) { /** * Wrap an existing allocated byte[]. No copy is made. The length of the {@link BufferedData} will be - * the *ENTIRE* length of the byte array. + * the *ENTIRE* length of the byte array. DO NOT modify this array after having wrapped it. * * @param array the byte[] to wrap * @return new DataBuffer using {@code array} as its data buffer @@ -76,7 +81,7 @@ public static BufferedData wrap(@NonNull final byte[] array) { } /** - * Wrap an existing allocated byte[]. No copy is made. + * Wrap an existing allocated byte[]. No copy is made. DO NOT modify this array after having wrapped it. * * @param array the byte[] to wrap * @param offset the offset into the byte array which will form the origin of this {@link BufferedData}. @@ -349,7 +354,7 @@ public long getBytes(final long offset, @NonNull final byte[] dst, final int dst @Override public long getBytes(final long offset, @NonNull final ByteBuffer dst) { final var len = Math.min(dst.remaining(), length() - offset); - buffer.get(Math.toIntExact(offset), dst.array(), dst.position(), Math.toIntExact(len)); + dst.put(dst.position(), buffer, Math.toIntExact(offset), Math.toIntExact(len)); return len; } @@ -357,14 +362,21 @@ public long getBytes(final long offset, @NonNull final ByteBuffer dst) { @Override public long getBytes(final long offset, @NonNull final BufferedData dst) { final var len = Math.min(dst.remaining(), length() - offset); - buffer.get(Math.toIntExact(offset), dst.buffer.array(), Math.toIntExact(dst.position()), Math.toIntExact(len)); + dst.buffer.put(dst.buffer.position(), buffer, Math.toIntExact(offset), Math.toIntExact(len)); return len; } @NonNull @Override public Bytes getBytes(long offset, long length) { - return Bytes.wrap(buffer.array(), buffer.arrayOffset() + Math.toIntExact(offset), Math.toIntExact(length)); + final var len = Math.toIntExact(length); + if (direct) { + final var copy = new byte[len]; + buffer.get(Math.toIntExact(offset), copy, 0, len); + return Bytes.wrap(copy); + } else { + return Bytes.wrap(buffer.array(), Math.toIntExact(buffer.arrayOffset() + offset), len); + } } /** {@inheritDoc} */ @@ -528,7 +540,7 @@ public Bytes readBytes(final int length) { if (length == 0) return Bytes.EMPTY; if (remaining() < length) throw new BufferUnderflowException(); - final var bytes = Bytes.wrap(buffer.array(),buffer.arrayOffset() + buffer.position(), length); + final var bytes = getBytes(position(), length); buffer.position(buffer.position() + length); return bytes; } @@ -536,7 +548,7 @@ public Bytes readBytes(final int length) { /** {@inheritDoc} */ @NonNull @Override - public ReadableSequentialData view(final int length) { + public BufferedData view(final int length) { if (length < 0) { throw new IllegalArgumentException("Length cannot be negative"); } @@ -652,18 +664,19 @@ public double readDouble(@NonNull final ByteOrder byteOrder) { public int readVarInt(final boolean zigZag) { if (direct) return ReadableSequentialData.super.readVarInt(zigZag); - int tempPos = buffer.position(); + int tempPos = buffer.position() + buffer.arrayOffset(); final byte[] buff = buffer.array(); - int len = buffer.limit(); - if (len == tempPos) { - return (int)readVarIntLongSlow(zigZag); + int lastPos = buffer.limit() + buffer.arrayOffset(); + + if (lastPos == tempPos) { + throw new BufferUnderflowException(); } int x; if ((x = buff[tempPos++]) >= 0) { buffer.position(tempPos); return zigZag ? (x >>> 1) ^ -(x & 1) : x; - } else if (len - tempPos < 9) { + } else if (lastPos - tempPos < 9) { return (int)readVarIntLongSlow(zigZag); } else if ((x ^= (buff[tempPos++] << 7)) < 0) { x ^= (~0 << 7); @@ -684,7 +697,7 @@ public int readVarInt(final boolean zigZag) { return (int)readVarIntLongSlow(zigZag); } } - buffer.position(tempPos); + buffer.position(tempPos - buffer.arrayOffset()); return zigZag ? (x >>> 1) ^ -(x & 1) : x; } @@ -695,11 +708,12 @@ public int readVarInt(final boolean zigZag) { public long readVarLong(boolean zigZag) { if (direct) return ReadableSequentialData.super.readVarLong(zigZag); - int tempPos = (int)position(); - byte[] buff = buffer.array(); - int len = buffer.limit(); - if (len == tempPos) { - return readVarIntLongSlow(zigZag); + int tempPos = buffer.position() + buffer.arrayOffset(); + final byte[] buff = buffer.array(); + int lastPos = buffer.limit() + buffer.arrayOffset(); + + if (lastPos == tempPos) { + throw new BufferUnderflowException(); } long x; @@ -707,7 +721,7 @@ public long readVarLong(boolean zigZag) { if ((y = buff[tempPos++]) >= 0) { buffer.position(tempPos); return zigZag ? (y >>> 1) ^ -(y & 1) : y; - } else if (len - tempPos < 9) { + } else if (lastPos - tempPos < 9) { return readVarIntLongSlow(zigZag); } else if ((y ^= (buff[tempPos++] << 7)) < 0) { x = y ^ (~0 << 7); @@ -860,7 +874,7 @@ public int writeBytes(@NonNull final InputStream src, final int maxLength) { try { int totalBytesRead = 0; while (totalBytesRead < numBytesToRead) { - int numBytesRead = src.read(array, buffer.position(), (int) numBytesToRead - totalBytesRead); + int numBytesRead = src.read(array, buffer.position() + buffer.arrayOffset(), (int) numBytesToRead - totalBytesRead); if (numBytesRead == -1) { return totalBytesRead; } diff --git a/pbj-core/pbj-runtime/src/test/java/com/hedera/pbj/runtime/io/ReadableTestBase.java b/pbj-core/pbj-runtime/src/test/java/com/hedera/pbj/runtime/io/ReadableTestBase.java index 4e788241..46ee1c91 100644 --- a/pbj-core/pbj-runtime/src/test/java/com/hedera/pbj/runtime/io/ReadableTestBase.java +++ b/pbj-core/pbj-runtime/src/test/java/com/hedera/pbj/runtime/io/ReadableTestBase.java @@ -1210,7 +1210,7 @@ void readPastLimit() { @DisplayName("Reading a varint when less than 4 bytes are available throws BufferUnderflowException") void readInsufficientDataThrows(final boolean zigZag) { final var seq = sequence(new byte[] { (byte) 0b10101100 }); - assertThatThrownBy(() -> seq.readVarLong(zigZag)).isInstanceOf(BufferUnderflowException.class); + assertThatThrownBy(() -> seq.readVarInt(zigZag)).isInstanceOf(BufferUnderflowException.class); } @Test diff --git a/pbj-core/pbj-runtime/src/test/java/com/hedera/pbj/runtime/io/buffer/BufferedDataTest.java b/pbj-core/pbj-runtime/src/test/java/com/hedera/pbj/runtime/io/buffer/BufferedDataTest.java index 522abd90..1a638c2d 100644 --- a/pbj-core/pbj-runtime/src/test/java/com/hedera/pbj/runtime/io/buffer/BufferedDataTest.java +++ b/pbj-core/pbj-runtime/src/test/java/com/hedera/pbj/runtime/io/buffer/BufferedDataTest.java @@ -11,8 +11,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals; final class BufferedDataTest { - // TODO Test that "view" shows the updated data when it is changed on the fly - // TODO Verify capacity is never negative (it can't be, maybe nothing to test here) + // FUTURE Test that "view" shows the updated data when it is changed on the fly + // FUTURE Verify capacity is never negative (it can't be, maybe nothing to test here) @Test @DisplayName("toString() is safe") @@ -98,4 +98,111 @@ protected byte[] extractWrittenBytes(@NonNull WritableSequentialData seq) { return bytes; } } + + @Nested + final class ReadableSequentialDataViewTest extends ReadableTestBase { + + @NonNull + @Override + protected ReadableSequentialData emptySequence() { + final var buf = BufferedData.wrap(new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }); + buf.position(7); + return buf.view(0); + } + + @NonNull + @Override + protected ReadableSequentialData fullyUsedSequence() { + final var buf = BufferedData.wrap(new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }); + buf.position(2); + final var view = buf.view(5); + view.skip(5); + return view; + } + + @NonNull + @Override + protected ReadableSequentialData sequence(@NonNull byte[] arr) { + BufferedData buf = BufferedData.allocate(arr.length + 20); + for (int i = 0; i < 10; i++) { + buf.writeByte((byte) i); + } + buf.position(buf.length() - 10); + for (int i = 0; i < 10; i++) { + buf.writeByte((byte) i); + } + buf.position(10); + buf.writeBytes(arr); + buf.position(10); + return buf.view(arr.length); + } + } + + @Nested + final class RandomAccessDataViewTest extends ReadableTestBase { + + @NonNull + @Override + protected ReadableSequentialData emptySequence() { + final var buf = BufferedData.wrap(new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }); + buf.position(7); + return new RandomAccessSequenceAdapter(buf.view(0)); + } + + @NonNull + @Override + protected ReadableSequentialData fullyUsedSequence() { + final var buf = BufferedData.wrap(new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }); + buf.position(2); + final var view = buf.view(5); + view.skip(5); + return new RandomAccessSequenceAdapter(view, 5); + } + + @NonNull + @Override + protected ReadableSequentialData sequence(@NonNull byte[] arr) { + BufferedData buf = BufferedData.allocate(arr.length + 20); + for (int i = 0; i < 10; i++) { + buf.writeByte((byte) i); + } + buf.position(buf.length() - 10); + for (int i = 0; i < 10; i++) { + buf.writeByte((byte) i); + } + buf.position(10); + buf.writeBytes(arr); + buf.position(10); + return new RandomAccessSequenceAdapter(buf.view(arr.length)); + } + } + + @Nested + final class WritableSequentialDataViewTest extends WritableTestBase { + @NonNull + @Override + protected WritableSequentialData sequence() { + final var mb = 1024 * 1024; + final var buf = BufferedData.allocate(mb * 4); // the largest expected test value is 1 mb + buf.position(mb); + return buf.view(mb * 2); + } + + @NonNull + @Override + protected WritableSequentialData eofSequence() { + final var buf = BufferedData.wrap(new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }); + buf.position(7); + return buf.view(0); + } + + @NonNull + @Override + protected byte[] extractWrittenBytes(@NonNull WritableSequentialData seq) { + final var buf = (BufferedData) seq; + final var bytes = new byte[Math.toIntExact(buf.position())]; + buf.getBytes(0, bytes); + return bytes; + } + } }