Skip to content

Commit

Permalink
Optimize FilterStreamInput for Network Reads (elastic#52395)
Browse files Browse the repository at this point in the history
When `FilterStreamInput` wraps a Netty `ByteBuf` based stream it
did not forward the bulk primitive reads to the delegate.
These are optimized on the delegate but if they're not forwarded
then the delegate will be called e.g. 4 times to read an `int`.
This happens for essentially all network reads prior to this
change because they all run from a `NamedWritableAwareStreamInput`.

This also required optimising `BufferedChecksumStreamInput` individually to use bulk reads from the buffer because it implicitly assumed that the filter stream input  wouldn't override any of the bulk operations.
  • Loading branch information
original-brownbear authored Feb 17, 2020
1 parent 9d6eab2 commit 403d1ff
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,21 @@ public void readBytes(byte[] b, int offset, int len) throws IOException {
delegate.readBytes(b, offset, len);
}

@Override
public short readShort() throws IOException {
return delegate.readShort();
}

@Override
public int readInt() throws IOException {
return delegate.readInt();
}

@Override
public long readLong() throws IOException {
return delegate.readLong();
}

@Override
public void reset() throws IOException {
delegate.reset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,30 @@ public void readBytes(byte[] b, int offset, int len) throws IOException {
digest.update(b, offset, len);
}

private static final ThreadLocal<byte[]> buffer = ThreadLocal.withInitial(() -> new byte[8]);

@Override
public short readShort() throws IOException {
final byte[] buf = buffer.get();
readBytes(buf, 0, 2);
return (short) (((buf[0] & 0xFF) << 8) | (buf[1] & 0xFF));
}

@Override
public int readInt() throws IOException {
final byte[] buf = buffer.get();
readBytes(buf, 0, 4);
return ((buf[0] & 0xFF) << 24) | ((buf[1] & 0xFF) << 16) | ((buf[2] & 0xFF) << 8) | (buf[3] & 0xFF);
}

@Override
public long readLong() throws IOException {
final byte[] buf = buffer.get();
readBytes(buf, 0, 8);
return (((long) (((buf[0] & 0xFF) << 24) | ((buf[1] & 0xFF) << 16) | ((buf[2] & 0xFF) << 8) | (buf[3] & 0xFF))) << 32)
| ((((buf[4] & 0xFF) << 24) | ((buf[5] & 0xFF) << 16) | ((buf[6] & 0xFF) << 8) | (buf[7] & 0xFF)) & 0xFFFFFFFFL);
}

@Override
public void reset() throws IOException {
delegate.reset();
Expand Down

0 comments on commit 403d1ff

Please sign in to comment.