Skip to content

Commit

Permalink
Fixes #6603 - HTTP/2 max local stream count exceeded (#6639) (#6682)
Browse files Browse the repository at this point in the history
* Fixes #6603 - HTTP/2 max local stream count exceeded (#6639)

Made MAX_CONCURRENT_STREAMS setting work on a per-connection basis.
Updated Pool javadocs.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
Signed-off-by: Greg Wilkins <gregw@webtide.com>
Co-authored-by: Greg Wilkins <gregw@webtide.com>
(cherry picked from commit 525fcb3)
  • Loading branch information
sbordet authored Sep 1, 2021
1 parent f129770 commit e2690cc
Show file tree
Hide file tree
Showing 15 changed files with 604 additions and 211 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,20 @@ public abstract class AbstractConnectionPool extends ContainerLifeCycle implemen

protected AbstractConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester)
{
this(destination, new Pool<>(Pool.StrategyType.FIRST, maxConnections, cache), requester);
this(destination, Pool.StrategyType.FIRST, maxConnections, cache, requester);
}

protected AbstractConnectionPool(HttpDestination destination, Pool.StrategyType strategy, int maxConnections, boolean cache, Callback requester)
{
this(destination, new Pool<>(strategy, maxConnections, cache), requester);
}

protected AbstractConnectionPool(HttpDestination destination, Pool<Connection> pool, Callback requester)
{
this.destination = destination;
this.requester = requester;
this.pool = pool;
pool.setMaxMultiplex(1); // Force the use of multiplexing.
addBean(pool);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
public interface ConnectionPool extends Closeable
{
/**
* Optionally pre-create up to <code>connectionCount</code>
* Optionally pre-create up to {@code connectionCount}
* connections so they are immediately ready for use.
* @param connectionCount the number of connections to pre-start.
*/
Expand Down Expand Up @@ -106,7 +106,7 @@ interface Factory
}

/**
* Marks a connection pool as supporting multiplexed connections.
* Marks a connection as supporting multiplexed requests.
*/
interface Multiplexable
{
Expand All @@ -117,7 +117,11 @@ interface Multiplexable

/**
* @param maxMultiplex the max number of requests multiplexable on a single connection
* @deprecated do not use, as the maxMultiplex value is pulled, rather than pushed
*/
void setMaxMultiplex(int maxMultiplex);
@Deprecated
default void setMaxMultiplex(int maxMultiplex)
{
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ public DuplexConnectionPool(HttpDestination destination, int maxConnections, Cal

public DuplexConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester)
{
this(destination, new Pool<>(Pool.StrategyType.FIRST, maxConnections, cache), requester);
super(destination, Pool.StrategyType.FIRST, maxConnections, cache, requester);
}

@Deprecated
public DuplexConnectionPool(HttpDestination destination, Pool<Connection> pool, Callback requester)
{
super(destination, pool, requester);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.eclipse.jetty.util.annotation.ManagedObject;

@ManagedObject
public class MultiplexConnectionPool extends AbstractConnectionPool implements ConnectionPool.Multiplexable
public class MultiplexConnectionPool extends AbstractConnectionPool
{
public MultiplexConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex)
{
Expand All @@ -29,9 +29,26 @@ public MultiplexConnectionPool(HttpDestination destination, int maxConnections,

public MultiplexConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester, int maxMultiplex)
{
this(destination, new Pool<>(Pool.StrategyType.FIRST, maxConnections, cache), requester, maxMultiplex);
this(destination, Pool.StrategyType.FIRST, maxConnections, cache, requester, maxMultiplex);
}

public MultiplexConnectionPool(HttpDestination destination, Pool.StrategyType strategy, int maxConnections, boolean cache, Callback requester, int maxMultiplex)
{
super(destination, new Pool<Connection>(strategy, maxConnections, cache)
{
@Override
protected int getMaxMultiplex(Connection connection)
{
int multiplex = (connection instanceof Multiplexable)
? ((Multiplexable)connection).getMaxMultiplex()
: super.getMaxMultiplex(connection);
return multiplex > 0 ? multiplex : 1;
}
}, requester);
setMaxMultiplex(maxMultiplex);
}

@Deprecated
public MultiplexConnectionPool(HttpDestination destination, Pool<Connection> pool, Callback requester, int maxMultiplex)
{
super(destination, pool, requester);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,15 @@ public MultiplexHttpDestination(HttpClient client, Origin origin)
public int getMaxRequestsPerConnection()
{
ConnectionPool connectionPool = getConnectionPool();
if (connectionPool instanceof ConnectionPool.Multiplexable)
return ((ConnectionPool.Multiplexable)connectionPool).getMaxMultiplex();
if (connectionPool instanceof AbstractConnectionPool)
return ((AbstractConnectionPool)connectionPool).getMaxMultiplex();
return 1;
}

public void setMaxRequestsPerConnection(int maxRequestsPerConnection)
{
ConnectionPool connectionPool = getConnectionPool();
if (connectionPool instanceof ConnectionPool.Multiplexable)
((ConnectionPool.Multiplexable)connectionPool).setMaxMultiplex(maxRequestsPerConnection);
if (connectionPool instanceof AbstractConnectionPool)
((AbstractConnectionPool)connectionPool).setMaxMultiplex(maxRequestsPerConnection);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,6 @@ public class RandomConnectionPool extends MultiplexConnectionPool
{
public RandomConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex)
{
super(destination, new Pool<>(Pool.StrategyType.RANDOM, maxConnections, false), requester, maxMultiplex);
super(destination, Pool.StrategyType.RANDOM, maxConnections, false, requester, maxMultiplex);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public RoundRobinConnectionPool(HttpDestination destination, int maxConnections,

public RoundRobinConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex)
{
super(destination, new Pool<>(Pool.StrategyType.ROUND_ROBIN, maxConnections, false), requester, maxMultiplex);
super(destination, Pool.StrategyType.ROUND_ROBIN, maxConnections, false, requester, maxMultiplex);
// If there are queued requests and connections get
// closed due to idle timeout or overuse, we want to
// aggressively try to open new connections to replace
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -786,7 +786,10 @@ protected IStream createLocalStream(int streamId, MetaData.Request request, Cons
int maxCount = getMaxLocalStreams();
if (maxCount >= 0 && localCount >= maxCount)
{
failFn.accept(new IllegalStateException("Max local stream count " + maxCount + " exceeded"));
IllegalStateException failure = new IllegalStateException("Max local stream count " + maxCount + " exceeded: " + localCount);
if (LOG.isDebugEnabled())
LOG.debug("Could not create local stream #{} for {}", streamId, this, failure);
failFn.accept(failure);
return null;
}
if (localStreamCount.compareAndSet(localCount, localCount + 1))
Expand All @@ -799,7 +802,7 @@ protected IStream createLocalStream(int streamId, MetaData.Request request, Cons
stream.setIdleTimeout(getStreamIdleTimeout());
flowControl.onStreamCreated(stream);
if (LOG.isDebugEnabled())
LOG.debug("Created local {}", stream);
LOG.debug("Created local {} for {}", stream, this);
return stream;
}
else
Expand Down Expand Up @@ -834,6 +837,9 @@ protected IStream createRemoteStream(int streamId, MetaData.Request request)
int maxCount = getMaxRemoteStreams();
if (maxCount >= 0 && remoteCount - remoteClosing >= maxCount)
{
IllegalStateException failure = new IllegalStateException("Max remote stream count " + maxCount + " exceeded: " + remoteCount + "+" + remoteClosing);
if (LOG.isDebugEnabled())
LOG.debug("Could not create remote stream #{} for {}", streamId, this, failure);
reset(null, new ResetFrame(streamId, ErrorCode.REFUSED_STREAM_ERROR.code), Callback.from(() -> onStreamDestroyed(streamId)));
return null;
}
Expand All @@ -847,7 +853,7 @@ protected IStream createRemoteStream(int streamId, MetaData.Request request)
stream.setIdleTimeout(getStreamIdleTimeout());
flowControl.onStreamCreated(stream);
if (LOG.isDebugEnabled())
LOG.debug("Created remote {}", stream);
LOG.debug("Created remote {} for {}", stream, this);
return stream;
}
else
Expand Down Expand Up @@ -1019,7 +1025,7 @@ void scheduleTimeout(HTTP2Stream stream)
private void onStreamCreated(int streamId)
{
if (LOG.isDebugEnabled())
LOG.debug("Created stream #{} for {}", streamId, this);
LOG.debug("Creating stream #{} for {}", streamId, this);
streamsState.onStreamCreated();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,6 @@ private Promise<Connection> httpConnectionPromise()
@Override
public void onSettings(Session session, SettingsFrame frame)
{
Map<Integer, Integer> settings = frame.getSettings();
if (settings.containsKey(SettingsFrame.MAX_CONCURRENT_STREAMS))
{
HttpDestination destination = destination();
if (destination instanceof HttpDestination.Multiplexed)
((HttpDestination.Multiplexed)destination).setMaxRequestsPerConnection(settings.get(SettingsFrame.MAX_CONCURRENT_STREAMS));
}
// The first SETTINGS frame is the server preface reply.
if (!connection.isMarked())
onServerPreface(session);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.eclipse.jetty.client.ConnectionPool;
import org.eclipse.jetty.client.HttpChannel;
import org.eclipse.jetty.client.HttpConnection;
import org.eclipse.jetty.client.HttpDestination;
Expand All @@ -46,7 +47,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.Sweepable
public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.Sweepable, ConnectionPool.Multiplexable
{
private static final Logger LOG = LoggerFactory.getLogger(HttpConnection.class);

Expand Down Expand Up @@ -78,6 +79,12 @@ public void setRecycleHttpChannels(boolean recycleHttpChannels)
this.recycleHttpChannels = recycleHttpChannels;
}

@Override
public int getMaxMultiplex()
{
return ((HTTP2Session)session).getMaxLocalStreams();
}

@Override
protected Iterator<HttpChannel> getHttpChannels()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.io.IOException;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
Expand Down Expand Up @@ -71,6 +72,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

public class MaxConcurrentStreamsTest extends AbstractTest
{
Expand Down Expand Up @@ -538,6 +540,111 @@ public Connection newConnection(EndPoint endPoint, Map<String, Object> context)
assertTrue(response3Latch.await(5, TimeUnit.SECONDS));
}

@Test
public void testDifferentMaxConcurrentStreamsForDifferentConnections() throws Exception
{
long processing = 125;
RawHTTP2ServerConnectionFactory http2 = new RawHTTP2ServerConnectionFactory(new HttpConfiguration(), new ServerSessionListener.Adapter()
{
private Session session1;
private Session session2;

@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
MetaData.Request request = (MetaData.Request)frame.getMetaData();
switch (request.getURI().getPath())
{
case "/prime":
{
session1 = stream.getSession();
// Send another request from here to force the opening of the 2nd connection.
client.newRequest("localhost", connector.getLocalPort()).path("/prime2").send(result ->
{
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, result.getResponse().getStatus(), HttpFields.EMPTY);
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
});
break;
}
case "/prime2":
{
session2 = stream.getSession();
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, HttpFields.EMPTY);
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
break;
}
case "/update_max_streams":
{
Session session = stream.getSession() == session1 ? session2 : session1;
Map<Integer, Integer> settings = new HashMap<>();
settings.put(SettingsFrame.MAX_CONCURRENT_STREAMS, 2);
session.settings(new SettingsFrame(settings, false), Callback.NOOP);
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, HttpFields.EMPTY);
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
break;
}
default:
{
sleep(processing);
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, HttpFields.EMPTY);
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
break;
}
}
return null;
}
});
http2.setMaxConcurrentStreams(1);
prepareServer(http2);
server.start();
prepareClient();
client.setMaxConnectionsPerDestination(2);
client.start();

// Prime the 2 connections.
primeConnection();

String host = "localhost";
int port = connector.getLocalPort();

assertEquals(1, client.getDestinations().size());
HttpDestination destination = (HttpDestination)client.getDestinations().get(0);
AbstractConnectionPool pool = (AbstractConnectionPool)destination.getConnectionPool();
assertEquals(2, pool.getConnectionCount());

// Send a request on one connection, which sends back a SETTINGS frame on the other connection.
ContentResponse response = client.newRequest(host, port)
.path("/update_max_streams")
.send();
assertEquals(HttpStatus.OK_200, response.getStatus());

// Send 4 requests at once: 1 should go on one connection, 2 on the other connection, and 1 queued.
int count = 4;
CountDownLatch latch = new CountDownLatch(count);
for (int i = 0; i < count; ++i)
{
client.newRequest(host, port)
.path("/" + i)
.send(result ->
{
if (result.isSucceeded())
{
int status = result.getResponse().getStatus();
if (status == HttpStatus.OK_200)
latch.countDown();
else
fail("unexpected status " + status);
}
else
{
fail(result.getFailure());
}
});
}

assertTrue(awaitLatch(latch, count * processing * 10, TimeUnit.MILLISECONDS));
}

private void primeConnection() throws Exception
{
// Prime the connection so that the maxConcurrentStream setting arrives to the client.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,7 @@ public void testMaxDurationConnectionsWithMultiplexedPool() throws Exception
ConnectionPoolFactory factory = new ConnectionPoolFactory("duplex-maxDuration", destination ->
{
int maxConnections = destination.getHttpClient().getMaxConnectionsPerDestination();
Pool<Connection> pool = new Pool<>(Pool.StrategyType.FIRST, maxConnections, false);
poolRef.set(pool);
MultiplexConnectionPool connectionPool = new MultiplexConnectionPool(destination, pool, destination, MAX_MULTIPLEX)
MultiplexConnectionPool connectionPool = new MultiplexConnectionPool(destination, Pool.StrategyType.FIRST, maxConnections, false, destination, MAX_MULTIPLEX)
{
@Override
protected void onCreated(Connection connection)
Expand All @@ -111,6 +109,7 @@ protected void removed(Connection connection)
poolRemoveCounter.incrementAndGet();
}
};
poolRef.set(connectionPool.getBean(Pool.class));
connectionPool.setMaxDuration(maxDuration);
return connectionPool;
});
Expand Down Expand Up @@ -156,9 +155,7 @@ public void testMaxDurationConnectionsWithMultiplexedPoolClosesExpiredConnection
ConnectionPoolFactory factory = new ConnectionPoolFactory("duplex-maxDuration", destination ->
{
int maxConnections = destination.getHttpClient().getMaxConnectionsPerDestination();
Pool<Connection> pool = new Pool<>(Pool.StrategyType.FIRST, maxConnections, false);
poolRef.set(pool);
MultiplexConnectionPool connectionPool = new MultiplexConnectionPool(destination, pool, destination, MAX_MULTIPLEX)
MultiplexConnectionPool connectionPool = new MultiplexConnectionPool(destination, Pool.StrategyType.FIRST, maxConnections, false, destination, MAX_MULTIPLEX)
{
@Override
protected void onCreated(Connection connection)
Expand All @@ -172,6 +169,7 @@ protected void removed(Connection connection)
poolRemoveCounter.incrementAndGet();
}
};
poolRef.set(connectionPool.getBean(Pool.class));
connectionPool.setMaxDuration(maxDuration);
return connectionPool;
});
Expand Down
Loading

0 comments on commit e2690cc

Please sign in to comment.