Skip to content

Commit

Permalink
Combined ByteBufferPool (#8171)
Browse files Browse the repository at this point in the history
All `ByteBufferPool` can now be accessed as `RetainableByteBufferPools`.

Users now need to configure only a single buffer pool and there is just the additional retained parameter that needs consideration.
Default buffer pool has been changed to logarithmic, but we may wish to review that before next release.
Default factor size has been increased to 4096.
  • Loading branch information
gregw authored Jul 4, 2022
1 parent 27a89b2 commit 2b817f0
Show file tree
Hide file tree
Showing 36 changed files with 617 additions and 311 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public HttpReceiverOverHTTP(HttpChannelOverHTTP channel)
parser.setHeaderCacheCaseSensitive(httpTransport.isHeaderCacheCaseSensitive());
}

this.retainableByteBufferPool = RetainableByteBufferPool.findOrAdapt(httpClient, httpClient.getByteBufferPool());
this.retainableByteBufferPool = httpClient.getByteBufferPool().asRetainableByteBufferPool();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -708,22 +708,61 @@ protected int networkFill(ByteBuffer input) throws IOException
assertEquals(0, clientBytes.get());
}

protected class TestRetained extends ArrayRetainableByteBufferPool
{
private final ByteBufferPool _pool;

public TestRetained(ByteBufferPool pool, int factor, int maxCapacity, int maxBucketSize, long retainedHeapMemory, long retainedDirectMemory)
{
super(0, factor, maxCapacity, maxBucketSize, retainedHeapMemory, retainedDirectMemory);
_pool = pool;
}

@Override
protected ByteBuffer allocate(int capacity)
{
return _pool.acquire(capacity, false);
}

@Override
protected ByteBuffer allocateDirect(int capacity)
{
return _pool.acquire(capacity, true);
}

@Override
protected void removed(RetainableByteBuffer retainedBuffer)
{
_pool.release(retainedBuffer.getBuffer());
}

@Override
public Pool<RetainableByteBuffer> poolFor(int capacity, boolean direct)
{
return super.poolFor(capacity, direct);
}
}

private class TestByteBufferPool extends ArrayByteBufferPool
{
@Override
protected RetainableByteBufferPool newRetainableByteBufferPool(int factor, int maxCapacity, int maxBucketSize, long retainedHeapMemory, long retainedDirectMemory)
{
return new TestRetained(this, factor, maxCapacity, maxBucketSize, retainedHeapMemory, retainedDirectMemory);
}
}

@Test
public void testEncryptedInputBufferRepooling() throws Exception
{
SslContextFactory.Server serverTLSFactory = createServerSslContextFactory();
QueuedThreadPool serverThreads = new QueuedThreadPool();
serverThreads.setName("server");
server = new Server(serverThreads);
var retainableByteBufferPool = new ArrayRetainableByteBufferPool()
{
@Override
public Pool<RetainableByteBuffer> poolFor(int capacity, boolean direct)
{
return super.poolFor(capacity, direct);
}
};
server.addBean(retainableByteBufferPool);

ArrayByteBufferPool byteBufferPool = new TestByteBufferPool();
RetainableByteBufferPool retainableByteBufferPool = byteBufferPool.asRetainableByteBufferPool();
server.addBean(byteBufferPool);
HttpConfiguration httpConfig = new HttpConfiguration();
httpConfig.addCustomizer(new SecureRequestCustomizer());
HttpConnectionFactory http = new HttpConnectionFactory(httpConfig);
Expand Down Expand Up @@ -765,9 +804,12 @@ protected int networkFill(ByteBuffer input) throws IOException

assertThrows(Exception.class, () -> client.newRequest("localhost", connector.getLocalPort()).scheme(HttpScheme.HTTPS.asString()).send());

Pool<RetainableByteBuffer> bucket = retainableByteBufferPool.poolFor(16 * 1024 + 1, ssl.isDirectBuffersForEncryption());
Pool<RetainableByteBuffer> bucket = ((TestRetained)retainableByteBufferPool).poolFor(16 * 1024 + 1, connector.getConnectionFactory(HttpConnectionFactory.class).isUseInputDirectByteBuffers());
assertEquals(1, bucket.size());
assertEquals(1, bucket.getIdleCount());

long count = ssl.isDirectBuffersForDecryption() ? byteBufferPool.getDirectByteBufferCount() : byteBufferPool.getHeapByteBufferCount();
assertEquals(1, count);
}

@Test
Expand All @@ -777,7 +819,7 @@ public void testEncryptedOutputBufferRepooling() throws Exception
QueuedThreadPool serverThreads = new QueuedThreadPool();
serverThreads.setName("server");
server = new Server(serverThreads);
List<ByteBuffer> leakedBuffers = new ArrayList<>();
List<ByteBuffer> leakedBuffers = new CopyOnWriteArrayList<>();
ArrayByteBufferPool byteBufferPool = new ArrayByteBufferPool()
{
@Override
Expand Down Expand Up @@ -834,6 +876,7 @@ protected boolean networkFlush(ByteBuffer output) throws IOException

assertThrows(Exception.class, () -> client.newRequest("localhost", connector.getLocalPort()).scheme(HttpScheme.HTTPS.asString()).send());

byteBufferPool.asRetainableByteBufferPool().clear();
await().atMost(5, TimeUnit.SECONDS).until(() -> leakedBuffers, is(empty()));
}

Expand All @@ -845,7 +888,7 @@ public void testEncryptedOutputBufferRepoolingAfterNetworkFlushReturnsFalse(bool
QueuedThreadPool serverThreads = new QueuedThreadPool();
serverThreads.setName("server");
server = new Server(serverThreads);
List<ByteBuffer> leakedBuffers = new ArrayList<>();
List<ByteBuffer> leakedBuffers = new CopyOnWriteArrayList<>();
ArrayByteBufferPool byteBufferPool = new ArrayByteBufferPool()
{
@Override
Expand Down Expand Up @@ -916,6 +959,7 @@ protected void service(String target, Request jettyRequest, HttpServletRequest r

assertThrows(Exception.class, () -> client.newRequest("localhost", connector.getLocalPort()).scheme(HttpScheme.HTTPS.asString()).send());

byteBufferPool.asRetainableByteBufferPool().clear();
await().atMost(5, TimeUnit.SECONDS).until(() -> leakedBuffers, is(empty()));
}

Expand All @@ -927,7 +971,7 @@ public void testEncryptedOutputBufferRepoolingAfterNetworkFlushThrows(boolean cl
QueuedThreadPool serverThreads = new QueuedThreadPool();
serverThreads.setName("server");
server = new Server(serverThreads);
List<ByteBuffer> leakedBuffers = new ArrayList<>();
List<ByteBuffer> leakedBuffers = new CopyOnWriteArrayList<>();
ArrayByteBufferPool byteBufferPool = new ArrayByteBufferPool()
{
@Override
Expand Down Expand Up @@ -998,6 +1042,7 @@ protected void service(String target, Request jettyRequest, HttpServletRequest r

assertThrows(Exception.class, () -> client.newRequest("localhost", connector.getLocalPort()).scheme(HttpScheme.HTTPS.asString()).send());

byteBufferPool.asRetainableByteBufferPool().clear();
await().atMost(5, TimeUnit.SECONDS).until(() -> leakedBuffers, is(empty()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public HttpConnectionOverFCGI(EndPoint endPoint, HttpDestination destination, Pr
this.parser = new ClientParser(new ResponseListener());
requests.addLast(0);
HttpClient client = destination.getHttpClient();
this.networkByteBufferPool = RetainableByteBufferPool.findOrAdapt(client, client.getByteBufferPool());
this.networkByteBufferPool = client.getByteBufferPool().asRetainableByteBufferPool();
}

public HttpDestination getHttpDestination()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public Result generateResponseContent(int request, ByteBuffer content, boolean l
private ByteBuffer generateEndRequest(int request, boolean aborted)
{
request &= 0xFF_FF;
ByteBuffer endRequestBuffer = acquire(8);
ByteBuffer endRequestBuffer = acquire(16);
BufferUtil.clearToFill(endRequestBuffer);
endRequestBuffer.putInt(0x01_03_00_00 + request);
endRequestBuffer.putInt(0x00_08_00_00);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public ServerFCGIConnection(Connector connector, EndPoint endPoint, HttpConfigur
{
super(endPoint, connector.getExecutor());
this.connector = connector;
this.networkByteBufferPool = RetainableByteBufferPool.findOrAdapt(connector, connector.getByteBufferPool());
this.networkByteBufferPool = connector.getByteBufferPool().asRetainableByteBufferPool();
this.flusher = new Flusher(endPoint);
this.configuration = configuration;
this.sendStatus200 = sendStatus200;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public Connection newConnection(EndPoint endPoint, Map<String, Object> context)
parser.setMaxFrameLength(client.getMaxFrameLength());
parser.setMaxSettingsKeys(client.getMaxSettingsKeys());

RetainableByteBufferPool retainableByteBufferPool = RetainableByteBufferPool.findOrAdapt(client, byteBufferPool);
RetainableByteBufferPool retainableByteBufferPool = byteBufferPool.asRetainableByteBufferPool();

HTTP2ClientConnection connection = new HTTP2ClientConnection(client, retainableByteBufferPool, executor, endPoint,
parser, session, client.getInputBufferSize(), promise, listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ public Connection newConnection(Connector connector, EndPoint endPoint)
parser.setMaxFrameLength(getMaxFrameLength());
parser.setMaxSettingsKeys(getMaxSettingsKeys());

RetainableByteBufferPool retainableByteBufferPool = RetainableByteBufferPool.findOrAdapt(connector, connector.getByteBufferPool());
RetainableByteBufferPool retainableByteBufferPool = connector.getByteBufferPool().asRetainableByteBufferPool();

HTTP2Connection connection = new HTTP2ServerConnection(retainableByteBufferPool, connector.getExecutor(),
endPoint, httpConfiguration, parser, session, getInputBufferSize(), listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
public HTTP3StreamConnection(QuicStreamEndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool, MessageParser parser)
{
super(endPoint, executor);
this.buffers = RetainableByteBufferPool.findOrAdapt(null, byteBufferPool);
this.buffers = byteBufferPool.asRetainableByteBufferPool();
this.parser = parser;
parser.init(MessageListener::new);
}
Expand Down
Loading

0 comments on commit 2b817f0

Please sign in to comment.