Skip to content

Commit 57e1b41

Browse files
committed
Handle shrinking stream window sizes.
1 parent e960ab7 commit 57e1b41

File tree

5 files changed

+90
-62
lines changed

5 files changed

+90
-62
lines changed

proxy/http2/Http2ConnectionState.cc

Lines changed: 58 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1360,14 +1360,36 @@ Http2ConnectionState::create_stream(Http2StreamId new_id, Http2Error &error)
13601360
}
13611361
}
13621362

1363-
uint32_t stream_window = 0;
1363+
uint32_t initial_stream_window = 0;
1364+
uint32_t initial_stream_window_target = 0;
13641365
if (client_streamid && this->_has_dynamic_stream_window()) {
1365-
stream_window = this->_get_configured_receive_session_window_size_in() / (client_streams_in_count.load() + 1);
1366+
// If we use a flow control mechanism with dynamic stream sizes, then we
1367+
// may send SETTINGS frames which shrink the stream window. In reality, if we
1368+
// shrink the window sizes like this, we cannot be sure that the peer will
1369+
// receive these before sending data. The peer may start up many streams in
1370+
// quick succession, with each new stream resulting in us sending new
1371+
// SETTINGS frames to it, thus shrinking the stream window. But the peer
1372+
// may not receive any of these before sending data on any of these
1373+
// streams. Therefore we have to initialize all our window sizes at the
1374+
// maximal value so we don't incorrectly reply with a FLOW_CONTROL_ERROR
1375+
// RST_STREAM frame in these situations.
1376+
//
1377+
// Effectively what we do here is:
1378+
// 1. initial_stream_window: Initialize our HttpStreams with a generous
1379+
// stream window size. This is what is used to detect flow control
1380+
// errors.
1381+
// 2. initial_stream_window_target: Communicate our intended, smaller
1382+
// initial stream window size to the peer via a SETTINGS frames.
1383+
//
1384+
// This situation is described in RFC 9113 section 6.9.3.
1385+
initial_stream_window = this->_get_configured_receive_session_window_size_in();
1386+
initial_stream_window_target = initial_stream_window / (client_streams_in_count.load() + 1);
13661387
} else {
1367-
stream_window = this->local_settings.get(HTTP2_SETTINGS_INITIAL_WINDOW_SIZE);
1388+
initial_stream_window = this->local_settings.get(HTTP2_SETTINGS_INITIAL_WINDOW_SIZE);
1389+
initial_stream_window_target = initial_stream_window;
13681390
}
13691391
Http2Stream *new_stream = THREAD_ALLOC_INIT(http2StreamAllocator, this_ethread(), session->get_proxy_session(), new_id,
1370-
peer_settings.get(HTTP2_SETTINGS_INITIAL_WINDOW_SIZE), stream_window);
1392+
peer_settings.get(HTTP2_SETTINGS_INITIAL_WINDOW_SIZE), initial_stream_window);
13711393

13721394
ink_assert(nullptr != new_stream);
13731395
ink_assert(!stream_list.in(new_stream));
@@ -1382,11 +1404,8 @@ Http2ConnectionState::create_stream(Http2StreamId new_id, Http2Error &error)
13821404
++client_streams_in_count;
13831405
if (this->_has_dynamic_stream_window()) {
13841406
Http2ConnectionSettings new_settings = local_settings;
1385-
new_settings.set(HTTP2_SETTINGS_INITIAL_WINDOW_SIZE, stream_window);
1407+
new_settings.set(HTTP2_SETTINGS_INITIAL_WINDOW_SIZE, initial_stream_window_target);
13861408
send_settings_frame(new_settings);
1387-
1388-
// Update all the streams for the new window.
1389-
this->update_initial_server_rwnd(stream_window);
13901409
}
13911410
} else {
13921411
latest_streamid_out = new_id;
@@ -1477,22 +1496,42 @@ Http2ConnectionState::restart_receiving(Http2Stream *stream)
14771496
return;
14781497
}
14791498

1499+
uint32_t initial_stream_window = this->local_settings.get(HTTP2_SETTINGS_INITIAL_WINDOW_SIZE);
1500+
if (this->_has_dynamic_stream_window()) {
1501+
// If our stream window size is dynamic, then we have to allow for this situation:
1502+
//
1503+
// 1. ATS initializes the stream window size to the configured value via a
1504+
// SETTINGS frame, say 10 bytes.
1505+
//
1506+
// 2. The peer sends a data frame of that size, again 10 bytes.
1507+
//
1508+
// 3. At the same time that the peer sends the data frame, we shrink the
1509+
// stream window size via a SETTINGS frame to a smaller value, say 5 bytes.
1510+
//
1511+
// 4. The peer receives the SETTINGS frame and calculates its send window
1512+
// to be -5 bytes.
1513+
//
1514+
// 5. ATS cannot simply send a WINDOW_UPDATE frame of 5 bytes to bring the
1515+
// window to 5 bytes, the current initial window size for the stream.
1516+
// Instead, we have to free the peer up to send us data per the maximum value
1517+
// we initialized our streams with.
1518+
initial_stream_window = this->_get_configured_receive_session_window_size_in();
1519+
}
14801520
// If read_vio is buffering data, do not fully update window
1481-
uint32_t const stream_window = this->local_settings.get(HTTP2_SETTINGS_INITIAL_WINDOW_SIZE);
1482-
int64_t data_size = stream->read_vio_read_avail();
1483-
if (data_size >= stream_window) {
1521+
int64_t data_size = stream->read_vio_read_avail();
1522+
if (data_size >= initial_stream_window) {
14841523
return;
14851524
}
14861525

14871526
// Update the window size for the stream
1488-
Http2WindowSize diff_size = 0;
1489-
if (stream->server_rwnd() < 0 && data_size == 0) {
1490-
// Receive windows can be negative if we sent a SETTINGS frame that
1491-
// decreased the stream window size mid-stream.
1492-
diff_size = stream_window - stream->server_rwnd();
1493-
} else {
1494-
diff_size = stream_window - std::max(static_cast<int64_t>(stream->server_rwnd()), data_size);
1495-
}
1527+
Http2WindowSize const diff_size = initial_stream_window - std::max(static_cast<int64_t>(stream->server_rwnd()), data_size);
1528+
1529+
// If our flow control policy has dynamic stream window sizes, then it could
1530+
// be that we shrank the initial stream window size. In this case, our
1531+
// calculated window size might be in a state where we are shrinking it to
1532+
// the new value. If so, this calculation would come out negative and we will
1533+
// not want to send a WINDOW_UPDATE frame but will rather let the peer
1534+
// continue to send us data until the window shrinks to the appropriate size.
14961535
if (diff_size > 0) {
14971536
stream->increment_server_rwnd(diff_size);
14981537
this->send_window_update_frame(stream->get_id(), diff_size);
@@ -1637,27 +1676,6 @@ Http2ConnectionState::update_initial_client_rwnd(Http2WindowSize new_size)
16371676
}
16381677
}
16391678

1640-
void
1641-
Http2ConnectionState::update_initial_server_rwnd(Http2WindowSize new_size)
1642-
{
1643-
// Update stream level window sizes
1644-
for (Http2Stream *s = stream_list.head; s; s = static_cast<Http2Stream *>(s->link.next)) {
1645-
SCOPED_MUTEX_LOCK(lock, s->mutex, this_ethread());
1646-
// Set the new window size, but take into account the already adjusted
1647-
// window based on previously sent bytes.
1648-
//
1649-
// For example:
1650-
// 1. ATS initializes the stream window to 10K bytes.
1651-
// 2. ATS receives 3K bytes from the client. The stream window is now 7K bytes.
1652-
// 3. ATS sends a SETTINGS frame to the client to update the initial window size to 20K bytes.
1653-
// 4. The stream window should be updated to 17K bytes: 20K - (10K - 7K).
1654-
//
1655-
// Note that if ATS reduces the stream window, this may result in negative
1656-
// receive window values.
1657-
s->set_server_rwnd(new_size - (local_settings.get(HTTP2_SETTINGS_INITIAL_WINDOW_SIZE) - s->server_rwnd()));
1658-
}
1659-
}
1660-
16611679
void
16621680
Http2ConnectionState::schedule_stream(Http2Stream *stream)
16631681
{

proxy/http2/Http2ConnectionState.h

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -115,9 +115,6 @@ class Http2ConnectionState : public Continuation
115115
/** Update all streams for the peer's newly dictated stream window size. */
116116
void update_initial_client_rwnd(Http2WindowSize new_size);
117117

118-
/** Update all streams for our newly dictated stream window size. */
119-
void update_initial_server_rwnd(Http2WindowSize new_size);
120-
121118
Http2StreamId get_latest_stream_id_in() const;
122119
Http2StreamId get_latest_stream_id_out() const;
123120
int get_stream_requests() const;

proxy/http2/Http2Stream.h

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,6 @@ class Http2Stream : public ProxyTransaction
126126
Http2StreamState get_state() const;
127127
bool change_state(uint8_t type, uint8_t flags);
128128
void set_client_rwnd(Http2WindowSize new_size);
129-
void set_server_rwnd(Http2WindowSize new_size);
130129
bool has_trailing_header() const;
131130
void set_receive_headers(HTTPHdr &h2_headers);
132131
MIOBuffer *read_vio_writer() const;
@@ -265,12 +264,6 @@ Http2Stream::set_client_rwnd(Http2WindowSize new_size)
265264
this->_client_rwnd = new_size;
266265
}
267266

268-
inline void
269-
Http2Stream::set_server_rwnd(Http2WindowSize new_size)
270-
{
271-
this->_server_rwnd = new_size;
272-
}
273-
274267
inline bool
275268
Http2Stream::has_trailing_header() const
276269
{

tests/gold_tests/autest-site/trafficserver.test.ext

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,14 @@ def MakeATSProcess(obj, name, command='traffic_server', select_ports=True,
338338
get_port(p, "manager_port")
339339
get_port(p, "admin_port")
340340

341+
if enable_tls:
342+
fname = "tls_session_keys.txt"
343+
tmpname = os.path.join(log_dir, fname)
344+
p.Disk.File(tmpname, id='tls_session_keys')
345+
p.Disk.records_config.update({
346+
'proxy.config.ssl.keylog_file': tmpname,
347+
})
348+
341349
if enable_cache:
342350
# In records.config, the cache is enabled by default so there's nothing
343351
# we have to do here to functionally enable it. However, the tests that

tests/gold_tests/h2/http2_flow_control.test.py

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@ def _configure_client(self, tr):
172172
# need to set up client expectations.
173173
return
174174

175+
# ATS currently always sends a MAX_CONCURRENT_STREAMS setting.
175176
tr.Processes.Default.Streams.stdout += Testers.ContainsExpression(
176177
f'MAX_CONCURRENT_STREAMS:{self._expected_max_concurrent_streams_in}',
177178
"Client should receive a MAX_CONCURRENT_STREAMS setting.")
@@ -181,21 +182,22 @@ def _configure_client(self, tr):
181182
f'INITIAL_WINDOW_SIZE:{self._expected_initial_stream_window_size}',
182183
"Client should receive an INITIAL_WINDOW_SIZE setting.")
183184

184-
if self._expected_flow_control_policy == 0:
185-
update_window_size = (
186-
self._expected_initial_stream_window_size -
187-
self._default_initial_window_size)
188-
if update_window_size > 0:
189-
tr.Processes.Default.Streams.stdout += Testers.ContainsExpression(
190-
f'WINDOW_UPDATE.*id 0: {update_window_size}',
191-
"Client should receive a session WINDOW_UPDATE.")
185+
if self._expected_flow_control_policy == 0:
186+
update_window_size = (
187+
self._expected_initial_stream_window_size -
188+
self._default_initial_window_size)
189+
if update_window_size > 0:
190+
tr.Processes.Default.Streams.stdout += Testers.ContainsExpression(
191+
f'WINDOW_UPDATE.*id 0: {update_window_size}',
192+
"Client should receive a session WINDOW_UPDATE.")
192193

193194
if self._expected_flow_control_policy in (1, 2):
194195
# Verify the larger window size.
195196

196197
session_window_size = (
197198
self._expected_initial_stream_window_size *
198199
self._expected_max_concurrent_streams_in)
200+
199201
# ATS will send a WINDOW_UPDATE frame to the client to increase
200202
# the session window size to the configured value from the default
201203
# value.
@@ -207,6 +209,16 @@ def _configure_client(self, tr):
207209
if update_window_size > Http2FlowControlTest._default_initial_window_size:
208210
tr.Processes.Default.Streams.stdout += Testers.ContainsExpression(
209211
f'WINDOW_UPDATE.*id 0: {update_window_size}',
212+
"Client should receive an initial session WINDOW_UPDATE.")
213+
else:
214+
# Our test traffic is large enough that eventually we should
215+
# send a session WINDOW_UPDATE frame for the smaller window.
216+
# It's not clear what it will be in advance though. A 100 byte
217+
# session window may not receive a 100 byte WINDOW_UPDATE frame
218+
# if the client is sending DATA frames in 10 byte chunks due to
219+
# a smaller stream window.
220+
tr.Processes.Default.Streams.stdout += Testers.ContainsExpression(
221+
'WINDOW_UPDATE.*id 0: ',
210222
"Client should receive a session WINDOW_UPDATE.")
211223

212224
if self._expected_flow_control_policy == 2:
@@ -278,18 +290,18 @@ def run(self):
278290
flow_control_policy=23)
279291
test.run()
280292
test = Http2FlowControlTest(
281-
description="Static flow control with a small initial_window_size_in",
282-
initial_window_size=500,
293+
description="Flow control policy 0 (default): small initial_window_size_in",
294+
initial_window_size=500, # The default is 65 KB.
283295
flow_control_policy=0)
284296
test.run()
285297
test = Http2FlowControlTest(
286-
description="Static flow control with a large initial_window_size_in",
298+
description="Flow control policy 1: 100 byte session, 10 byte stream windows",
287299
max_concurrent_streams_in=10,
288300
initial_window_size=10,
289301
flow_control_policy=1)
290302
test.run()
291303
test = Http2FlowControlTest(
292-
description="Dynamic flow control with a small initial_window_size_in",
304+
description="Flow control policy 2: 100 byte session, dynamic stream windows",
293305
max_concurrent_streams_in=10,
294306
initial_window_size=10,
295307
flow_control_policy=2)

0 commit comments

Comments
 (0)