-
Notifications
You must be signed in to change notification settings - Fork 4.7k
/
Http3LoopbackConnection.cs
338 lines (275 loc) · 14.8 KB
/
Http3LoopbackConnection.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.Net.Quic;
using System.Text;
using System.Threading.Tasks;
using System.Linq;
using System.Net.Http.Functional.Tests;
using Xunit;
using System.Threading;
namespace System.Net.Test.Common
{
public sealed class Http3LoopbackConnection : GenericLoopbackConnection
{
// HTTP/3 application error codes, with the names and values registered in RFC 9114, Section 8.1.
// They are used as QUIC application error codes when closing the connection or aborting a stream.
public const long H3_NO_ERROR = 0x100;
public const long H3_GENERAL_PROTOCOL_ERROR = 0x101;
public const long H3_INTERNAL_ERROR = 0x102;
public const long H3_STREAM_CREATION_ERROR = 0x103;
public const long H3_CLOSED_CRITICAL_STREAM = 0x104;
public const long H3_FRAME_UNEXPECTED = 0x105;
public const long H3_FRAME_ERROR = 0x106;
public const long H3_EXCESSIVE_LOAD = 0x107;
public const long H3_ID_ERROR = 0x108;
public const long H3_SETTINGS_ERROR = 0x109;
public const long H3_MISSING_SETTINGS = 0x10a;
public const long H3_REQUEST_REJECTED = 0x10b;
public const long H3_REQUEST_CANCELLED = 0x10c;
public const long H3_REQUEST_INCOMPLETE = 0x10d;
// Previously missing from this list: RFC 9114 assigns 0x10e to malformed HTTP messages.
public const long H3_MESSAGE_ERROR = 0x10e;
public const long H3_CONNECT_ERROR = 0x10f;
public const long H3_VERSION_FALLBACK = 0x110;
// The QUIC connection this loopback connection wraps; every stream is accepted from or opened on it.
private readonly QuicConnection _connection;

// Bidirectional streams that arrived before the client's control stream did.
// They are parked here and handed out later as request streams.
private readonly Queue<QuicStream> _delayedStreams = new();

// Accepted request streams (never control streams), keyed by their QUIC stream ID.
private readonly Dictionary<int, Http3LoopbackStream> _openStreams = new();

// The request stream most recently accepted; reset to null once DisposeCurrentStream has run.
private Http3LoopbackStream _currentStream;

// The stream ID cannot be read back once the stream has been disposed, so it is tracked separately.
// Starting at -4 keeps the firstInvalidStreamId calculation valid even if no request was ever processed.
private long _currentStreamId = -4;

// Control stream opened by the client and accepted by us.
private Http3LoopbackStream _inboundControlStream;

// Control stream opened by us towards the client.
private Http3LoopbackStream _outboundControlStream;

// Our outbound control stream; throws if it has not been opened yet.
public Http3LoopbackStream OutboundControlStream
{
    get { return _outboundControlStream ?? throw new Exception("Control stream has not been opened yet"); }
}

// The client's control stream; throws if it has not been accepted yet.
public Http3LoopbackStream InboundControlStream
{
    get { return _inboundControlStream ?? throw new Exception("Inbound control stream has not been accepted yet"); }
}
// Wraps an already established QUIC connection; no streams are accepted or opened here.
public Http3LoopbackConnection(QuicConnection connection) => _connection = connection;
// Value of the max-header-list-size setting read from the client's control stream.
// Stays at -1 until that control stream has been accepted and its settings validated.
public long MaxHeaderListSize { get; private set; } = -1;
// Releases every stream owned by this connection and then the connection itself.
// Order matters: request streams first, then the connection, and only afterwards the control streams,
// because control streams should not be closed while the connection is still open.
public override async ValueTask DisposeAsync()
{
    // Request streams that are still being tracked.
    foreach (KeyValuePair<int, Http3LoopbackStream> tracked in _openStreams)
    {
        await tracked.Value.DisposeAsync().ConfigureAwait(false);
    }

    // Streams that were accepted early and never handed out as request streams.
    foreach (QuicStream parked in _delayedStreams)
    {
        await parked.DisposeAsync().ConfigureAwait(false);
    }

    // If the client already shut down gracefully, the connection is closed and this only releases the handle.
    // Otherwise this silently aborts the connection.
    await _connection.DisposeAsync().ConfigureAwait(false);

    // With the connection gone, the control stream handles can be released as well.
    if (_inboundControlStream is { } inbound)
    {
        await inbound.DisposeAsync().ConfigureAwait(false);
    }

    if (_outboundControlStream is { } outbound)
    {
        await outbound.DisposeAsync().ConfigureAwait(false);
    }
}
// Closes the underlying QUIC connection with the given application error code.
public Task CloseAsync(long errorCode)
{
    return _connection.CloseAsync(errorCode).AsTask();
}
// Opens a new outbound unidirectional QUIC stream and wraps it in an Http3LoopbackStream.
public async ValueTask<Http3LoopbackStream> OpenUnidirectionalStreamAsync()
{
    QuicStream outbound = await _connection.OpenOutboundStreamAsync(QuicStreamType.Unidirectional).ConfigureAwait(false);
    return new Http3LoopbackStream(outbound);
}
// Opens a new outbound bidirectional QUIC stream and wraps it in an Http3LoopbackStream.
public async ValueTask<Http3LoopbackStream> OpenBidirectionalStreamAsync()
{
    QuicStream outbound = await _connection.OpenOutboundStreamAsync(QuicStreamType.Bidirectional).ConfigureAwait(false);
    return new Http3LoopbackStream(outbound);
}
// Maps a request stream to the int request ID used by the tests: the stream's QUIC ID plus one.
// Throws OverflowException if the result does not fit in an int.
public static int GetRequestId(QuicStream stream)
{
    // Only bidirectional (readable and writable) streams carry requests.
    Debug.Assert(stream.CanRead && stream.CanWrite, "Stream must be a request stream.");

    // TODO: QUIC streams can have IDs larger than int.MaxValue; update all our tests to use long rather than int.
    checked
    {
        return (int)stream.Id + 1;
    }
}
// Not supported by the HTTP/3 loopback connection; always throws NotImplementedException.
public override Task InitializeConnectionAsync() => throw new NotImplementedException();
// Makes sure the client's control stream has been accepted and validated.
// Completes synchronously when that has already happened; otherwise accepts inbound streams until the
// control stream shows up, parking any request streams that arrive first in _delayedStreams.
private Task EnsureControlStreamAcceptedAsync()
{
    return _inboundControlStream is null ? AcceptAndValidateControlStreamAsync() : Task.CompletedTask;

    async Task AcceptAndValidateControlStreamAsync()
    {
        QuicStream accepted = await _connection.AcceptInboundStreamAsync().ConfigureAwait(false);

        // Control streams are unidirectional, so a writable stream must be a request stream:
        // keep it for later and wait for the next one.
        while (accepted.CanWrite)
        {
            _delayedStreams.Enqueue(accepted);
            accepted = await _connection.AcceptInboundStreamAsync().ConfigureAwait(false);
        }

        // First non-writable stream: this is the control stream.
        Http3LoopbackStream control = new Http3LoopbackStream(accepted);

        // The stream must announce itself as a control stream...
        long? announcedType = await control.ReadIntegerAsync().ConfigureAwait(false);
        Assert.Equal(Http3LoopbackStream.ControlStream, announcedType);

        // ...and carry exactly one setting, the max header list size.
        List<(long settingId, long settingValue)> receivedSettings = await control.ReadSettingsAsync().ConfigureAwait(false);
        (long id, long value) = Assert.Single(receivedSettings);
        Assert.Equal(Http3LoopbackStream.MaxHeaderListSize, id);

        MaxHeaderListSize = value;

        // Publish the stream only after validation succeeded.
        _inboundControlStream = control;
    }
}
// Accepts the next request stream from the client and makes it the current stream.
// The client's control stream is accepted and validated first if that has not happened yet.
public async Task<Http3LoopbackStream> AcceptRequestStreamAsync()
{
    await EnsureControlStreamAcceptedAsync().ConfigureAwait(false);

    // Streams parked while waiting for the control stream are served before accepting new ones.
    QuicStream accepted = _delayedStreams.TryDequeue(out QuicStream parked)
        ? parked
        : await _connection.AcceptInboundStreamAsync().ConfigureAwait(false);

    Http3LoopbackStream requestStream = new Http3LoopbackStream(accepted);
    Assert.True(accepted.CanWrite, "Expected writeable stream.");

    // Track the stream by ID and remember the ID separately for later use.
    long acceptedId = accepted.Id;
    _openStreams.Add(checked((int)acceptedId), requestStream);
    _currentStream = requestStream;
    _currentStreamId = acceptedId;

    return requestStream;
}
// Accepts the next request stream and returns it together with the client's control stream,
// which is accepted as part of accepting the request stream if necessary.
public async Task<(Http3LoopbackStream clientControlStream, Http3LoopbackStream requestStream)> AcceptControlAndRequestStreamAsync()
{
    Http3LoopbackStream request = await AcceptRequestStreamAsync().ConfigureAwait(false);

    // Read the control stream field only after the accept above has completed.
    return (_inboundControlStream, request);
}
// Opens our outbound control stream, announces its stream type and sends the given settings on it.
public async Task EstablishControlStreamAsync(SettingsEntry[] settingsEntries)
{
    Http3LoopbackStream control = await OpenUnidirectionalStreamAsync().ConfigureAwait(false);

    // Store the stream before writing to it, so it is tracked even if a send below fails.
    _outboundControlStream = control;

    await control.SendUnidirectionalStreamTypeAsync(Http3LoopbackStream.ControlStream).ConfigureAwait(false);
    await control.SendSettingsFrameAsync(settingsEntries).ConfigureAwait(false);
}
// Disposes the current request stream (the one most recently accepted) and stops tracking it.
// Fails the calling test if there is no current stream.
// _currentStreamId is left unchanged; ShutdownAsync still reads it to compute the GOAWAY stream ID.
public async Task DisposeCurrentStream()
{
    Assert.NotNull(_currentStream);
    Assert.True(_currentStreamId >= 0);

    await _currentStream.DisposeAsync().ConfigureAwait(false);

    // Derive the dictionary key exactly the way AcceptRequestStreamAsync does (checked narrowing),
    // so a stream ID that does not fit in an int can never silently remove a different entry.
    _openStreams.Remove(checked((int)_currentStreamId));
    _currentStream = null;
}
// Reads the request body from the current request stream.
public override async Task<byte[]> ReadRequestBodyAsync()
{
    byte[] body = await _currentStream.ReadRequestBodyAsync().ConfigureAwait(false);
    return body;
}
// Accepts the next request stream (which becomes the current stream) and reads the request from it.
// readBody is passed through unchanged to the stream's ReadRequestDataAsync.
public override async Task<HttpRequestData> ReadRequestDataAsync(bool readBody = true)
{
    Http3LoopbackStream requestStream = await AcceptRequestStreamAsync().ConfigureAwait(false);
    HttpRequestData requestData = await requestStream.ReadRequestDataAsync(readBody).ConfigureAwait(false);
    return requestData;
}
// Sends a response on the current request stream.
// When isFinal is true, the current stream is disposed afterwards and no longer tracked.
public override async Task SendResponseAsync(HttpStatusCode statusCode = HttpStatusCode.OK, IList<HttpHeaderData> headers = null, string content = "", bool isFinal = true)
{
    await _currentStream.SendResponseAsync(statusCode, headers, content, isFinal).ConfigureAwait(false);

    if (!isFinal)
    {
        // More data will follow on this stream; keep it open.
        return;
    }

    await DisposeCurrentStream().ConfigureAwait(false);
}
// Sends response body bytes on the current request stream.
// When isFinal is true, the current stream is disposed afterwards and no longer tracked.
public override async Task SendResponseBodyAsync(byte[] content, bool isFinal = true)
{
    await _currentStream.SendResponseBodyAsync(content, isFinal).ConfigureAwait(false);

    if (!isFinal)
    {
        // More body data will follow on this stream; keep it open.
        return;
    }

    await DisposeCurrentStream().ConfigureAwait(false);
}
// Forwards to the current request stream's SendResponseHeadersAsync; the stream is left open.
public override Task SendResponseHeadersAsync(HttpStatusCode statusCode = HttpStatusCode.OK, IList<HttpHeaderData> headers = null)
    => _currentStream.SendResponseHeadersAsync(statusCode, headers);
// Forwards to the current request stream's SendPartialResponseHeadersAsync; the stream is left open.
public override Task SendPartialResponseHeadersAsync(HttpStatusCode statusCode = HttpStatusCode.OK, IList<HttpHeaderData> headers = null)
    => _currentStream.SendPartialResponseHeadersAsync(statusCode, headers);
// Handles exactly one request and then winds the connection down:
// accepts the next request stream, reads the request, sends a GOAWAY on our control stream,
// sends the response, and finally waits for the client to disconnect.
// Returns the request data that was read.
// Throws if our outbound control stream has not been opened (see EstablishControlStreamAsync).
public override async Task<HttpRequestData> HandleRequestAsync(HttpStatusCode statusCode = HttpStatusCode.OK, IList<HttpHeaderData> headers = null, string content = "")
{
    Http3LoopbackStream stream = await AcceptRequestStreamAsync().ConfigureAwait(false);

    HttpRequestData request = await stream.ReadRequestDataAsync().ConfigureAwait(false);

    // We are about to close the connection, after we send the response.
    // So, send a GOAWAY frame now so the client won't inadvertently try to reuse the connection.
    // Note that in HTTP3 (unlike HTTP2) there is no strict ordering between the GOAWAY and the response below;
    // so the client may race in processing them and we need to handle this.
    //
    // Go through the OutboundControlStream accessor rather than the raw field, so that a control stream
    // that was never opened fails with a descriptive message instead of a NullReferenceException.
    await OutboundControlStream.SendGoAwayFrameAsync(stream.StreamId + 4).ConfigureAwait(false);

    await stream.SendResponseAsync(statusCode, headers, content).ConfigureAwait(false);

    await WaitForClientDisconnectAsync().ConfigureAwait(false);

    return request;
}
// Sends a GOAWAY on our control stream and then waits for the client to disconnect.
// failCurrentRequest: when true, the GOAWAY names the current request's stream ID as the first invalid one;
// when false, it names the current stream ID plus 4, so the current request stays valid.
// Returns without waiting if the client has already closed the connection or closes it while the GOAWAY is being sent.
// Throws if our outbound control stream has not been opened (see EstablishControlStreamAsync).
public async Task ShutdownAsync(bool failCurrentRequest = false)
{
    try
    {
        long firstInvalidStreamId = failCurrentRequest ? _currentStreamId : _currentStreamId + 4;

        // Go through the OutboundControlStream accessor rather than the raw field, so that a control stream
        // that was never opened fails with a descriptive message instead of a NullReferenceException.
        await OutboundControlStream.SendGoAwayFrameAsync(firstInvalidStreamId).ConfigureAwait(false);
    }
    catch (QuicException abortException) when (abortException.QuicError == QuicError.ConnectionAborted && abortException.ApplicationErrorCode == H3_NO_ERROR)
    {
        // Client must have closed the connection already because the HttpClientHandler instance was disposed.
        // So nothing to do.
        return;
    }
    catch (OperationCanceledException)
    {
        // If the client is closing the connection at the same time we are trying to send the GOAWAY above,
        // this can result in OperationCanceledException from QuicStream.WriteAsync.
        // See https://github.com/dotnet/runtime/issues/58078
        // I saw this consistently with GetAsync_EmptyResponseHeader_Success.
        // To work around this, just eat the exception for now.
        // Also, be aware of this issue as it will do weird things with OperationCanceledException and can
        // make debugging this very confusing: https://github.com/dotnet/runtime/issues/58081
        return;
    }

    await WaitForClientDisconnectAsync().ConfigureAwait(false);
}
// Wait for the client to close the connection, e.g. after we send a GOAWAY, or after the HttpClient is disposed.
// Request streams that arrive in the meantime are aborted with H3_REQUEST_REJECTED and disposed;
// when refuseNewRequests is false, such a stream is treated as an error and an exception is thrown instead.
// Once the client has aborted the connection with H3_NO_ERROR, our side of the connection is closed as well.
public async Task WaitForClientDisconnectAsync(bool refuseNewRequests = true)
{
    for (;;)
    {
        Http3LoopbackStream lateRequest = await AcceptUnlessDisconnectedAsync().ConfigureAwait(false);
        if (lateRequest is null)
        {
            // The client has closed the connection.
            break;
        }

        await using (lateRequest)
        {
            lateRequest.Abort(H3_REQUEST_REJECTED);
        }
    }

    // Reading from the client's control stream should now fail with a QuicException whose QuicError is
    // ConnectionAborted, indicating that the stream ended because the connection was closed
    // (and was not explicitly closed or aborted prior to the connection being closed).
    if (_inboundControlStream is not null)
    {
        QuicException readFailure = await Assert.ThrowsAsync<QuicException>(async () => await _inboundControlStream.ReadFrameAsync().ConfigureAwait(false));
        Assert.Equal(QuicError.ConnectionAborted, readFailure.QuicError);
    }

    await CloseAsync(H3_NO_ERROR).ConfigureAwait(false);

    // Accepts one more request stream; returns null once the client has aborted the connection with H3_NO_ERROR.
    async Task<Http3LoopbackStream> AcceptUnlessDisconnectedAsync()
    {
        try
        {
            Http3LoopbackStream accepted = await AcceptRequestStreamAsync().ConfigureAwait(false);
            if (refuseNewRequests)
            {
                return accepted;
            }

            throw new Exception("Unexpected request stream received while waiting for client disconnect");
        }
        catch (QuicException disconnect) when (disconnect.QuicError == QuicError.ConnectionAborted && disconnect.ApplicationErrorCode == H3_NO_ERROR)
        {
            return null;
        }
    }
}
// Forwards to the current request stream's WaitForCancellationAsync, passing ignoreIncomingData through unchanged.
public override async Task WaitForCancellationAsync(bool ignoreIncomingData = true)
    => await _currentStream.WaitForCancellationAsync(ignoreIncomingData).ConfigureAwait(false);
// Not supported by the HTTP/3 loopback connection; always throws NotImplementedException.
public override Task WaitForCloseAsync(CancellationToken cancellationToken) => throw new NotImplementedException();
}
}