Skip to content

Commit

Permalink
Merge pull request #5187 from eclipse/jetty-9.4.x-5147-h2_round_robin…
Browse files Browse the repository at this point in the history
…_max_usage

Issue #5147 - HTTP2 RoundRobinConnectionPool with maxUsage
  • Loading branch information
sbordet authored Aug 25, 2020
2 parents 4dfcb80 + 0af5f67 commit 0dd8274
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -440,9 +440,14 @@ public void release(Connection connection)
{
// Trigger the next request after releasing the connection.
if (connectionPool.release(connection))
{
send(false);
}
else
{
connection.close();
send(true);
}
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,22 @@

package org.eclipse.jetty.client;

import java.util.concurrent.atomic.AtomicInteger;

import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Pool;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Locker;

@ManagedObject
public class RoundRobinConnectionPool extends MultiplexConnectionPool
{
private static final Logger LOG = Log.getLogger(RoundRobinConnectionPool.class);

private final AtomicInteger offset = new AtomicInteger();
private final Locker lock = new Locker();
private final Pool<Connection> pool;
private int offset;

public RoundRobinConnectionPool(HttpDestination destination, int maxConnections, Callback requester)
{
Expand All @@ -47,26 +47,32 @@ public RoundRobinConnectionPool(HttpDestination destination, int maxConnections,
}

@Override
protected Connection activate()
protected Connection acquire(boolean create)
{
int offset = this.offset.get();
Connection connection = activate(offset);
if (connection != null)
this.offset.getAndIncrement();
return connection;
// If there are queued requests and connections get
// closed due to idle timeout or overuse, we want to
// aggressively try to open new connections to replace
// those that were closed to process queued requests.
return super.acquire(true);
}

private Connection activate(int offset)
@Override
protected Connection activate()
{
Pool<Connection>.Entry entry = pool.acquireAt(Math.abs(offset % pool.getMaxEntries()));
if (LOG.isDebugEnabled())
LOG.debug("activated '{}'", entry);
if (entry != null)
Pool<Connection>.Entry entry;
try (Locker.Lock l = lock.lock())
{
Connection connection = entry.getPooled();
acquired(connection);
return connection;
int index = Math.abs(offset % pool.getMaxEntries());
entry = pool.acquireAt(index);
if (LOG.isDebugEnabled())
LOG.debug("activated at index={} entry={}", index, entry);
if (entry != null)
++offset;
}
return null;
if (entry == null)
return null;
Connection connection = entry.getPooled();
acquired(connection);
return connection;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.eclipse.jetty.client.HttpSender;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.IStream;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.ResetFrame;
Expand Down Expand Up @@ -99,12 +98,9 @@ public void send(HttpExchange exchange)
@Override
public void release()
{
setStream(null);
connection.release(this);
}

void onStreamClosed(IStream stream)
{
connection.onStreamClosed(stream, this);
getHttpDestination().release(getHttpConnection());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.eclipse.jetty.client.SendFailure;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.IStream;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
Expand Down Expand Up @@ -119,16 +118,6 @@ else if (isRecycleHttpChannels())
}
}

void onStreamClosed(IStream stream, HttpChannelOverHTTP2 channel)
{
if (LOG.isDebugEnabled())
LOG.debug("{} closed for {}", stream, channel);
channel.setStream(null);
// Only non-push channels are released.
if (stream.isLocal())
getHttpDestination().release(this);
}

@Override
public boolean onIdleTimeout(long idleTimeout)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.IStream;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
Expand Down Expand Up @@ -201,12 +200,6 @@ public void onFailure(Stream stream, int error, String reason, Throwable failure
callback.succeeded();
}

@Override
public void onClosed(Stream stream)
{
getHttpChannel().onStreamClosed((IStream)stream);
}

private void notifyContent(HttpExchange exchange, DataFrame frame, Callback callback)
{
contentNotifier.offer(exchange, frame, callback);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

Expand Down Expand Up @@ -180,4 +182,56 @@ protected void service(String target, Request jettyRequest, HttpServletRequest r
assertThat(remotePorts.get(i - 1), Matchers.not(Matchers.equalTo(candidate)));
}
}

@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
public void testMultiplexWithMaxUsage(Transport transport) throws Exception
{
init(transport);

int multiplex = 1;
if (scenario.transport.isHttp2Based())
multiplex = 2;
int maxMultiplex = multiplex;

int maxUsage = 2;
int maxConnections = 2;
int count = maxConnections * maxMultiplex * maxUsage;

List<Integer> remotePorts = new CopyOnWriteArrayList<>();
scenario.start(new EmptyServerHandler()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response)
{
remotePorts.add(request.getRemotePort());
}
});
scenario.client.getTransport().setConnectionPoolFactory(destination ->
{
RoundRobinConnectionPool pool = new RoundRobinConnectionPool(destination, maxConnections, destination, maxMultiplex);
pool.setMaxUsageCount(maxUsage);
return pool;
});

CountDownLatch clientLatch = new CountDownLatch(count);
for (int i = 0; i < count; ++i)
{
scenario.client.newRequest(scenario.newURI())
.path("/" + i)
.timeout(5, TimeUnit.SECONDS)
.send(result ->
{
if (result.getResponse().getStatus() == HttpStatus.OK_200)
clientLatch.countDown();
});
}
assertTrue(clientLatch.await(count, TimeUnit.SECONDS));
assertEquals(count, remotePorts.size());

Map<Integer, Long> results = remotePorts.stream()
.collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
assertEquals(count / maxUsage, results.size(), remotePorts.toString());
assertEquals(1, results.values().stream().distinct().count(), remotePorts.toString());
}
}

0 comments on commit 0dd8274

Please sign in to comment.