Skip to content

Commit b7085b6

Browse files
Lukasaweissi
authored andcommitted
Correctly discard only inactive streams. (#11)
Motivation: Long-lived connections that have many inactive streams would cause crashes. Crashes are, in my professional opinion, less than ideal when they occur under expected usage cases. Modifications: - Fixed an inverted boolean check that would discard only *active* streams. - Made the cached stream count configurable, removing a TODO. - Added tests. Result: Fewer crashes, better software.
1 parent 80963ee commit b7085b6

File tree

4 files changed

+106
-6
lines changed

4 files changed

+106
-6
lines changed

Sources/NIOHTTP2/HTTP2Parser.swift

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -156,16 +156,27 @@ public final class HTTP2Parser: ChannelInboundHandler, ChannelOutboundHandler {
156156
private let mode: ParserMode
157157
private let initialSettings: [HTTP2Setting]
158158
private let reentrancyManager = ReentrancyManager()
159+
private let maxCachedClosedStreams: Int
159160

160-
public init(mode: ParserMode, initialSettings: [HTTP2Setting] = nioDefaultSettings) {
161+
/// Create a `HTTP2Parser`.
162+
///
163+
/// - parameters:
164+
/// - mode: The mode for the parser to operate in: server or client.
165+
/// - initialSettings: The initial settings used for the connection, to be sent in the
166+
/// connection preamble.
167+
/// - maxCachedClosedStreams: The maximum number of streams for which metadata will be preserved
168+
/// to handle delayed frames (e.g. DATA frames that were already in flight after stream reset
169+
/// or GOAWAY).
170+
public init(mode: ParserMode, initialSettings: [HTTP2Setting] = nioDefaultSettings, maxCachedClosedStreams: Int = 1024) {
161171
self.mode = mode
162172
self.initialSettings = initialSettings
173+
self.maxCachedClosedStreams = maxCachedClosedStreams
163174
}
164175

165176
public func channelActive(ctx: ChannelHandlerContext) {
166177
self.session = NGHTTP2Session(mode: self.mode,
167178
allocator: ctx.channel.allocator,
168-
maxCachedStreamIDs: 1024, // TODO(cory): Make configurable
179+
maxCachedStreamIDs: self.maxCachedClosedStreams,
169180
frameReceivedHandler: { ctx.fireChannelRead(self.wrapInboundOut($0)) },
170181
sendFunction: { ctx.write(self.wrapOutboundOut($0), promise: $1) },
171182
userEventFunction: { ctx.fireUserInboundEventTriggered($0) })

Sources/NIOHTTP2/NGHTTP2Session.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,7 @@ fileprivate struct StreamManager {
285285
// Discard old and unnecessary streams.
286286
private mutating func purgeOldStreams() {
287287
while self.streamMap.count >= maxSize {
288-
let lowestStreamID = self.streamMap.filter { $0.value.active }.keys.sorted().first { $0 != 0 && $0 != Int32.max }!
288+
let lowestStreamID = self.streamMap.filter { !$0.value.active }.keys.sorted().first { $0 != 0 && $0 != Int32.max }!
289289
self.streamMap.removeValue(forKey: lowestStreamID)
290290
}
291291
}

Tests/NIOHTTP2Tests/SimpleClientServerTests+XCTest.swift

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ extension SimpleClientServerTests {
5555
("testStreamClosedViaGoaway", testStreamClosedViaGoaway),
5656
("testStreamCloseEventForRstStreamFiresAfterFrame", testStreamCloseEventForRstStreamFiresAfterFrame),
5757
("testStreamCloseEventForGoawayFiresAfterFrame", testStreamCloseEventForGoawayFiresAfterFrame),
58+
("testManyConcurrentInactiveStreams", testManyConcurrentInactiveStreams),
59+
("testDontRemoveActiveStreams", testDontRemoveActiveStreams),
5860
]
5961
}
6062
}

Tests/NIOHTTP2Tests/SimpleClientServerTests.swift

Lines changed: 90 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -183,9 +183,9 @@ class SimpleClientServerTests: XCTestCase {
183183
}
184184

185185
/// Establish a basic HTTP/2 connection.
186-
func basicHTTP2Connection() throws {
187-
XCTAssertNoThrow(try self.clientChannel.pipeline.add(handler: HTTP2Parser(mode: .client)).wait())
188-
XCTAssertNoThrow(try self.serverChannel.pipeline.add(handler: HTTP2Parser(mode: .server)).wait())
186+
func basicHTTP2Connection(maxCachedClosedStreams: Int = 1024) throws {
187+
XCTAssertNoThrow(try self.clientChannel.pipeline.add(handler: HTTP2Parser(mode: .client, maxCachedClosedStreams: maxCachedClosedStreams)).wait())
188+
XCTAssertNoThrow(try self.serverChannel.pipeline.add(handler: HTTP2Parser(mode: .server, maxCachedClosedStreams: maxCachedClosedStreams)).wait())
189189
try self.assertDoHandshake(client: self.clientChannel, server: self.serverChannel)
190190
}
191191

@@ -1230,4 +1230,91 @@ class SimpleClientServerTests: XCTestCase {
12301230
XCTAssertNoThrow(try self.clientChannel.finish())
12311231
XCTAssertNoThrow(try self.serverChannel.finish())
12321232
}
1233+
1234+
func testManyConcurrentInactiveStreams() throws {
1235+
let maxCachedClosedStreams = 128
1236+
1237+
// Begin by getting the connection up.
1238+
try self.basicHTTP2Connection(maxCachedClosedStreams: maxCachedClosedStreams)
1239+
1240+
// Obtain some request data.
1241+
let requestHeaders = HTTPHeaders([(":path", "/"), (":method", "POST"), (":scheme", "https"), (":authority", "localhost")])
1242+
var requestBody = self.clientChannel.allocator.buffer(capacity: 128)
1243+
requestBody.write(staticString: "A simple HTTP/2 request.")
1244+
let responseHeaders = HTTPHeaders([(":status", "200"), ("content-length", "0")])
1245+
1246+
// We're going to initiate and then close more than maxCachedClosedStreams streams.
1247+
// Nothing bad should happen here.
1248+
for _ in 0...maxCachedClosedStreams {
1249+
// We're now going to try to send a request from the client to the server.
1250+
let clientStreamID = HTTP2StreamID()
1251+
let reqFrame = HTTP2Frame(streamID: clientStreamID, payload: .headers(requestHeaders))
1252+
var reqBodyFrame = HTTP2Frame(streamID: clientStreamID, payload: .data(.byteBuffer(requestBody)))
1253+
reqBodyFrame.endStream = true
1254+
1255+
let serverStreamID = try self.assertFramesRoundTrip(frames: [reqFrame, reqBodyFrame], sender: self.clientChannel, receiver: self.serverChannel).first!.streamID
1256+
1257+
// Let's send a quick response back.
1258+
var respFrame = HTTP2Frame(streamID: serverStreamID, payload: .headers(responseHeaders))
1259+
respFrame.endStream = true
1260+
try self.assertFramesRoundTrip(frames: [respFrame], sender: self.serverChannel, receiver: self.clientChannel)
1261+
}
1262+
1263+
XCTAssertNoThrow(try self.clientChannel.finish())
1264+
XCTAssertNoThrow(try self.serverChannel.finish())
1265+
}
1266+
1267+
func testDontRemoveActiveStreams() throws {
1268+
// This is a bit of a regression test, not a generally useful one. See https://github.com/apple/swift-nio-http2/pull/11/
1269+
// for more.
1270+
let maxCachedClosedStreams = 128
1271+
1272+
// Begin by getting the connection up.
1273+
try self.basicHTTP2Connection(maxCachedClosedStreams: maxCachedClosedStreams)
1274+
1275+
// Obtain some request data.
1276+
let requestHeaders = HTTPHeaders([(":path", "/"), (":method", "POST"), (":scheme", "https"), (":authority", "localhost")])
1277+
var requestBody = self.clientChannel.allocator.buffer(capacity: 128)
1278+
requestBody.write(staticString: "A simple HTTP/2 request.")
1279+
let responseHeaders = HTTPHeaders([(":status", "200"), ("content-length", "0")])
1280+
1281+
// We're going to initiate and then close one fewer than maxCachedClosedStreams streams.
1282+
// Nothing bad should happen here.
1283+
for _ in 0..<(maxCachedClosedStreams - 2) {
1284+
let clientStreamID = HTTP2StreamID()
1285+
let reqFrame = HTTP2Frame(streamID: clientStreamID, payload: .headers(requestHeaders))
1286+
var reqBodyFrame = HTTP2Frame(streamID: clientStreamID, payload: .data(.byteBuffer(requestBody)))
1287+
reqBodyFrame.endStream = true
1288+
1289+
let serverStreamID = try self.assertFramesRoundTrip(frames: [reqFrame, reqBodyFrame], sender: self.clientChannel, receiver: self.serverChannel).first!.streamID
1290+
1291+
// Let's send a quick response back.
1292+
var respFrame = HTTP2Frame(streamID: serverStreamID, payload: .headers(responseHeaders))
1293+
respFrame.endStream = true
1294+
try self.assertFramesRoundTrip(frames: [respFrame], sender: self.serverChannel, receiver: self.clientChannel)
1295+
}
1296+
1297+
// Ok, now we're going to open *two* streams. In the old, broken code, the opening of the second
1298+
// stream would discard the first *open* stream, instead of one of the dead old ones.
1299+
let clientStreamIDs = (0..<2).map { _ in HTTP2StreamID() }
1300+
let clientFrames = clientStreamIDs.map { HTTP2Frame(streamID: $0, payload: .headers(requestHeaders)) }
1301+
let serverStreamIDs = try self.assertFramesRoundTrip(frames: clientFrames, sender: self.clientChannel, receiver: self.serverChannel).map { $0.streamID }
1302+
1303+
// Now we're going to send the two data frames for these streams.
1304+
// In the old broken version of this code, this will fail because we accidentally deleted one of our two *open*
1305+
// stream states, instead of one of the 1024 closed ones. Booleans are hard.
1306+
let clientDataFrames = clientStreamIDs.map { HTTP2Frame(streamID: $0, payload: .data(.byteBuffer(requestBody))) }
1307+
try self.assertFramesRoundTrip(frames: clientDataFrames, sender: self.clientChannel, receiver: self.serverChannel)
1308+
1309+
// Clean it up with the server now.
1310+
let serverFrames = serverStreamIDs.map { streamID -> HTTP2Frame in
1311+
var respFrame = HTTP2Frame(streamID: streamID, payload: .headers(responseHeaders))
1312+
respFrame.endStream = true
1313+
return respFrame
1314+
}
1315+
try self.assertFramesRoundTrip(frames: serverFrames, sender: self.serverChannel, receiver: self.clientChannel)
1316+
1317+
XCTAssertNoThrow(try self.clientChannel.finish())
1318+
XCTAssertNoThrow(try self.serverChannel.finish())
1319+
}
12331320
}

0 commit comments

Comments
 (0)