diff --git a/src/main/java/software/amazon/eventstream/MessageDecoder.java b/src/main/java/software/amazon/eventstream/MessageDecoder.java index a9701f1..e6d1d64 100644 --- a/src/main/java/software/amazon/eventstream/MessageDecoder.java +++ b/src/main/java/software/amazon/eventstream/MessageDecoder.java @@ -25,17 +25,17 @@ * messages. Instances of this class are not thread-safe. */ public final class MessageDecoder { - - /** - * Initial buffer size is 2MB. Will grow as needed to accommodate larger messages. - */ - private static final int INITIAL_BUFFER_SIZE = 2048 * 1024; + private static final int INITIAL_BUFFER_SIZE = 1024 * 1024; + private static final int BUFFER_SHRINK_COUNT = 100; private final Consumer messageConsumer; private List bufferedOutput; private ByteBuffer buf; private Prelude currentPrelude; + // tracks how many times the buffer was bigger than it needed to be + private int countOversized = 0; + /** * Creates a {@code MessageDecoder} instance that will buffer messages internally as they are decoded. Decoded * messages can be obtained by calling {@link #getDecodedMessages()}. @@ -115,10 +115,17 @@ public MessageDecoder feed(ByteBuffer byteBuffer) { if (readView.remaining() >= 15) { currentPrelude = Prelude.decode(readView.duplicate()); if (buf.capacity() < currentPrelude.getTotalLength()) { - // Don't have enough capacity to hold this message, grow the buffer - buf = ByteBuffer.allocate(currentPrelude.getTotalLength()); + int new_size = Integer.highestOneBit(currentPrelude.getTotalLength()) << 1; + // System.out.println("increasing buffer from " + buf.capacity() + " to " + new_size + " to accomodate " + currentPrelude.getTotalLength()); + // System.out.flush(); + buf = ByteBuffer.allocate(new_size); buf.put(readView); readView = updateReadView(); + countOversized = 0; + } else if (currentPrelude.getTotalLength() < buf.capacity() / 2) { + countOversized++; + } else { + countOversized = 0; } } } @@ -134,8 +141,22 @@ public MessageDecoder feed(ByteBuffer byteBuffer) { // If we have enough data to decode the message do so and reset the buffer for the next message if (readView.remaining() >= currentPrelude.getTotalLength()) { + if (currentPrelude.getTotalLength() < buf.capacity() / 2) { + countOversized++; + } else { + countOversized = 0; + } messageConsumer.accept(Message.decode(currentPrelude, readView)); - buf.clear(); + + if (countOversized >= BUFFER_SHRINK_COUNT) { + countOversized = 0; + int new_size = buf.capacity() / 2; + // System.out.println("decreasing buffer capacity from " + buf.capacity() + " to " + new_size); + // System.out.flush(); + buf = ByteBuffer.allocate(new_size); + } else { + buf.clear(); + } currentPrelude = null; } }