Skip to content

Commit 74d4109

Browse files
committed
Send WINDOW_UPDATE for session when stream ends for other streams
1 parent a6bd288 commit 74d4109

File tree

4 files changed

+462
-43
lines changed

4 files changed

+462
-43
lines changed

proxy/http2/Http2ConnectionState.cc

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,12 @@ Http2ConnectionState::rcv_data_frame(const Http2Frame &frame)
233233
if (stream->read_enabled()) {
234234
if (frame.header().flags & HTTP2_FLAGS_DATA_END_STREAM) {
235235
stream->signal_read_event(VC_EVENT_READ_COMPLETE);
236+
if (this->get_peer_stream_count() > 1 && this->get_local_rwnd() == 0) {
237+
// This final DATA frame for this stream consumed all the bytes for the
238+
// session window. Send a WINDOW_UPDATE frame in order to open up the
239+
// session window for other streams.
240+
restart_receiving(nullptr);
241+
}
236242
} else {
237243
stream->signal_read_event(VC_EVENT_READ_READY);
238244
}
@@ -1717,6 +1723,9 @@ Http2ConnectionState::restart_streams()
17171723
ink_assert(s != next);
17181724
s = next;
17191725
}
1726+
1727+
// The above stopped at end, so we need to call send_response_body() one
1728+
// last time for the stream pointed to by end.
17201729
if (std::min(this->get_peer_rwnd(), s->get_peer_rwnd()) > 0) {
17211730
SCOPED_MUTEX_LOCK(lock, s->mutex, this_ethread());
17221731
s->restart_sending();
@@ -2026,11 +2035,15 @@ Http2ConnectionState::send_a_data_frame(Http2Stream *stream, size_t &payload_len
20262035
if (window_size <= 0) {
20272036
if (session->is_outbound()) {
20282037
ip_port_text_buffer ipb;
2029-
const char *client_ip = ats_ip_ntop(session->get_proxy_session()->get_remote_addr(), ipb, sizeof(ipb));
2030-
Warning("No window server_ip=%s session_wnd=%zd stream_wnd=%zd peer_initial_window=%u", client_ip, get_peer_rwnd(),
2038+
const char *server_ip = ats_ip_ntop(session->get_proxy_session()->get_remote_addr(), ipb, sizeof(ipb));
2039+
// Warn the user to give them visibility that their server-side
2040+
// connection is being limited by their server's flow control. Maybe
2041+
// they can make adjustments.
2042+
Warning("No window server_ip=%s session_wnd=%zd stream_wnd=%zd peer_initial_window=%u", server_ip, get_peer_rwnd(),
20312043
stream->get_peer_rwnd(), this->peer_settings.get(HTTP2_SETTINGS_INITIAL_WINDOW_SIZE));
20322044
}
2033-
Http2StreamDebug(this->session, stream->get_id(), "No window");
2045+
Http2StreamDebug(this->session, stream->get_id(), "No window session_wnd=%zd stream_wnd=%zd peer_initial_window=%u",
2046+
get_peer_rwnd(), stream->get_peer_rwnd(), this->peer_settings.get(HTTP2_SETTINGS_INITIAL_WINDOW_SIZE));
20342047
this->session->flush();
20352048
return Http2SendDataFrameResult::NO_WINDOW;
20362049
}
@@ -2074,7 +2087,7 @@ Http2ConnectionState::send_a_data_frame(Http2Stream *stream, size_t &payload_len
20742087
stream->decrement_peer_rwnd(payload_length);
20752088

20762089
// Create frame
2077-
Http2StreamDebug(session, stream->get_id(), "Send a DATA frame - client window con: %5zd stream: %5zd payload: %5zd flags: 0x%x",
2090+
Http2StreamDebug(session, stream->get_id(), "Send a DATA frame - peer window con: %5zd stream: %5zd payload: %5zd flags: 0x%x",
20782091
_peer_rwnd, stream->get_peer_rwnd(), payload_length, flags);
20792092

20802093
Http2DataFrame data(stream->get_id(), flags, resp_reader, payload_length);

tests/gold_tests/h2/http2_flow_control.replay.yaml

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ sessions:
6464
- [ X-Response, { value: 'zero-response', as: equal } ]
6565

6666
- client-request:
67-
delay: 500ms
67+
await: zero-request
6868

6969
headers:
7070
fields:
@@ -235,3 +235,68 @@ sessions:
235235
fields:
236236
- [ X-Response, {value: 'fifth-response', as: equal } ]
237237

238+
- client-request:
239+
# Populate the cache with a large response.
240+
241+
headers:
242+
fields:
243+
- [ :method, GET ]
244+
- [ :scheme, https ]
245+
- [ :authority, www.example.com ]
246+
- [ :path, /sixth-request ]
247+
- [ uuid, sixth-request ]
248+
- [ X-Request, sixth-request ]
249+
250+
proxy-request:
251+
headers:
252+
fields:
253+
- [ X-Request, {value: 'sixth-request', as: equal } ]
254+
255+
server-response:
256+
status: 200
257+
reason: OK
258+
headers:
259+
fields:
260+
- [ X-Response, sixth-response ]
261+
- [ Cache-Control, max-age=3600 ]
262+
- [ Content-Length, 120000 ]
263+
content:
264+
size: 120000
265+
266+
proxy-response:
267+
headers:
268+
fields:
269+
- [ X-Response, {value: 'sixth-response', as: equal } ]
270+
271+
272+
# Retrieve an item from the cache. /sixth-request should have been cached in
273+
# the previous transaction.
274+
- client-request:
275+
276+
# Give the above transaction enough time to finish.
277+
await: sixth-request
278+
279+
# Add some time to ensure that the sixth-request response is cached.
280+
delay: 100ms
281+
headers:
282+
fields:
283+
- [ :method, GET ]
284+
- [ :scheme, https ]
285+
- [ :authority, www.example.com ]
286+
- [ :path, /sixth-request ]
287+
- [ uuid, sixth-request-cached ]
288+
- [ X-Request, sixth-request-cached ]
289+
content:
290+
size: 0
291+
292+
# Configure an error response which we don't expect to receive from the
293+
# server because this should be served out of the cache.
294+
server-response:
295+
status: 500
296+
reason: Bad Request
297+
298+
proxy-response:
299+
status: 200
300+
headers:
301+
fields:
302+
- [ X-Response, {value: 'sixth-response', as: equal } ]

tests/gold_tests/h2/http2_flow_control.test.py

Lines changed: 75 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616
# See the License for the specific language governing permissions and
1717
# limitations under the License.
1818

19-
import os
2019
import re
20+
from enum import Enum
2121
from typing import List, Optional
2222

2323

@@ -28,6 +28,7 @@ class Http2FlowControlTest:
2828
"""Define an object to test HTTP/2 flow control behavior."""
2929

3030
_replay_file: str = 'http2_flow_control.replay.yaml'
31+
_replay_chunked_file: str = 'http2_flow_control_chunked.replay.yaml'
3132
_valid_policy_values: List[int] = list(range(0, 3))
3233
_flow_control_policy: Optional[int] = None
3334
_flow_control_policy_is_malformed: bool = False
@@ -47,6 +48,13 @@ class Http2FlowControlTest:
4748
IS_HTTP2_TO_ORIGIN = True
4849
IS_HTTP1_TO_ORIGIN = False
4950

51+
class ServerType(Enum):
52+
"""Define the type of server to use in a TestRun."""
53+
54+
HTTP1_CONTENT_LENGTH = 0
55+
HTTP1_CHUNKED = 1
56+
HTTP2 = 2
57+
5058
def __init__(
5159
self,
5260
description: str,
@@ -100,20 +108,26 @@ def _configure_dns(self, tr: 'TestRun') -> 'Process':
100108
Http2FlowControlTest._dns_counter += 1
101109
return dns
102110

103-
def _configure_server(self, tr: 'TestRun') -> 'Process':
111+
def _configure_server(self, tr: 'TestRun',
112+
server_type: ServerType) -> 'Process':
104113
"""Configure the test server."""
114+
if server_type == self.ServerType.HTTP1_CHUNKED:
115+
replay_file = self._replay_chunked_file
116+
else:
117+
replay_file = self._replay_file
118+
105119
server = tr.AddVerifierServerProcess(
106120
f'server-{Http2FlowControlTest._server_counter}',
107-
self._replay_file)
121+
replay_file)
108122
Http2FlowControlTest._server_counter += 1
109123
return server
110124

111-
def _configure_trafficserver(self, tr: 'TestRun', is_outbound: bool, is_http2_to_orign: bool) -> 'Process':
125+
def _configure_trafficserver(self, tr: 'TestRun', is_outbound: bool,
126+
server_type: ServerType) -> 'Process':
112127
"""Configure a Traffic Server process."""
113128
ts = tr.MakeATSProcess(
114129
f'ts-{Http2FlowControlTest._ts_counter}',
115-
enable_tls=True,
116-
enable_cache=False)
130+
enable_tls=True)
117131
Http2FlowControlTest._ts_counter += 1
118132

119133
ts.addDefaultSSLFiles()
@@ -122,12 +136,13 @@ def _configure_trafficserver(self, tr: 'TestRun', is_outbound: bool, is_http2_to
122136
'proxy.config.ssl.server.private_key.path': f'{ts.Variables.SSLDir}',
123137
'proxy.config.ssl.client.verify.server.policy': 'PERMISSIVE',
124138
'proxy.config.dns.nameservers': '127.0.0.1:{0}'.format(self._dns.Variables.Port),
139+
'proxy.config.http.insert_age_in_response': 0,
125140

126141
'proxy.config.diags.debug.enabled': 3,
127142
'proxy.config.diags.debug.tags': 'http',
128143
})
129144

130-
if is_http2_to_orign:
145+
if server_type == self.ServerType.HTTP2:
131146
ts.Disk.records_config.update({
132147
'proxy.config.ssl.client.alpn_protocols': 'h2,http/1.1',
133148
})
@@ -174,11 +189,11 @@ def _configure_trafficserver(self, tr: 'TestRun', is_outbound: bool, is_http2_to
174189
configuration = 'proxy.config.http2.flow_control.policy_in'
175190
ts.Disk.diags_log.Content = Testers.ContainsExpression(
176191
f"ERROR.*{configuration}",
177-
"There should be an error about an invalid flow control policy.")
192+
"Expected an error about an invalid flow control policy.")
178193

179194
return ts
180195

181-
def _configure_client(self, tr):
196+
def _configure_client(self, tr, ):
182197
"""Configure a client process.
183198
184199
:param tr: The TestRun to associate the client with.
@@ -251,25 +266,35 @@ def _configure_log_expectations(self, host):
251266
stream_window_1 = session_window_size
252267
stream_window_2 = int(session_window_size / 2)
253268
stream_window_3 = int(session_window_size / 3)
254-
host.Streams.stdout += Testers.ContainsExpression(
255-
(f'INITIAL_WINDOW_SIZE:{stream_window_1}.*'
256-
f'INITIAL_WINDOW_SIZE:{stream_window_2}.*'
257-
f'INITIAL_WINDOW_SIZE:{stream_window_3}'),
258-
f"{hostname} should stream receive window updates",
259-
reflags=re.DOTALL | re.MULTILINE)
269+
if self._server:
270+
# Toward the server, there is a potential race condition
271+
# between sending of first-request and the sending of the
272+
# SETTINGS frame which reduces the stream window size.
273+
# Allow for either scenario.
274+
host.Streams.stdout += Testers.ContainsExpression(
275+
(f'INITIAL_WINDOW_SIZE:{stream_window_1}.*'
276+
f'INITIAL_WINDOW_SIZE:{stream_window_2}.*'),
277+
f"{hostname} should stream receive window updates",
278+
reflags=re.DOTALL | re.MULTILINE)
279+
else:
280+
host.Streams.stdout += Testers.ContainsExpression(
281+
(f'INITIAL_WINDOW_SIZE:{stream_window_1}.*'
282+
f'INITIAL_WINDOW_SIZE:{stream_window_2}.*'
283+
f'INITIAL_WINDOW_SIZE:{stream_window_3}'),
284+
f"{hostname} should stream receive window updates",
285+
reflags=re.DOTALL | re.MULTILINE)
260286

261287
if self._expected_initial_stream_window_size < 1000:
262288
first_id = 5 if self._server else 3
263289

264-
# WINDOW_UPDATE timing is different between the server and the
265-
# client. Toward the origin we send SETTINGS frames to update the
266-
# INITIAL_WINDOW_SIZE with the headers so they are received earlier
267-
# than with the client, wherein we send the updated
268-
# INITIAL_WINDOW_SIZE after receiving headers from the client.
269290
if self._server and self._expected_flow_control_policy == 2:
270-
window_update_size = 33
291+
# Toward the server, there is a potential race condition
292+
# between sending of first-request and the sending of the
293+
# SETTINGS frame which reduces the stream window size. Allow
294+
# for either scenario.
295+
window_update_size = f'33|{self._expected_initial_stream_window_size}'
271296
else:
272-
window_update_size = self._expected_initial_stream_window_size
297+
window_update_size = f'{self._expected_initial_stream_window_size}'
273298
# For the smaller session window sizes, we expect WINDOW_UPDATE frames.
274299
host.Streams.stdout += Testers.ContainsExpression(
275300
f'WINDOW_UPDATE.*id {first_id}: {window_update_size}',
@@ -283,11 +308,12 @@ def _configure_log_expectations(self, host):
283308
f'WINDOW_UPDATE.*id {first_id + 4}: {window_update_size}',
284309
f"{hostname} should receive a stream WINDOW_UPDATE.")
285310

286-
def _configure_test_run_common(self, tr, is_outbound: bool, is_http2_to_origin: bool):
311+
def _configure_test_run_common(self, tr, is_outbound: bool,
312+
server_type: ServerType) -> None:
287313
"""Perform the common Process configuration."""
288314
self._dns = self._configure_dns(tr)
289-
self._server = self._configure_server(tr)
290-
self._ts = self._configure_trafficserver(tr, is_outbound, is_http2_to_origin)
315+
self._server = self._configure_server(tr, server_type)
316+
self._ts = self._configure_trafficserver(tr, is_outbound, server_type)
291317
if not self._flow_control_policy_is_malformed:
292318
self._configure_client(tr)
293319
tr.Processes.Default.StartBefore(self._dns)
@@ -297,25 +323,36 @@ def _configure_test_run_common(self, tr, is_outbound: bool, is_http2_to_origin:
297323
tr.Processes.Default.StartBefore(self._ts)
298324
tr.TimeOut = 20
299325

300-
def _configure_inbound_http1_to_origin_test_run(self):
326+
def _configure_inbound_http1_to_origin_test_run(self) -> None:
301327
"""Configure the TestRun for inbound stream configuration."""
302-
tr = Test.AddTestRun(f'{self._description} - inbound')
303-
self._configure_test_run_common(tr, self.IS_INBOUND, self.IS_HTTP1_TO_ORIGIN)
328+
tr = Test.AddTestRun(f'{self._description} - inbound, '
329+
'HTTP/1 Content-Length origin')
330+
self._configure_test_run_common(tr, self.IS_INBOUND,
331+
self.ServerType.HTTP1_CONTENT_LENGTH)
332+
self._configure_log_expectations(tr.Processes.Default)
333+
334+
tr = Test.AddTestRun(f'{self._description} - inbound, '
335+
'HTTP/1 chunked origin')
336+
self._configure_test_run_common(tr, self.IS_INBOUND,
337+
self.ServerType.HTTP1_CHUNKED)
304338
self._configure_log_expectations(tr.Processes.Default)
305339

306-
def _configure_inbound_http2_to_origin_test_run(self):
340+
def _configure_inbound_http2_to_origin_test_run(self) -> None:
307341
"""Configure the TestRun for inbound stream configuration."""
308-
tr = Test.AddTestRun(f'{self._description} - inbound')
309-
self._configure_test_run_common(tr, self.IS_INBOUND, self.IS_HTTP2_TO_ORIGIN)
342+
tr = Test.AddTestRun(f'{self._description} - inbound, HTTP/2 origin')
343+
self._configure_test_run_common(tr, self.IS_INBOUND,
344+
self.ServerType.HTTP2)
310345
self._configure_log_expectations(tr.Processes.Default)
311346

312-
def _configure_outbound_test_run(self):
347+
def _configure_outbound_test_run(self) -> None:
313348
"""Configure the TestRun outbound stream configuration."""
314-
tr = Test.AddTestRun(f'{self._description} - outbound')
315-
self._configure_test_run_common(tr, self.IS_OUTBOUND, self.IS_HTTP2_TO_ORIGIN)
349+
tr = Test.AddTestRun(f'{self._description} - outbound, HTTP/2 origin')
350+
self._configure_test_run_common(tr, self.IS_OUTBOUND,
351+
self.ServerType.HTTP2)
316352
self._configure_log_expectations(self._server)
317353

318-
def run(self):
354+
def run(self) -> None:
355+
"""Configure the test run for various origin side configurations."""
319356
self._configure_inbound_http1_to_origin_test_run()
320357
self._configure_inbound_http2_to_origin_test_run()
321358
self._configure_outbound_test_run()
@@ -352,18 +389,18 @@ def run(self):
352389
test.run()
353390

354391
test = Http2FlowControlTest(
355-
description="Flow control policy 0 (default): small initial_window_size_(in|out)",
392+
description="Flow control policy 0 (default): small initial_window_size",
356393
initial_window_size=500, # The default is 65 KB.
357394
flow_control_policy=0)
358395
test.run()
359396
test = Http2FlowControlTest(
360-
description="Flow control policy 1: 100 byte session, 10 byte stream windows",
397+
description="Flow control policy 1: 100 byte session, 10 byte streams",
361398
max_concurrent_streams=10,
362399
initial_window_size=10,
363400
flow_control_policy=1)
364401
test.run()
365402
test = Http2FlowControlTest(
366-
description="Flow control policy 2: 100 byte session, dynamic stream windows",
403+
description="Flow control policy 2: 100 byte session, dynamic streams",
367404
max_concurrent_streams=10,
368405
initial_window_size=10,
369406
flow_control_policy=2)

0 commit comments

Comments
 (0)