From 259348d70dcd5f0efb7934c9d4210b5c3249d277 Mon Sep 17 00:00:00 2001 From: Ludovic Orban Date: Wed, 17 Nov 2021 16:32:01 +0100 Subject: [PATCH] #7107: fix too-eager closing of multiplexed connections marked as closed but still in use --- .../jetty/client/AbstractConnectionPool.java | 41 ++++-- .../jetty/client/ConnectionPoolTest.java | 45 ++++--- .../http/MultiplexedConnectionPoolTest.java | 122 +++++++++++++++++- 3 files changed, 173 insertions(+), 35 deletions(-) diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java index 018dfb08c085..dc6214c65fd6 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java @@ -305,6 +305,8 @@ protected void tryCreate(boolean create) if (entry == null) { pending.decrementAndGet(); + if (LOG.isDebugEnabled()) + LOG.debug("Not creating connection as pool is full, pending: {}", pending); return; } @@ -334,7 +336,7 @@ protected Connection activate() EntryHolder holder = (EntryHolder)((Attachable)connection).getAttachment(); if (holder.isExpired(maxDurationNanos)) { - boolean canClose = remove(connection, true); + boolean canClose = remove(connection); if (canClose) IO.close(connection); if (LOG.isDebugEnabled()) @@ -381,22 +383,27 @@ protected boolean deactivate(Connection connection) EntryHolder holder = (EntryHolder)attachable.getAttachment(); if (holder == null) return true; - boolean reusable = pool.release(holder.entry); - if (LOG.isDebugEnabled()) - LOG.debug("Released ({}) {} {}", reusable, holder.entry, pool); - if (reusable) - return true; - remove(connection); - return false; + + long maxDurationNanos = this.maxDurationNanos; + if (maxDurationNanos > 0L && holder.isExpired(maxDurationNanos)) + { + // Remove instead of release if the connection expired. + return !remove(connection); + } + else + { + // Release if the connection has not expired, then remove if not reusable. + boolean reusable = pool.release(holder.entry); + if (LOG.isDebugEnabled()) + LOG.debug("Released ({}) {} {}", reusable, holder.entry, pool); + if (reusable) + return true; + return !remove(connection); + } } @Override public boolean remove(Connection connection) - { - return remove(connection, false); - } - - protected boolean remove(Connection connection, boolean force) { if (!(connection instanceof Attachable)) throw new IllegalArgumentException("Invalid connection object: " + connection); @@ -409,7 +416,7 @@ protected boolean remove(Connection connection, boolean force) attachable.setAttachment(null); if (LOG.isDebugEnabled()) LOG.debug("Removed ({}) {} {}", removed, holder.entry, pool); - if (removed || force) + if (removed) { released(connection); removed(connection); @@ -417,6 +424,12 @@ protected boolean remove(Connection connection, boolean force) return removed; } + @Deprecated + protected boolean remove(Connection connection, boolean force) + { + return remove(connection); + } + protected void onCreated(Connection connection) { } diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolTest.java index caeff54a636f..1f5689c68c7c 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolTest.java @@ -62,31 +62,36 @@ public class ConnectionPoolTest { - private Server server; - private ServerConnector connector; - private HttpClient client; + private static final ConnectionPoolFactory DUPLEX = new ConnectionPoolFactory("duplex", destination -> new DuplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination)); + private static final ConnectionPoolFactory MULTIPLEX = new ConnectionPoolFactory("multiplex", destination -> new MultiplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination, 1)); + private static final ConnectionPoolFactory RANDOM = new ConnectionPoolFactory("random", destination -> new RandomConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination, 1)); + private static final ConnectionPoolFactory DUPLEX_MAX_DURATION = new ConnectionPoolFactory("duplex-maxDuration", destination -> + { + DuplexConnectionPool pool = new DuplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination); + pool.setMaxDuration(10); + return pool; + }); + private static final ConnectionPoolFactory ROUND_ROBIN = new ConnectionPoolFactory("round-robin", destination -> new RoundRobinConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination)); public static Stream pools() { - return Stream.concat(poolsNoRoundRobin(), - Stream.of(new ConnectionPoolFactory("round-robin", destination -> new RoundRobinConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination)))); + return Stream.of(DUPLEX, MULTIPLEX, RANDOM, DUPLEX_MAX_DURATION, ROUND_ROBIN); + } + + public static Stream poolsNoMaxDuration() + { + return Stream.of(DUPLEX, MULTIPLEX, RANDOM, ROUND_ROBIN); } public static Stream poolsNoRoundRobin() { - return Stream.of( - new ConnectionPoolFactory("duplex", destination -> new DuplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination)), - new ConnectionPoolFactory("duplex-maxDuration", destination -> - { - DuplexConnectionPool pool = new DuplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination); - pool.setMaxDuration(10); - return pool; - }), - new ConnectionPoolFactory("multiplex", destination -> new MultiplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination, 1)), - new ConnectionPoolFactory("random", destination -> new RandomConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination, 1)) - ); + return Stream.of(DUPLEX, MULTIPLEX, RANDOM, DUPLEX_MAX_DURATION); } + private Server server; + private ServerConnector connector; + private HttpClient client; + private void start(ConnectionPool.Factory factory, Handler handler) throws Exception { startServer(handler); @@ -367,7 +372,8 @@ public void resolve(String host, int port, Promise> prom } @ParameterizedTest - @MethodSource("pools") + // Connection pool aggressively closes expired connections upon release, which interferes with this test's assertion. + @MethodSource("poolsNoMaxDuration") public void testConcurrentRequestsAllBlockedOnServerWithLargeConnectionPool(ConnectionPoolFactory factory) throws Exception { int count = 50; @@ -375,14 +381,15 @@ public void testConcurrentRequestsAllBlockedOnServerWithLargeConnectionPool(Conn } @ParameterizedTest - @MethodSource("pools") + // Connection pool aggressively closes expired connections upon release, which interferes with this test's assertion. + @MethodSource("poolsNoMaxDuration") public void testConcurrentRequestsAllBlockedOnServerWithExactConnectionPool(ConnectionPoolFactory factory) throws Exception { int count = 50; testConcurrentRequestsAllBlockedOnServer(factory, count, count); } - private void testConcurrentRequestsAllBlockedOnServer(ConnectionPoolFactory factory, int count, int maxConnections) throws Exception + private void testConcurrentRequestsAllBlockedOnServer(ConnectionPoolFactory factory, int count, int maxConnections) throws Exception { CyclicBarrier barrier = new CyclicBarrier(count); diff --git a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MultiplexedConnectionPoolTest.java b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MultiplexedConnectionPoolTest.java index 307260e8eb08..3c903d4e6c67 100644 --- a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MultiplexedConnectionPoolTest.java +++ b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MultiplexedConnectionPoolTest.java @@ -44,6 +44,8 @@ import org.hamcrest.Matchers; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; @@ -52,6 +54,7 @@ // Sibling of ConnectionPoolTest, but using H2 to multiplex connections. public class MultiplexedConnectionPoolTest { + private final Logger LOG = Log.getLogger(MultiplexedConnectionPoolTest.class); private static final int MAX_MULTIPLEX = 2; private Server server; @@ -90,6 +93,121 @@ public void disposeClient() throws Exception } } + @Test + public void testMaxDurationConnectionsWithMultiplexedPoolLifecycle() throws Exception + { + final int maxDuration = 200; + AtomicInteger poolCreateCounter = new AtomicInteger(); + AtomicInteger poolRemoveCounter = new AtomicInteger(); + AtomicReference> poolRef = new AtomicReference<>(); + ConnectionPoolFactory factory = new ConnectionPoolFactory("MaxDurationConnectionsWithMultiplexedPoolLifecycle", destination -> + { + int maxConnections = destination.getHttpClient().getMaxConnectionsPerDestination(); + MultiplexConnectionPool pool = new MultiplexConnectionPool(destination, Pool.StrategyType.FIRST, maxConnections, false, destination, 10) + { + @Override + protected void onCreated(Connection connection) + { + poolCreateCounter.incrementAndGet(); + } + + @Override + protected void removed(Connection connection) + { + poolRemoveCounter.incrementAndGet(); + } + }; + poolRef.set(pool.getBean(Pool.class)); + pool.setMaxDuration(maxDuration); + return pool; + }); + + CountDownLatch[] reqExecutingLatches = new CountDownLatch[] {new CountDownLatch(1), new CountDownLatch(1), new CountDownLatch(1)}; + CountDownLatch[] reqExecutedLatches = new CountDownLatch[] {new CountDownLatch(1), new CountDownLatch(1), new CountDownLatch(1)}; + CountDownLatch[] reqFinishingLatches = new CountDownLatch[] {new CountDownLatch(1), new CountDownLatch(1), new CountDownLatch(1)}; + startServer(new EmptyServerHandler() + { + @Override + protected void service(String target, org.eclipse.jetty.server.Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws ServletException + { + int req = Integer.parseInt(target.substring(1)); + try + { + LOG.debug("req {} is executing", req); + reqExecutingLatches[req].countDown(); + Thread.sleep(250); + reqExecutedLatches[req].countDown(); + LOG.debug("req {} executed", req); + + assertTrue(reqFinishingLatches[req].await(5, TimeUnit.SECONDS)); + + response.getWriter().println("req " + req + " executed"); + response.getWriter().flush(); + LOG.debug("req {} successful", req); + } + catch (Exception e) + { + throw new ServletException(e); + } + } + }); + + HttpClientTransport transport = new HttpClientTransportOverHTTP2(new HTTP2Client()); + transport.setConnectionPoolFactory(factory.factory); + client = new HttpClient(transport); + client.start(); + + CountDownLatch[] reqClientSuccessLatches = new CountDownLatch[] {new CountDownLatch(1), new CountDownLatch(1), new CountDownLatch(1)}; + + sendRequest(reqClientSuccessLatches, 0); + // wait until handler is executing + assertTrue(reqExecutingLatches[0].await(5, TimeUnit.SECONDS)); + LOG.debug("req 0 executing"); + + sendRequest(reqClientSuccessLatches, 1); + // wait until handler executed sleep + assertTrue(reqExecutedLatches[1].await(5, TimeUnit.SECONDS)); + LOG.debug("req 1 executed"); + + // Now the pool contains one connection that is expired but in use by 2 threads. + + sendRequest(reqClientSuccessLatches, 2); + LOG.debug("req2 sent"); + assertTrue(reqExecutingLatches[2].await(5, TimeUnit.SECONDS)); + LOG.debug("req2 executing"); + + // The 3rd request has tried the expired request and marked it as closed as it has expired, then used a 2nd one. + + // release and wait for req2 to be done before releasing req1 + reqFinishingLatches[2].countDown(); + assertTrue(reqClientSuccessLatches[2].await(5, TimeUnit.SECONDS)); + reqFinishingLatches[1].countDown(); + + // release req0 once req1 is done; req 1 should not have closed the response as req 0 is still running + assertTrue(reqClientSuccessLatches[1].await(5, TimeUnit.SECONDS)); + reqFinishingLatches[0].countDown(); + assertTrue(reqClientSuccessLatches[0].await(5, TimeUnit.SECONDS)); + + // Check that the pool created 2 and removed 2 connections; + // 2 were removed b/c waiting for req 2 means the 2nd connection + // expired and has to be removed and closed upon being returned to the pool. + assertThat(poolCreateCounter.get(), Matchers.is(2)); + assertThat(poolRemoveCounter.get(), Matchers.is(2)); + assertThat(poolRef.get().size(), Matchers.is(0)); + } + + private void sendRequest(CountDownLatch[] reqClientDoneLatches, int i) + { + client.newRequest("localhost", connector.getLocalPort()) + .path("/" + i) + .timeout(5, TimeUnit.SECONDS) + .send(result -> + { + assertThat("req " + i + " failed", result.getResponse().getStatus(), Matchers.is(200)); + reqClientDoneLatches[i].countDown(); + }); + } + @Test public void testMaxDurationConnectionsWithMultiplexedPool() throws Exception { @@ -97,7 +215,7 @@ public void testMaxDurationConnectionsWithMultiplexedPool() throws Exception AtomicInteger poolCreateCounter = new AtomicInteger(); AtomicInteger poolRemoveCounter = new AtomicInteger(); AtomicReference> poolRef = new AtomicReference<>(); - ConnectionPoolFactory factory = new ConnectionPoolFactory("duplex-maxDuration", destination -> + ConnectionPoolFactory factory = new ConnectionPoolFactory("maxDurationConnectionsWithMultiplexedPool", destination -> { int maxConnections = destination.getHttpClient().getMaxConnectionsPerDestination(); MultiplexConnectionPool connectionPool = new MultiplexConnectionPool(destination, Pool.StrategyType.FIRST, maxConnections, false, destination, MAX_MULTIPLEX) @@ -284,7 +402,7 @@ protected void service(String target, Request jettyRequest, HttpServletRequest r assertThat(poolRef.get().getIdleCount(), is(0)); assertThat(poolRef.get().size(), is(0)); - assertThat(poolRemoveCounter.get(), is(3)); + assertThat(poolRemoveCounter.get(), is(2)); } private static class ConnectionPoolFactory