Skip to content

Commit

Permalink
HTTP/2 improvements. (#9749)
Browse files Browse the repository at this point in the history
* Implemented a few required error handlings.
* Changed `Parser.init()` to directly take the listener, rather than wrapping it.
  The reason for this change was to be able to reconfigure the Parser upon receiving a SETTINGS frame.
* Initially setting the encoder and decoder max table capacity at the default of 4096, as per spec.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
sbordet authored May 27, 2023
1 parent debb124 commit 420ec7c
Show file tree
Hide file tree
Showing 52 changed files with 1,376 additions and 510 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ public class HttpClient extends ContainerLifeCycle
private String defaultRequestContentType = "application/octet-stream";
private boolean useInputDirectByteBuffers = true;
private boolean useOutputDirectByteBuffers = true;
private int maxResponseHeadersSize = -1;
private Sweeper destinationSweeper;

/**
Expand Down Expand Up @@ -1181,6 +1182,23 @@ public void setUseOutputDirectByteBuffers(boolean useOutputDirectByteBuffers)
this.useOutputDirectByteBuffers = useOutputDirectByteBuffers;
}

/**
* @return the max size in bytes of the response headers
*/
@ManagedAttribute("The max size in bytes of the response headers")
public int getMaxResponseHeadersSize()
{
return maxResponseHeadersSize;
}

/**
* @param maxResponseHeadersSize the max size in bytes of the response headers
*/
public void setMaxResponseHeadersSize(int maxResponseHeadersSize)
{
this.maxResponseHeadersSize = maxResponseHeadersSize;
}

/**
* @return the forward proxy configuration
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public HttpReceiverOverHTTP(HttpChannelOverHTTP channel)
{
super(channel);
HttpClient httpClient = channel.getHttpDestination().getHttpClient();
parser = new HttpParser(this, -1, httpClient.getHttpCompliance());
parser = new HttpParser(this, httpClient.getMaxResponseHeadersSize(), httpClient.getHttpCompliance());
HttpClientTransport transport = httpClient.getTransport();
if (transport instanceof HttpClientTransportOverHTTP)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.frames.Frame;
import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.http2.hpack.HpackContext;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.ClientConnector;
Expand Down Expand Up @@ -106,11 +107,13 @@ public class HTTP2Client extends ContainerLifeCycle
private List<String> protocols = List.of("h2");
private int initialSessionRecvWindow = 16 * 1024 * 1024;
private int initialStreamRecvWindow = 8 * 1024 * 1024;
private int maxFrameLength = Frame.DEFAULT_MAX_LENGTH;
private int maxFrameSize = Frame.DEFAULT_MAX_LENGTH;
private int maxConcurrentPushedStreams = 32;
private int maxSettingsKeys = SettingsFrame.DEFAULT_MAX_KEYS;
private int maxDynamicTableSize = 4096;
private int maxDecoderTableCapacity = HpackContext.DEFAULT_MAX_TABLE_CAPACITY;
private int maxEncoderTableCapacity = HpackContext.DEFAULT_MAX_TABLE_CAPACITY;
private int maxHeaderBlockFragment = 0;
private int maxResponseHeadersSize = -1;
private FlowControlStrategy.Factory flowControlStrategyFactory = () -> new BufferingFlowControlStrategy(0.5F);
private long streamIdleTimeout;
private boolean useInputDirectByteBuffers = true;
Expand Down Expand Up @@ -282,15 +285,27 @@ public void setInitialStreamRecvWindow(int initialStreamRecvWindow)
this.initialStreamRecvWindow = initialStreamRecvWindow;
}

@ManagedAttribute("The max frame length in bytes")
@Deprecated
public int getMaxFrameLength()
{
return maxFrameLength;
return getMaxFrameSize();
}

public void setMaxFrameLength(int maxFrameLength)
@Deprecated
public void setMaxFrameLength(int maxFrameSize)
{
this.maxFrameLength = maxFrameLength;
setMaxFrameSize(maxFrameSize);
}

@ManagedAttribute("The max frame size in bytes")
public int getMaxFrameSize()
{
return maxFrameSize;
}

public void setMaxFrameSize(int maxFrameSize)
{
this.maxFrameSize = maxFrameSize;
}

@ManagedAttribute("The max number of concurrent pushed streams")
Expand All @@ -315,15 +330,44 @@ public void setMaxSettingsKeys(int maxSettingsKeys)
this.maxSettingsKeys = maxSettingsKeys;
}

@ManagedAttribute("The HPACK dynamic table maximum size")
@ManagedAttribute("The HPACK encoder dynamic table maximum capacity")
public int getMaxEncoderTableCapacity()
{
return maxEncoderTableCapacity;
}

/**
* <p>Sets the limit for the encoder HPACK dynamic table capacity.</p>
* <p>Setting this value to {@code 0} disables the use of the dynamic table.</p>
*
* @param maxEncoderTableCapacity The HPACK encoder dynamic table maximum capacity
*/
public void setMaxEncoderTableCapacity(int maxEncoderTableCapacity)
{
this.maxEncoderTableCapacity = maxEncoderTableCapacity;
}

@ManagedAttribute("The HPACK decoder dynamic table maximum capacity")
public int getMaxDecoderTableCapacity()
{
return maxDecoderTableCapacity;
}

public void setMaxDecoderTableCapacity(int maxDecoderTableCapacity)
{
this.maxDecoderTableCapacity = maxDecoderTableCapacity;
}

@Deprecated
public int getMaxDynamicTableSize()
{
return maxDynamicTableSize;
return getMaxDecoderTableCapacity();
}

public void setMaxDynamicTableSize(int maxDynamicTableSize)
@Deprecated
public void setMaxDynamicTableSize(int maxTableSize)
{
this.maxDynamicTableSize = maxDynamicTableSize;
setMaxDecoderTableCapacity(maxTableSize);
}

@ManagedAttribute("The max size of header block fragments")
Expand All @@ -337,6 +381,17 @@ public void setMaxHeaderBlockFragment(int maxHeaderBlockFragment)
this.maxHeaderBlockFragment = maxHeaderBlockFragment;
}

@ManagedAttribute("The max size of response headers")
public int getMaxResponseHeadersSize()
{
return maxResponseHeadersSize;
}

public void setMaxResponseHeadersSize(int maxResponseHeadersSize)
{
this.maxResponseHeadersSize = maxResponseHeadersSize;
}

@ManagedAttribute("Whether to use direct ByteBuffers for reading")
public boolean isUseInputDirectByteBuffers()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@
import org.eclipse.jetty.http2.HTTP2Connection;
import org.eclipse.jetty.http2.ISession;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.frames.Frame;
import org.eclipse.jetty.http2.frames.PrefaceFrame;
import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
import org.eclipse.jetty.http2.generator.Generator;
import org.eclipse.jetty.http2.hpack.HpackContext;
import org.eclipse.jetty.http2.parser.Parser;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.ClientConnectionFactory;
Expand Down Expand Up @@ -54,27 +56,30 @@ public Connection newConnection(EndPoint endPoint, Map<String, Object> context)
Scheduler scheduler = client.getScheduler();
Session.Listener listener = (Session.Listener)context.get(SESSION_LISTENER_CONTEXT_KEY);
@SuppressWarnings("unchecked")
Promise<Session> promise = (Promise<Session>)context.get(SESSION_PROMISE_CONTEXT_KEY);
Promise<Session> sessionPromise = (Promise<Session>)context.get(SESSION_PROMISE_CONTEXT_KEY);

Generator generator = new Generator(byteBufferPool, client.getMaxDynamicTableSize(), client.getMaxHeaderBlockFragment());
Generator generator = new Generator(byteBufferPool, client.isUseOutputDirectByteBuffers(), client.getMaxHeaderBlockFragment());
FlowControlStrategy flowControl = client.getFlowControlStrategyFactory().newFlowControlStrategy();
HTTP2ClientSession session = new HTTP2ClientSession(scheduler, endPoint, generator, listener, flowControl);

Parser parser = new Parser(byteBufferPool, client.getMaxResponseHeadersSize());
parser.setMaxFrameSize(client.getMaxFrameSize());
parser.setMaxSettingsKeys(client.getMaxSettingsKeys());

HTTP2ClientSession session = new HTTP2ClientSession(scheduler, endPoint, parser, generator, listener, flowControl);
session.setMaxRemoteStreams(client.getMaxConcurrentPushedStreams());
session.setMaxEncoderTableCapacity(client.getMaxEncoderTableCapacity());
long streamIdleTimeout = client.getStreamIdleTimeout();
if (streamIdleTimeout > 0)
session.setStreamIdleTimeout(streamIdleTimeout);

Parser parser = new Parser(byteBufferPool, session, 4096, 8192);
parser.setMaxFrameLength(client.getMaxFrameLength());
parser.setMaxSettingsKeys(client.getMaxSettingsKeys());

RetainableByteBufferPool retainableByteBufferPool = byteBufferPool.asRetainableByteBufferPool();

HTTP2ClientConnection connection = new HTTP2ClientConnection(client, retainableByteBufferPool, executor, endPoint,
parser, session, client.getInputBufferSize(), promise, listener);
session, client.getInputBufferSize(), sessionPromise, listener);
connection.setUseInputDirectByteBuffers(client.isUseInputDirectByteBuffers());
connection.setUseOutputDirectByteBuffers(client.isUseOutputDirectByteBuffers());
connection.addEventListener(connectionListener);
parser.init(connection);

return customize(connection, context);
}

Expand All @@ -84,11 +89,11 @@ private static class HTTP2ClientConnection extends HTTP2Connection implements Ca
private final Promise<Session> promise;
private final Session.Listener listener;

private HTTP2ClientConnection(HTTP2Client client, RetainableByteBufferPool retainableByteBufferPool, Executor executor, EndPoint endpoint, Parser parser, ISession session, int bufferSize, Promise<Session> promise, Session.Listener listener)
private HTTP2ClientConnection(HTTP2Client client, RetainableByteBufferPool retainableByteBufferPool, Executor executor, EndPoint endpoint, HTTP2ClientSession session, int bufferSize, Promise<Session> sessionPromise, Session.Listener listener)
{
super(retainableByteBufferPool, executor, endpoint, parser, session, bufferSize);
super(retainableByteBufferPool, executor, endpoint, session, bufferSize);
this.client = client;
this.promise = promise;
this.promise = sessionPromise;
this.listener = listener;
}

Expand All @@ -98,12 +103,52 @@ public void onOpen()
Map<Integer, Integer> settings = listener.onPreface(getSession());
if (settings == null)
settings = new HashMap<>();
settings.computeIfAbsent(SettingsFrame.INITIAL_WINDOW_SIZE, k -> client.getInitialStreamRecvWindow());
settings.computeIfAbsent(SettingsFrame.MAX_CONCURRENT_STREAMS, k -> client.getMaxConcurrentPushedStreams());

Integer maxFrameLength = settings.get(SettingsFrame.MAX_FRAME_SIZE);
if (maxFrameLength != null)
getParser().setMaxFrameLength(maxFrameLength);
// Below we want to populate any settings to send to the server
// that have a different default than what prescribed by the RFC.
// Changing the configuration is done when the SETTINGS is sent.

settings.compute(SettingsFrame.HEADER_TABLE_SIZE, (k, v) ->
{
if (v == null)
{
v = client.getMaxDecoderTableCapacity();
if (v == HpackContext.DEFAULT_MAX_TABLE_CAPACITY)
v = null;
}
return v;
});
settings.computeIfAbsent(SettingsFrame.MAX_CONCURRENT_STREAMS, k -> client.getMaxConcurrentPushedStreams());
settings.compute(SettingsFrame.INITIAL_WINDOW_SIZE, (k, v) ->
{
if (v == null)
{
v = client.getInitialStreamRecvWindow();
if (v == FlowControlStrategy.DEFAULT_WINDOW_SIZE)
v = null;
}
return v;
});
settings.compute(SettingsFrame.MAX_FRAME_SIZE, (k, v) ->
{
if (v == null)
{
v = client.getMaxFrameSize();
if (v == Frame.DEFAULT_MAX_LENGTH)
v = null;
}
return v;
});
settings.compute(SettingsFrame.MAX_HEADER_LIST_SIZE, (k, v) ->
{
if (v == null)
{
v = client.getMaxResponseHeadersSize();
if (v <= 0)
v = null;
}
return v;
});

PrefaceFrame prefaceFrame = new PrefaceFrame();
SettingsFrame settingsFrame = new SettingsFrame(settings, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

package org.eclipse.jetty.http2.client;

import java.util.Map;

import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.CloseState;
import org.eclipse.jetty.http2.ErrorCode;
Expand All @@ -23,7 +25,9 @@
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PushPromiseFrame;
import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.http2.generator.Generator;
import org.eclipse.jetty.http2.parser.Parser;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.thread.Scheduler;
Expand All @@ -34,9 +38,9 @@ public class HTTP2ClientSession extends HTTP2Session
{
private static final Logger LOG = LoggerFactory.getLogger(HTTP2ClientSession.class);

public HTTP2ClientSession(Scheduler scheduler, EndPoint endPoint, Generator generator, Session.Listener listener, FlowControlStrategy flowControl)
public HTTP2ClientSession(Scheduler scheduler, EndPoint endPoint, Parser parser, Generator generator, Session.Listener listener, FlowControlStrategy flowControl)
{
super(scheduler, endPoint, generator, listener, flowControl, 1);
super(scheduler, endPoint, parser, generator, listener, flowControl, 1);
}

@Override
Expand Down Expand Up @@ -87,12 +91,30 @@ public void onHeaders(HeadersFrame frame)
}
}

@Override
public void onSettings(SettingsFrame frame)
{
Map<Integer, Integer> settings = frame.getSettings();
Integer value = settings.get(SettingsFrame.ENABLE_PUSH);
// SPEC: servers can only send ENABLE_PUSH=0.
if (value != null && value != 0)
onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "invalid_settings_frame");
else
super.onSettings(frame);
}

@Override
public void onPushPromise(PushPromiseFrame frame)
{
if (LOG.isDebugEnabled())
LOG.debug("Received {}", frame);

if (!isPushEnabled())
{
onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "unexpected_push_promise_frame");
return;
}

int streamId = frame.getStreamId();
int pushStreamId = frame.getPromisedStreamId();
IStream stream = getStream(streamId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ public void onHeaders(Stream stream, HeadersFrame frame)
x.printStackTrace();
}
}).start());

assertTrue(clientLatch.await(total, TimeUnit.MILLISECONDS), String.format("Missing streams on client: %d/%d", clientLatch.getCount(), total));
assertTrue(serverLatch.await(total, TimeUnit.MILLISECONDS), String.format("Missing streams on server: %d/%d", serverLatch.getCount(), total));
assertTrue(responseLatch.await(total, TimeUnit.MILLISECONDS), String.format("Missing response on client: %d/%d", clientLatch.getCount(), total));
Expand Down
Loading

0 comments on commit 420ec7c

Please sign in to comment.