Skip to content
This repository has been archived by the owner on Oct 4, 2024. It is now read-only.

MessageDecoder can grow and shrink according to usage (in powers of two) #4

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 29 additions & 8 deletions src/main/java/software/amazon/eventstream/MessageDecoder.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,17 @@
* messages. Instances of this class are not thread-safe.
*/
public final class MessageDecoder {

/**
* Initial buffer size is 2MB. Will grow as needed to accommodate larger messages.
*/
private static final int INITIAL_BUFFER_SIZE = 2048 * 1024;
private static final int INITIAL_BUFFER_SIZE = 1024 * 1024;
private static final int BUFFER_SHRINK_COUNT = 100;

private final Consumer<Message> messageConsumer;
private List<Message> bufferedOutput;
private ByteBuffer buf;
private Prelude currentPrelude;

// tracks how many times the buffer was bigger than it needed to be
private int countOversized = 0;

/**
* Creates a {@code MessageDecoder} instance that will buffer messages internally as they are decoded. Decoded
* messages can be obtained by calling {@link #getDecodedMessages()}.
Expand Down Expand Up @@ -115,10 +115,17 @@ public MessageDecoder feed(ByteBuffer byteBuffer) {
if (readView.remaining() >= 15) {
currentPrelude = Prelude.decode(readView.duplicate());
if (buf.capacity() < currentPrelude.getTotalLength()) {
// Don't have enough capacity to hold this message, grow the buffer
buf = ByteBuffer.allocate(currentPrelude.getTotalLength());
int new_size = Integer.highestOneBit(currentPrelude.getTotalLength()) << 1;
// System.out.println("increasing buffer from " + buf.capacity() + " to " + new_size + " to accomodate " + currentPrelude.getTotalLength());
// System.out.flush();
buf = ByteBuffer.allocate(new_size);
buf.put(readView);
readView = updateReadView();
countOversized = 0;
} else if (currentPrelude.getTotalLength() < buf.capacity() / 2) {
countOversized++;
} else {
countOversized = 0;
}
}
}
Expand All @@ -134,8 +141,22 @@ public MessageDecoder feed(ByteBuffer byteBuffer) {

// If we have enough data to decode the message do so and reset the buffer for the next message
if (readView.remaining() >= currentPrelude.getTotalLength()) {
if (currentPrelude.getTotalLength() < buf.capacity() / 2) {
countOversized++;
} else {
countOversized = 0;
}
messageConsumer.accept(Message.decode(currentPrelude, readView));
buf.clear();

if (countOversized >= BUFFER_SHRINK_COUNT) {
countOversized = 0;
int new_size = buf.capacity() / 2;
// System.out.println("decreasing buffer capacity from " + buf.capacity() + " to " + new_size);
// System.out.flush();
buf = ByteBuffer.allocate(new_size);
} else {
buf.clear();
}
currentPrelude = null;
}
}
Expand Down