Skip to content

Commit

Permalink
#7107: fix too-eager closing of multiplexed connections marked as clo…
Browse files Browse the repository at this point in the history
…sed but still in use
  • Loading branch information
lorban committed Nov 22, 2021
1 parent 892e4b3 commit 259348d
Show file tree
Hide file tree
Showing 3 changed files with 173 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,8 @@ protected void tryCreate(boolean create)
if (entry == null)
{
pending.decrementAndGet();
if (LOG.isDebugEnabled())
LOG.debug("Not creating connection as pool is full, pending: {}", pending);
return;
}

Expand Down Expand Up @@ -334,7 +336,7 @@ protected Connection activate()
EntryHolder holder = (EntryHolder)((Attachable)connection).getAttachment();
if (holder.isExpired(maxDurationNanos))
{
boolean canClose = remove(connection, true);
boolean canClose = remove(connection);
if (canClose)
IO.close(connection);
if (LOG.isDebugEnabled())
Expand Down Expand Up @@ -381,22 +383,27 @@ protected boolean deactivate(Connection connection)
EntryHolder holder = (EntryHolder)attachable.getAttachment();
if (holder == null)
return true;
boolean reusable = pool.release(holder.entry);
if (LOG.isDebugEnabled())
LOG.debug("Released ({}) {} {}", reusable, holder.entry, pool);
if (reusable)
return true;
remove(connection);
return false;

long maxDurationNanos = this.maxDurationNanos;
if (maxDurationNanos > 0L && holder.isExpired(maxDurationNanos))
{
// Remove instead of release if the connection expired.
return !remove(connection);
}
else
{
// Release if the connection has not expired, then remove if not reusable.
boolean reusable = pool.release(holder.entry);
if (LOG.isDebugEnabled())
LOG.debug("Released ({}) {} {}", reusable, holder.entry, pool);
if (reusable)
return true;
return !remove(connection);
}
}

@Override
public boolean remove(Connection connection)
{
return remove(connection, false);
}

protected boolean remove(Connection connection, boolean force)
{
if (!(connection instanceof Attachable))
throw new IllegalArgumentException("Invalid connection object: " + connection);
Expand All @@ -409,14 +416,20 @@ protected boolean remove(Connection connection, boolean force)
attachable.setAttachment(null);
if (LOG.isDebugEnabled())
LOG.debug("Removed ({}) {} {}", removed, holder.entry, pool);
if (removed || force)
if (removed)
{
released(connection);
removed(connection);
}
return removed;
}

@Deprecated
protected boolean remove(Connection connection, boolean force)
{
return remove(connection);
}

protected void onCreated(Connection connection)
{
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,31 +62,36 @@

public class ConnectionPoolTest
{
private Server server;
private ServerConnector connector;
private HttpClient client;
private static final ConnectionPoolFactory DUPLEX = new ConnectionPoolFactory("duplex", destination -> new DuplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination));
private static final ConnectionPoolFactory MULTIPLEX = new ConnectionPoolFactory("multiplex", destination -> new MultiplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination, 1));
private static final ConnectionPoolFactory RANDOM = new ConnectionPoolFactory("random", destination -> new RandomConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination, 1));
private static final ConnectionPoolFactory DUPLEX_MAX_DURATION = new ConnectionPoolFactory("duplex-maxDuration", destination ->
{
DuplexConnectionPool pool = new DuplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination);
pool.setMaxDuration(10);
return pool;
});
private static final ConnectionPoolFactory ROUND_ROBIN = new ConnectionPoolFactory("round-robin", destination -> new RoundRobinConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination));

public static Stream<ConnectionPoolFactory> pools()
{
return Stream.concat(poolsNoRoundRobin(),
Stream.of(new ConnectionPoolFactory("round-robin", destination -> new RoundRobinConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination))));
return Stream.of(DUPLEX, MULTIPLEX, RANDOM, DUPLEX_MAX_DURATION, ROUND_ROBIN);
}

public static Stream<ConnectionPoolFactory> poolsNoMaxDuration()
{
return Stream.of(DUPLEX, MULTIPLEX, RANDOM, ROUND_ROBIN);
}

public static Stream<ConnectionPoolFactory> poolsNoRoundRobin()
{
return Stream.of(
new ConnectionPoolFactory("duplex", destination -> new DuplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination)),
new ConnectionPoolFactory("duplex-maxDuration", destination ->
{
DuplexConnectionPool pool = new DuplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination);
pool.setMaxDuration(10);
return pool;
}),
new ConnectionPoolFactory("multiplex", destination -> new MultiplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination, 1)),
new ConnectionPoolFactory("random", destination -> new RandomConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination, 1))
);
return Stream.of(DUPLEX, MULTIPLEX, RANDOM, DUPLEX_MAX_DURATION);
}

private Server server;
private ServerConnector connector;
private HttpClient client;

private void start(ConnectionPool.Factory factory, Handler handler) throws Exception
{
startServer(handler);
Expand Down Expand Up @@ -367,22 +372,24 @@ public void resolve(String host, int port, Promise<List<InetSocketAddress>> prom
}

@ParameterizedTest
@MethodSource("pools")
// Connection pool aggressively closes expired connections upon release, which interferes with this test's assertion.
@MethodSource("poolsNoMaxDuration")
public void testConcurrentRequestsAllBlockedOnServerWithLargeConnectionPool(ConnectionPoolFactory factory) throws Exception
{
int count = 50;
testConcurrentRequestsAllBlockedOnServer(factory, count, 2 * count);
}

@ParameterizedTest
@MethodSource("pools")
// Connection pool aggressively closes expired connections upon release, which interferes with this test's assertion.
@MethodSource("poolsNoMaxDuration")
public void testConcurrentRequestsAllBlockedOnServerWithExactConnectionPool(ConnectionPoolFactory factory) throws Exception
{
int count = 50;
testConcurrentRequestsAllBlockedOnServer(factory, count, count);
}

private void testConcurrentRequestsAllBlockedOnServer(ConnectionPoolFactory factory, int count, int maxConnections) throws Exception
private void testConcurrentRequestsAllBlockedOnServer(ConnectionPoolFactory factory, int count, int maxConnections) throws Exception
{
CyclicBarrier barrier = new CyclicBarrier(count);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
Expand All @@ -52,6 +54,7 @@
// Sibling of ConnectionPoolTest, but using H2 to multiplex connections.
public class MultiplexedConnectionPoolTest
{
private final Logger LOG = Log.getLogger(MultiplexedConnectionPoolTest.class);
private static final int MAX_MULTIPLEX = 2;

private Server server;
Expand Down Expand Up @@ -90,14 +93,129 @@ public void disposeClient() throws Exception
}
}

@Test
public void testMaxDurationConnectionsWithMultiplexedPoolLifecycle() throws Exception
{
final int maxDuration = 200;
AtomicInteger poolCreateCounter = new AtomicInteger();
AtomicInteger poolRemoveCounter = new AtomicInteger();
AtomicReference<Pool<Connection>> poolRef = new AtomicReference<>();
ConnectionPoolFactory factory = new ConnectionPoolFactory("MaxDurationConnectionsWithMultiplexedPoolLifecycle", destination ->
{
int maxConnections = destination.getHttpClient().getMaxConnectionsPerDestination();
MultiplexConnectionPool pool = new MultiplexConnectionPool(destination, Pool.StrategyType.FIRST, maxConnections, false, destination, 10)
{
@Override
protected void onCreated(Connection connection)
{
poolCreateCounter.incrementAndGet();
}

@Override
protected void removed(Connection connection)
{
poolRemoveCounter.incrementAndGet();
}
};
poolRef.set(pool.getBean(Pool.class));
pool.setMaxDuration(maxDuration);
return pool;
});

CountDownLatch[] reqExecutingLatches = new CountDownLatch[] {new CountDownLatch(1), new CountDownLatch(1), new CountDownLatch(1)};
CountDownLatch[] reqExecutedLatches = new CountDownLatch[] {new CountDownLatch(1), new CountDownLatch(1), new CountDownLatch(1)};
CountDownLatch[] reqFinishingLatches = new CountDownLatch[] {new CountDownLatch(1), new CountDownLatch(1), new CountDownLatch(1)};
startServer(new EmptyServerHandler()
{
@Override
protected void service(String target, org.eclipse.jetty.server.Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws ServletException
{
int req = Integer.parseInt(target.substring(1));
try
{
LOG.debug("req {} is executing", req);
reqExecutingLatches[req].countDown();
Thread.sleep(250);
reqExecutedLatches[req].countDown();
LOG.debug("req {} executed", req);

assertTrue(reqFinishingLatches[req].await(5, TimeUnit.SECONDS));

response.getWriter().println("req " + req + " executed");
response.getWriter().flush();
LOG.debug("req {} successful", req);
}
catch (Exception e)
{
throw new ServletException(e);
}
}
});

HttpClientTransport transport = new HttpClientTransportOverHTTP2(new HTTP2Client());
transport.setConnectionPoolFactory(factory.factory);
client = new HttpClient(transport);
client.start();

CountDownLatch[] reqClientSuccessLatches = new CountDownLatch[] {new CountDownLatch(1), new CountDownLatch(1), new CountDownLatch(1)};

sendRequest(reqClientSuccessLatches, 0);
// wait until handler is executing
assertTrue(reqExecutingLatches[0].await(5, TimeUnit.SECONDS));
LOG.debug("req 0 executing");

sendRequest(reqClientSuccessLatches, 1);
// wait until handler executed sleep
assertTrue(reqExecutedLatches[1].await(5, TimeUnit.SECONDS));
LOG.debug("req 1 executed");

// Now the pool contains one connection that is expired but in use by 2 threads.

sendRequest(reqClientSuccessLatches, 2);
LOG.debug("req2 sent");
assertTrue(reqExecutingLatches[2].await(5, TimeUnit.SECONDS));
LOG.debug("req2 executing");

// The 3rd request has tried the expired request and marked it as closed as it has expired, then used a 2nd one.

// release and wait for req2 to be done before releasing req1
reqFinishingLatches[2].countDown();
assertTrue(reqClientSuccessLatches[2].await(5, TimeUnit.SECONDS));
reqFinishingLatches[1].countDown();

// release req0 once req1 is done; req 1 should not have closed the response as req 0 is still running
assertTrue(reqClientSuccessLatches[1].await(5, TimeUnit.SECONDS));
reqFinishingLatches[0].countDown();
assertTrue(reqClientSuccessLatches[0].await(5, TimeUnit.SECONDS));

// Check that the pool created 2 and removed 2 connections;
// 2 were removed b/c waiting for req 2 means the 2nd connection
// expired and has to be removed and closed upon being returned to the pool.
assertThat(poolCreateCounter.get(), Matchers.is(2));
assertThat(poolRemoveCounter.get(), Matchers.is(2));
assertThat(poolRef.get().size(), Matchers.is(0));
}

private void sendRequest(CountDownLatch[] reqClientDoneLatches, int i)
{
client.newRequest("localhost", connector.getLocalPort())
.path("/" + i)
.timeout(5, TimeUnit.SECONDS)
.send(result ->
{
assertThat("req " + i + " failed", result.getResponse().getStatus(), Matchers.is(200));
reqClientDoneLatches[i].countDown();
});
}

@Test
public void testMaxDurationConnectionsWithMultiplexedPool() throws Exception
{
final int maxDuration = 30;
AtomicInteger poolCreateCounter = new AtomicInteger();
AtomicInteger poolRemoveCounter = new AtomicInteger();
AtomicReference<Pool<Connection>> poolRef = new AtomicReference<>();
ConnectionPoolFactory factory = new ConnectionPoolFactory("duplex-maxDuration", destination ->
ConnectionPoolFactory factory = new ConnectionPoolFactory("maxDurationConnectionsWithMultiplexedPool", destination ->
{
int maxConnections = destination.getHttpClient().getMaxConnectionsPerDestination();
MultiplexConnectionPool connectionPool = new MultiplexConnectionPool(destination, Pool.StrategyType.FIRST, maxConnections, false, destination, MAX_MULTIPLEX)
Expand Down Expand Up @@ -284,7 +402,7 @@ protected void service(String target, Request jettyRequest, HttpServletRequest r

assertThat(poolRef.get().getIdleCount(), is(0));
assertThat(poolRef.get().size(), is(0));
assertThat(poolRemoveCounter.get(), is(3));
assertThat(poolRemoveCounter.get(), is(2));
}

private static class ConnectionPoolFactory
Expand Down

0 comments on commit 259348d

Please sign in to comment.