Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Combined ByteBufferPool #8171

Merged
merged 27 commits into from
Jul 4, 2022
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
4ad2c75
Combined ByteBufferPool
gregw Jun 16, 2022
6d7dfa1
Combined ByteBufferPool
gregw Jun 16, 2022
72beab1
Combined ByteBufferPool
gregw Jun 16, 2022
a66f6f2
Combined ByteBufferPool
gregw Jun 16, 2022
60871f8
Combined ByteBufferPool
gregw Jun 16, 2022
ca45270
Combined ByteBufferPool
gregw Jun 20, 2022
1262583
Combined ByteBufferPool
gregw Jun 20, 2022
5a84b81
Merge remote-tracking branch 'origin/jetty-10.0.x' into jetty-10.0.x-…
gregw Jun 21, 2022
f1ffc83
Combined ByteBufferPool
gregw Jun 21, 2022
a04bdc2
Merge branch 'jetty-10.0.x' into jetty-10.0.x-one-buffer-pool-to-rule…
gregw Jun 27, 2022
417baf2
cleanup defaults
gregw Jun 28, 2022
96bcd07
Merge remote-tracking branch 'origin/jetty-10.0.x' into jetty-10.0.x-…
gregw Jun 28, 2022
9d38872
Combined ByteBufferPool
gregw Jun 28, 2022
58461e0
Combined ByteBufferPool
gregw Jun 28, 2022
7cdd0b7
Combined ByteBufferPool
gregw Jun 28, 2022
cb8b422
Combined ByteBufferPool
gregw Jun 28, 2022
1877f39
Combined ByteBufferPool
gregw Jun 28, 2022
0dbf44f
Combined ByteBufferPool
gregw Jun 29, 2022
c5c0ca9
Combined ByteBufferPool
gregw Jun 29, 2022
5c4abfa
Combined ByteBufferPool
gregw Jun 29, 2022
b61d133
Combined ByteBufferPool
gregw Jun 29, 2022
9e1cf32
Combined ByteBufferPool
gregw Jun 29, 2022
78edb9c
Combined ByteBufferPool
gregw Jun 29, 2022
80ac547
Combined ByteBufferPool
gregw Jun 29, 2022
c2d261e
Combined ByteBufferPool
gregw Jun 30, 2022
02bc66d
Combined ByteBufferPool
gregw Jun 30, 2022
0cfc50c
Combined ByteBufferPool
gregw Jul 2, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public HttpReceiverOverHTTP(HttpChannelOverHTTP channel)
parser.setHeaderCacheCaseSensitive(httpTransport.isHeaderCacheCaseSensitive());
}

this.retainableByteBufferPool = RetainableByteBufferPool.findOrAdapt(httpClient, httpClient.getByteBufferPool());
this.retainableByteBufferPool = httpClient.getByteBufferPool().asRetainableByteBufferPool();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -708,22 +708,65 @@ protected int networkFill(ByteBuffer input) throws IOException
assertEquals(0, clientBytes.get());
}

protected class TestRetained extends ArrayRetainableByteBufferPool
{
private final ByteBufferPool _pool;

public TestRetained(ByteBufferPool pool, int factor, int maxCapacity, int maxBucketSize, long retainedHeapMemory, long retainedDirectMemory)
{
super(0, factor, maxCapacity, maxBucketSize, retainedHeapMemory, retainedDirectMemory);
_pool = pool;
}

@Override
protected ByteBuffer allocate(int capacity)
{
System.err.println("allocate " + capacity);
new Throwable().printStackTrace();
gregw marked this conversation as resolved.
Show resolved Hide resolved
return _pool.acquire(capacity, false);
}

@Override
protected ByteBuffer allocateDirect(int capacity)
{
System.err.println("allocateDirect " + capacity);
new Throwable().printStackTrace();
gregw marked this conversation as resolved.
Show resolved Hide resolved
return _pool.acquire(capacity, true);
}

@Override
protected void removed(RetainableByteBuffer retainedBuffer)
{
_pool.release(retainedBuffer.getBuffer());
}

@Override
public Pool<RetainableByteBuffer> poolFor(int capacity, boolean direct)
{
return super.poolFor(capacity, direct);
}
}

private class TestByteBufferPool extends ArrayByteBufferPool
{
@Override
protected RetainableByteBufferPool newRetainableByteBufferPool(int factor, int maxCapacity, int maxBucketSize, long retainedHeapMemory, long retainedDirectMemory)
{
return new TestRetained(this, factor, maxCapacity, maxBucketSize, retainedHeapMemory, retainedDirectMemory);
}
}

@Test
public void testEncryptedInputBufferRepooling() throws Exception
{
SslContextFactory.Server serverTLSFactory = createServerSslContextFactory();
QueuedThreadPool serverThreads = new QueuedThreadPool();
serverThreads.setName("server");
server = new Server(serverThreads);
var retainableByteBufferPool = new ArrayRetainableByteBufferPool()
{
@Override
public Pool<RetainableByteBuffer> poolFor(int capacity, boolean direct)
{
return super.poolFor(capacity, direct);
}
};
server.addBean(retainableByteBufferPool);

ArrayByteBufferPool byteBufferPool = new TestByteBufferPool();
RetainableByteBufferPool retainableByteBufferPool = byteBufferPool.asRetainableByteBufferPool();
server.addBean(byteBufferPool);
HttpConfiguration httpConfig = new HttpConfiguration();
httpConfig.addCustomizer(new SecureRequestCustomizer());
HttpConnectionFactory http = new HttpConnectionFactory(httpConfig);
Expand Down Expand Up @@ -765,9 +808,12 @@ protected int networkFill(ByteBuffer input) throws IOException

assertThrows(Exception.class, () -> client.newRequest("localhost", connector.getLocalPort()).scheme(HttpScheme.HTTPS.asString()).send());

Pool<RetainableByteBuffer> bucket = retainableByteBufferPool.poolFor(16 * 1024 + 1, ssl.isDirectBuffersForEncryption());
Pool<RetainableByteBuffer> bucket = ((TestRetained)retainableByteBufferPool).poolFor(16 * 1024 + 1, connector.getConnectionFactory(HttpConnectionFactory.class).isUseInputDirectByteBuffers());
assertEquals(1, bucket.size());
assertEquals(1, bucket.getIdleCount());

long count = ssl.isDirectBuffersForDecryption() ? byteBufferPool.getDirectByteBufferCount() : byteBufferPool.getHeapByteBufferCount();
assertEquals(1, count);
}

@Test
Expand Down Expand Up @@ -834,6 +880,7 @@ protected boolean networkFlush(ByteBuffer output) throws IOException

assertThrows(Exception.class, () -> client.newRequest("localhost", connector.getLocalPort()).scheme(HttpScheme.HTTPS.asString()).send());

byteBufferPool.asRetainableByteBufferPool().clear();
await().atMost(5, TimeUnit.SECONDS).until(() -> leakedBuffers, is(empty()));
gregw marked this conversation as resolved.
Show resolved Hide resolved
}

Expand Down Expand Up @@ -916,6 +963,7 @@ protected void service(String target, Request jettyRequest, HttpServletRequest r

assertThrows(Exception.class, () -> client.newRequest("localhost", connector.getLocalPort()).scheme(HttpScheme.HTTPS.asString()).send());

byteBufferPool.asRetainableByteBufferPool().clear();
await().atMost(5, TimeUnit.SECONDS).until(() -> leakedBuffers, is(empty()));
gregw marked this conversation as resolved.
Show resolved Hide resolved
}

Expand Down Expand Up @@ -998,6 +1046,7 @@ protected void service(String target, Request jettyRequest, HttpServletRequest r

assertThrows(Exception.class, () -> client.newRequest("localhost", connector.getLocalPort()).scheme(HttpScheme.HTTPS.asString()).send());

byteBufferPool.asRetainableByteBufferPool().clear();
await().atMost(5, TimeUnit.SECONDS).until(() -> leakedBuffers, is(empty()));
gregw marked this conversation as resolved.
Show resolved Hide resolved
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public HttpConnectionOverFCGI(EndPoint endPoint, HttpDestination destination, Pr
this.parser = new ClientParser(new ResponseListener());
requests.addLast(0);
HttpClient client = destination.getHttpClient();
this.networkByteBufferPool = RetainableByteBufferPool.findOrAdapt(client, client.getByteBufferPool());
this.networkByteBufferPool = client.getByteBufferPool().asRetainableByteBufferPool();
}

public HttpDestination getHttpDestination()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public Result generateResponseContent(int request, ByteBuffer content, boolean l
private ByteBuffer generateEndRequest(int request, boolean aborted)
{
request &= 0xFF_FF;
ByteBuffer endRequestBuffer = acquire(8);
ByteBuffer endRequestBuffer = acquire(16);
BufferUtil.clearToFill(endRequestBuffer);
endRequestBuffer.putInt(0x01_03_00_00 + request);
endRequestBuffer.putInt(0x00_08_00_00);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public ServerFCGIConnection(Connector connector, EndPoint endPoint, HttpConfigur
{
super(endPoint, connector.getExecutor());
this.connector = connector;
this.networkByteBufferPool = RetainableByteBufferPool.findOrAdapt(connector, connector.getByteBufferPool());
this.networkByteBufferPool = connector.getByteBufferPool().asRetainableByteBufferPool();
this.flusher = new Flusher(endPoint);
this.configuration = configuration;
this.sendStatus200 = sendStatus200;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public Connection newConnection(EndPoint endPoint, Map<String, Object> context)
parser.setMaxFrameLength(client.getMaxFrameLength());
parser.setMaxSettingsKeys(client.getMaxSettingsKeys());

RetainableByteBufferPool retainableByteBufferPool = RetainableByteBufferPool.findOrAdapt(client, byteBufferPool);
RetainableByteBufferPool retainableByteBufferPool = byteBufferPool.asRetainableByteBufferPool();

HTTP2ClientConnection connection = new HTTP2ClientConnection(client, retainableByteBufferPool, executor, endPoint,
parser, session, client.getInputBufferSize(), promise, listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ public Connection newConnection(Connector connector, EndPoint endPoint)
parser.setMaxFrameLength(getMaxFrameLength());
parser.setMaxSettingsKeys(getMaxSettingsKeys());

RetainableByteBufferPool retainableByteBufferPool = RetainableByteBufferPool.findOrAdapt(connector, connector.getByteBufferPool());
RetainableByteBufferPool retainableByteBufferPool = connector.getByteBufferPool().asRetainableByteBufferPool();

HTTP2Connection connection = new HTTP2ServerConnection(retainableByteBufferPool, connector.getExecutor(),
endPoint, httpConfiguration, parser, session, getInputBufferSize(), listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
public HTTP3StreamConnection(QuicStreamEndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool, MessageParser parser)
{
super(endPoint, executor);
this.buffers = RetainableByteBufferPool.findOrAdapt(null, byteBufferPool);
this.buffers = byteBufferPool.asRetainableByteBufferPool();
this.parser = parser;
parser.init(MessageListener::new);
}
Expand Down
158 changes: 148 additions & 10 deletions jetty-io/src/main/java/org/eclipse/jetty/io/AbstractByteBufferPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,15 @@
package org.eclipse.jetty.io;

import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.IntConsumer;

import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.annotation.ManagedOperation;
Expand All @@ -29,37 +34,79 @@
@ManagedObject
abstract class AbstractByteBufferPool implements ByteBufferPool
{
public static final int DEFAULT_FACTOR = 4096;
public static final int DEFAULT_MAX_CAPACITY_BY_FACTOR = 16;
private final int _factor;
private final int _maxQueueLength;
private final int _maxCapacity;
private final int _maxBucketSize;
private final long _maxHeapMemory;
private final long _maxDirectMemory;
private final AtomicLong _heapMemory = new AtomicLong();
private final AtomicLong _directMemory = new AtomicLong();

private final RetainableByteBufferPool _retainableByteBufferPool;

/**
* Creates a new ByteBufferPool with the given configuration.
*
* @param factor the capacity factor
* @param maxQueueLength the maximum ByteBuffer queue length
* @param maxBucketSize the maximum ByteBuffer queue length
* @param maxHeapMemory the max heap memory in bytes, -1 for unlimited memory or 0 to use default heuristic
* @param maxDirectMemory the max direct memory in bytes, -1 for unlimited memory or 0 to use default heuristic
* @param retainedHeapMemory the max heap memory in bytes, -2 for no retained memory, -1 for unlimited retained memory or 0 to use default heuristic
* @param retainedDirectMemory the max direct memory in bytes, -2 for no retained memory, -1 for unlimited retained memory or 0 to use default heuristic
*/
protected AbstractByteBufferPool(int factor, int maxQueueLength, long maxHeapMemory, long maxDirectMemory)
protected AbstractByteBufferPool(int factor, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory, long retainedHeapMemory, long retainedDirectMemory)
{
_factor = factor <= 0 ? DEFAULT_FACTOR : factor;
_maxCapacity = maxCapacity > 0 ? maxCapacity : DEFAULT_MAX_CAPACITY_BY_FACTOR * _factor;
_maxBucketSize = maxBucketSize;
_maxHeapMemory = memorySize(maxHeapMemory);
_maxDirectMemory = memorySize(maxDirectMemory);
_retainableByteBufferPool = (retainedHeapMemory == -2 && retainedDirectMemory == -2)
? RetainableByteBufferPool.from(this)
: newRetainableByteBufferPool(factor, maxCapacity, maxBucketSize, retainedSize(retainedHeapMemory), retainedSize(retainedDirectMemory));
}

static long retainedSize(long size)
{
_factor = factor <= 0 ? 1024 : factor;
_maxQueueLength = maxQueueLength;
_maxHeapMemory = (maxHeapMemory != 0) ? maxHeapMemory : Runtime.getRuntime().maxMemory() / 4;
_maxDirectMemory = (maxDirectMemory != 0) ? maxDirectMemory : Runtime.getRuntime().maxMemory() / 4;
if (size == -2)
return 0;
return memorySize(size);
}

static long memorySize(long size)
{
if (size < 0)
return -1;
if (size == 0)
return Runtime.getRuntime().maxMemory() / 4;
return size;
}

protected RetainableByteBufferPool newRetainableByteBufferPool(int factor, int maxCapacity, int maxBucketSize, long retainedHeapMemory, long retainedDirectMemory)
{
return RetainableByteBufferPool.from(this);
lorban marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public RetainableByteBufferPool asRetainableByteBufferPool()
{
return _retainableByteBufferPool;
}

protected int getCapacityFactor()
{
return _factor;
}

protected int getMaxQueueLength()
protected int getMaxCapacity()
{
return _maxQueueLength;
return _maxCapacity;
}

protected int getMaxBucketSize()
{
return _maxBucketSize;
}

@Deprecated
Expand Down Expand Up @@ -123,6 +170,97 @@ public long getMemory(boolean direct)
return memory.get();
}

protected static class Bucket
{
private final Queue<ByteBuffer> _queue = new ConcurrentLinkedQueue<>();
private final int _capacity;
private final int _maxSize;
private final AtomicInteger _size;
private final AtomicLong _lastUpdate = new AtomicLong(System.nanoTime());
private final IntConsumer _memoryFunction;

@Deprecated
public Bucket(int capacity, int maxSize)
{
this(capacity, maxSize, i -> {});
}

public Bucket(int capacity, int maxSize, IntConsumer memoryFunction)
{
_capacity = capacity;
_maxSize = maxSize;
_size = maxSize > 0 ? new AtomicInteger() : null;
_memoryFunction = Objects.requireNonNull(memoryFunction);
}

public ByteBuffer acquire()
{
ByteBuffer buffer = _queue.poll();
if (buffer != null)
{
if (_size != null)
_size.decrementAndGet();
_memoryFunction.accept(-buffer.capacity());
}

return buffer;
}

public void release(ByteBuffer buffer)
{
resetUpdateTime();
BufferUtil.reset(buffer);
if (_size == null || _size.incrementAndGet() <= _maxSize)
{
_queue.offer(buffer);
_memoryFunction.accept(buffer.capacity());
}
else
{
_size.decrementAndGet();
}
}

void resetUpdateTime()
{
_lastUpdate.lazySet(System.nanoTime());
}

public void clear()
{
int size = _size == null ? 0 : _size.get() - 1;
while (size >= 0)
{
ByteBuffer buffer = acquire();
if (buffer == null)
break;
if (_size != null)
--size;
}
}

boolean isEmpty()
{
return _queue.isEmpty();
}

int size()
{
return _queue.size();
}

long getLastUpdate()
{
return _lastUpdate.getOpaque();
}

@Override
public String toString()
{
return String.format("%s@%x{capacity=%d, size=%d, maxSize=%d}", getClass().getSimpleName(), hashCode(), _capacity, size(), _maxSize);
}
}

IntConsumer updateMemory(boolean direct)
{
return (direct) ? _directMemory::addAndGet : _heapMemory::addAndGet;
Expand Down
Loading