diff --git a/Sources/NIOHTTP2/HTTP2ChannelHandler.swift b/Sources/NIOHTTP2/HTTP2ChannelHandler.swift index cdb11bdb..30d65e2c 100644 --- a/Sources/NIOHTTP2/HTTP2ChannelHandler.swift +++ b/Sources/NIOHTTP2/HTTP2ChannelHandler.swift @@ -113,6 +113,9 @@ public final class NIOHTTP2Handler: ChannelDuplexHandler { /// The maximum number of sequential CONTINUATION frames. private let maximumSequentialContinuationFrames: Int + /// A delegate which is told about frames which have been written. + private let frameDelegate: NIOHTTP2FrameDelegate? + @usableFromInline internal var inboundStreamMultiplexer: InboundStreamMultiplexer? { self.inboundStreamMultiplexerState.multiplexer @@ -242,7 +245,8 @@ public final class NIOHTTP2Handler: ChannelDuplexHandler { maximumResetFrameCount: 200, resetFrameCounterWindow: .seconds(30), maximumStreamErrorCount: 200, - streamErrorCounterWindow: .seconds(30) + streamErrorCounterWindow: .seconds(30), + frameDelegate: nil ) } @@ -280,7 +284,8 @@ public final class NIOHTTP2Handler: ChannelDuplexHandler { maximumResetFrameCount: 200, resetFrameCounterWindow: .seconds(30), maximumStreamErrorCount: 200, - streamErrorCounterWindow: .seconds(30) + streamErrorCounterWindow: .seconds(30), + frameDelegate: nil ) } @@ -295,6 +300,27 @@ public final class NIOHTTP2Handler: ChannelDuplexHandler { mode: ParserMode, connectionConfiguration: ConnectionConfiguration = .init(), streamConfiguration: StreamConfiguration = .init() + ) { + self.init( + mode: mode, + frameDelegate: nil, + connectionConfiguration: connectionConfiguration, + streamConfiguration: streamConfiguration + ) + } + + /// Constructs a ``NIOHTTP2Handler``. + /// + /// - Parameters: + /// - mode: The mode for this handler, client or server. + /// - frameDelegate: A delegate which is notified about frames being written. + /// - connectionConfiguration: The settings that will be used when establishing the connection. + /// - streamConfiguration: The settings that will be used when establishing new streams. + public convenience init( + mode: ParserMode, + frameDelegate: NIOHTTP2FrameDelegate?, + connectionConfiguration: ConnectionConfiguration = ConnectionConfiguration(), + streamConfiguration: StreamConfiguration = StreamConfiguration() ) { self.init( mode: mode, @@ -310,7 +336,8 @@ public final class NIOHTTP2Handler: ChannelDuplexHandler { maximumResetFrameCount: streamConfiguration.streamResetFrameRateLimit.maximumCount, resetFrameCounterWindow: streamConfiguration.streamResetFrameRateLimit.windowLength, maximumStreamErrorCount: streamConfiguration.streamErrorRateLimit.maximumCount, - streamErrorCounterWindow: streamConfiguration.streamErrorRateLimit.windowLength + streamErrorCounterWindow: streamConfiguration.streamErrorRateLimit.windowLength, + frameDelegate: frameDelegate ) } @@ -328,7 +355,8 @@ public final class NIOHTTP2Handler: ChannelDuplexHandler { maximumResetFrameCount: Int, resetFrameCounterWindow: TimeAmount, maximumStreamErrorCount: Int, - streamErrorCounterWindow: TimeAmount + streamErrorCounterWindow: TimeAmount, + frameDelegate: NIOHTTP2FrameDelegate? ) { self._eventLoop = eventLoop self.stateMachine = HTTP2ConnectionStateMachine( @@ -355,6 +383,7 @@ public final class NIOHTTP2Handler: ChannelDuplexHandler { self.inboundStreamMultiplexerState = .uninitializedLegacy self.maximumSequentialContinuationFrames = maximumSequentialContinuationFrames self.glitchesMonitor = GlitchesMonitor(maximumGlitches: maximumConnectionGlitches) + self.frameDelegate = frameDelegate } /// Constructs a ``NIOHTTP2Handler``. @@ -391,7 +420,8 @@ public final class NIOHTTP2Handler: ChannelDuplexHandler { resetFrameCounterWindow: TimeAmount = .seconds(30), maximumStreamErrorCount: Int = 200, streamErrorCounterWindow: TimeAmount = .seconds(30), - maximumConnectionGlitches: Int = GlitchesMonitor.defaultMaximumGlitches + maximumConnectionGlitches: Int = GlitchesMonitor.defaultMaximumGlitches, + frameDelegate: NIOHTTP2FrameDelegate? = nil ) { self.stateMachine = HTTP2ConnectionStateMachine( role: .init(mode), @@ -418,6 +448,7 @@ public final class NIOHTTP2Handler: ChannelDuplexHandler { self.inboundStreamMultiplexerState = .uninitializedLegacy self.maximumSequentialContinuationFrames = maximumSequentialContinuationFrames self.glitchesMonitor = GlitchesMonitor(maximumGlitches: maximumConnectionGlitches) + self.frameDelegate = frameDelegate } public func handlerAdded(context: ChannelHandlerContext) { @@ -1067,6 +1098,11 @@ extension NIOHTTP2Handler { return } + // Tell the delegate, if there is one. + if let delegate = self.frameDelegate { + delegate.wroteFrame(frame) + } + // Ok, if we got here we're good to send data. We want to attach the promise to the latest write, not // always the frame header. self.wroteFrame = true @@ -1391,7 +1427,8 @@ extension NIOHTTP2Handler { maximumResetFrameCount: streamConfiguration.streamResetFrameRateLimit.maximumCount, resetFrameCounterWindow: streamConfiguration.streamResetFrameRateLimit.windowLength, maximumStreamErrorCount: streamConfiguration.streamErrorRateLimit.maximumCount, - streamErrorCounterWindow: streamConfiguration.streamErrorRateLimit.windowLength + streamErrorCounterWindow: streamConfiguration.streamErrorRateLimit.windowLength, + frameDelegate: nil ) self.inboundStreamMultiplexerState = .uninitializedInline( @@ -1408,6 +1445,7 @@ extension NIOHTTP2Handler { connectionConfiguration: ConnectionConfiguration = .init(), streamConfiguration: StreamConfiguration = .init(), streamDelegate: NIOHTTP2StreamDelegate? = nil, + frameDelegate: NIOHTTP2FrameDelegate?, inboundStreamInitializerWithAnyOutput: @escaping StreamInitializerWithAnyOutput ) { self.init( @@ -1424,7 +1462,8 @@ extension NIOHTTP2Handler { maximumResetFrameCount: streamConfiguration.streamResetFrameRateLimit.maximumCount, resetFrameCounterWindow: streamConfiguration.streamResetFrameRateLimit.windowLength, maximumStreamErrorCount: streamConfiguration.streamErrorRateLimit.maximumCount, - streamErrorCounterWindow: streamConfiguration.streamErrorRateLimit.windowLength + streamErrorCounterWindow: streamConfiguration.streamErrorRateLimit.windowLength, + frameDelegate: frameDelegate ) self.inboundStreamMultiplexerState = .uninitializedAsync( streamConfiguration, diff --git a/Sources/NIOHTTP2/HTTP2PipelineHelpers.swift b/Sources/NIOHTTP2/HTTP2PipelineHelpers.swift index 8bdd7160..12c3b012 100644 --- a/Sources/NIOHTTP2/HTTP2PipelineHelpers.swift +++ b/Sources/NIOHTTP2/HTTP2PipelineHelpers.swift @@ -866,6 +866,42 @@ extension ChannelPipeline.SynchronousOperations { streamDelegate: NIOHTTP2StreamDelegate?, configuration: NIOHTTP2Handler.Configuration = NIOHTTP2Handler.Configuration(), streamInitializer: @escaping NIOChannelInitializerWithOutput + ) throws -> NIOHTTP2Handler.AsyncStreamMultiplexer { + try self.configureAsyncHTTP2Pipeline( + mode: mode, + streamDelegate: streamDelegate, + frameDelegate: nil, + configuration: configuration, + streamInitializer: streamInitializer + ) + } + + /// Configures a `ChannelPipeline` to speak HTTP/2 and sets up mapping functions so that it may be interacted with from concurrent code. + /// + /// This operation **must** be called on the event loop. + /// + /// In general this is not entirely useful by itself, as HTTP/2 is a negotiated protocol. This helper does not handle negotiation. + /// Instead, this simply adds the handler required to speak HTTP/2 after negotiation has completed, or when agreed by prior knowledge. + /// Use this function to setup a HTTP/2 pipeline if you wish to use async sequence abstractions over inbound and outbound streams, + /// as it allows that pipeline to evolve without breaking your code. + /// + /// - Parameters: + /// - mode: The mode this pipeline will operate in, server or client. + /// - streamDelegate: A delegate which is called when streams are created and closed. + /// - frameDelegate: A delegate which is called when frames are written to the network. + /// - configuration: The settings that will be used when establishing the connection and new streams. + /// - streamInitializer: A closure that will be called whenever the remote peer initiates a new stream. + /// The output of this closure is the element type of the returned multiplexer + /// - Returns: An `EventLoopFuture` containing the `AsyncStreamMultiplexer` inserted into this pipeline, which can + /// be used to initiate new streams and iterate over inbound HTTP/2 stream channels. + @inlinable + @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) + public func configureAsyncHTTP2Pipeline( + mode: NIOHTTP2Handler.ParserMode, + streamDelegate: NIOHTTP2StreamDelegate?, + frameDelegate: NIOHTTP2FrameDelegate?, + configuration: NIOHTTP2Handler.Configuration = NIOHTTP2Handler.Configuration(), + streamInitializer: @escaping NIOChannelInitializerWithOutput ) throws -> NIOHTTP2Handler.AsyncStreamMultiplexer { let handler = NIOHTTP2Handler( mode: mode, @@ -873,6 +909,7 @@ extension ChannelPipeline.SynchronousOperations { connectionConfiguration: configuration.connection, streamConfiguration: configuration.stream, streamDelegate: streamDelegate, + frameDelegate: frameDelegate, inboundStreamInitializerWithAnyOutput: { channel in streamInitializer(channel).map { $0 } } diff --git a/Sources/NIOHTTP2/NIOHTTP2FrameDelegate.swift b/Sources/NIOHTTP2/NIOHTTP2FrameDelegate.swift new file mode 100644 index 00000000..2203bf7c --- /dev/null +++ b/Sources/NIOHTTP2/NIOHTTP2FrameDelegate.swift @@ -0,0 +1,30 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2025 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import NIOCore +import NIOHPACK + +/// A delegate which can be used with the ``NIOHTTP2Handler`` which is notified +/// when various frame types are written into the connection channel. +/// +/// This delegate, when used by the ``NIOHTTP2Handler`` will be called on the event +/// loop associated with the channel that the handler is a part of. As such you should +/// avoid doing expensive or blocking work in this delegate. +public protocol NIOHTTP2FrameDelegate { + /// Called when a frame is written by the connection channel. + /// + /// - Parameters: + /// - frame: The frame to write. + func wroteFrame(_ frame: HTTP2Frame) +} diff --git a/Tests/NIOHTTP2Tests/SimpleClientServerFramePayloadStreamTests.swift b/Tests/NIOHTTP2Tests/SimpleClientServerFramePayloadStreamTests.swift index ece33c4b..0a0c7bf3 100644 --- a/Tests/NIOHTTP2Tests/SimpleClientServerFramePayloadStreamTests.swift +++ b/Tests/NIOHTTP2Tests/SimpleClientServerFramePayloadStreamTests.swift @@ -210,8 +210,10 @@ class SimpleClientServerFramePayloadStreamTests: XCTestCase { serverSettings: HTTP2Settings = nioDefaultSettings, maximumBufferedControlFrames: Int = 10000, maximumSequentialContinuationFrames: Int = 5, - withMultiplexerCallback multiplexerCallback: NIOChannelInitializer? = nil, - maximumConnectionGlitches: Int = 10 + maximumConnectionGlitches: Int = 10, + clientFrameDelegate: NIOHTTP2FrameDelegate? = nil, + serverFrameDelegate: NIOHTTP2FrameDelegate? = nil, + withMultiplexerCallback multiplexerCallback: NIOChannelInitializer? = nil ) throws { XCTAssertNoThrow( try self.clientChannel.pipeline.syncOperations.addHandler( @@ -219,7 +221,8 @@ class SimpleClientServerFramePayloadStreamTests: XCTestCase { mode: .client, initialSettings: clientSettings, maximumBufferedControlFrames: maximumBufferedControlFrames, - maximumSequentialContinuationFrames: maximumSequentialContinuationFrames + maximumSequentialContinuationFrames: maximumSequentialContinuationFrames, + frameDelegate: clientFrameDelegate ) ) ) @@ -230,7 +233,8 @@ class SimpleClientServerFramePayloadStreamTests: XCTestCase { initialSettings: serverSettings, maximumBufferedControlFrames: maximumBufferedControlFrames, maximumSequentialContinuationFrames: maximumSequentialContinuationFrames, - maximumConnectionGlitches: maximumConnectionGlitches + maximumConnectionGlitches: maximumConnectionGlitches, + frameDelegate: serverFrameDelegate ) ) ) @@ -2844,6 +2848,98 @@ class SimpleClientServerFramePayloadStreamTests: XCTestCase { XCTAssertNoThrow(try self.clientChannel.finish()) XCTAssertNoThrow(try self.serverChannel.finish()) } + + func testFrameDelegateIsCalled() throws { + // Cap the frame size to 2^14 = 16_384 + let settings = [HTTP2Setting(parameter: .maxFrameSize, value: 1 << 14)] + + // Configure the client channel. + let delegate = RecordingFrameDelegate() + + try self.basicHTTP2Connection( + clientSettings: settings, + serverSettings: settings, + clientFrameDelegate: delegate + ) { stream in + stream.eventLoop.makeCompletedFuture { + try stream.pipeline.syncOperations.addHandler(OkHandler()) + } + } + + let multiplexer = try self.clientChannel.pipeline.handler( + type: HTTP2StreamMultiplexer.self + ).map { + $0.sendableView + }.wait() + + let streamPromise = self.clientChannel.eventLoop.makePromise(of: Channel.self) + multiplexer.createStreamChannel(promise: streamPromise) { + $0.eventLoop.makeSucceededVoidFuture() + } + self.clientChannel.embeddedEventLoop.run() + let stream = try streamPromise.futureResult.wait() + + // Write a request. + let headers = HTTP2Frame.FramePayload.Headers( + headers: [":scheme": "http", ":path": "/", ":method": "GET"] + ) + stream.write(HTTP2Frame.FramePayload.headers(headers), promise: nil) + + // Write a frame which is four times the max frame size. This demonstrates that + // the delegate is called with the frame as written to the network rather than that + // as written to the stream channel. + let bytes = ByteBuffer(repeating: 42, count: 1 << 16) + let data = HTTP2Frame.FramePayload.Data(data: .byteBuffer(bytes), endStream: true) + stream.write(HTTP2Frame.FramePayload.data(data), promise: nil) + stream.flush() + + self.interactInMemory(self.clientChannel, self.serverChannel) + + self.clientChannel.embeddedEventLoop.run() + try stream.closeFuture.wait() + + var events = delegate.events + XCTAssertEqual(events.count, 8) + + // First the server writes its own SETTINGS. + events.popFirst()?.assertSettingsFrame(expectedSettings: settings, ack: false) + // Then it acks the client SETTINGS. + events.popFirst()?.assertSettingsFrame(expectedSettings: [], ack: true) + + // Then writes HEADERS on stream 1 + events.popFirst()?.assertHeadersFrame( + endStream: false, + streamID: 1, + headers: headers.headers + ) + + // Now 5 DATA frames on stream 1. + // + // The max frame size is set to 2^14 and the initial connection window + // size is (2^16)-1, so the write of size 2^16 is split up over five frames. + // The first three are the max frame size of 2^14, the fourth is (2^14) - 1 + // as that's all that remains of the connection window. The final byte of the + // message is sent later, after the server sends a WINDOW_UPDATE frame. + for _ in 1...3 { + events.popFirst()?.assertDataFrame( + endStream: false, + streamID: 1, + payload: ByteBuffer(repeating: 42, count: 1 << 14) + ) + } + events.popFirst()?.assertDataFrame( + endStream: false, + streamID: 1, + payload: ByteBuffer(repeating: 42, count: (1 << 14) - 1) + ) + events.popFirst()?.assertDataFrame( + endStream: true, + streamID: 1, + payload: ByteBuffer(repeating: 42, count: 1) + ) + + XCTAssertNil(events.popFirst()) + } } final class ShouldQuiesceEventWaiter: ChannelInboundHandler, Sendable { @@ -2867,3 +2963,52 @@ final class ShouldQuiesceEventWaiter: ChannelInboundHandler, Sendable { context.fireUserInboundEventTriggered(event) } } + +final class OkHandler: ChannelInboundHandler { + typealias InboundIn = HTTP2Frame.FramePayload + typealias OutboundOut = HTTP2Frame.FramePayload + + func errorCaught(context: ChannelHandlerContext, error: any Error) { + context.close(mode: .all, promise: nil) + } + + func channelRead(context: ChannelHandlerContext, data: NIOAny) { + switch Self.unwrapInboundIn(data) { + case .headers(let headers): + let responseHeaders = HTTP2Frame.FramePayload.Headers( + headers: [":status": "200"], + endStream: headers.endStream + ) + context.write(Self.wrapOutboundOut(.headers(responseHeaders)), promise: nil) + + case .data(let data): + if data.endStream { + let data = HTTP2Frame.FramePayload.Data( + data: .byteBuffer(ByteBuffer()), + endStream: true + ) + context.write(Self.wrapOutboundOut(.data(data)), promise: nil) + } + + default: + () // Ignore + } + } + + func channelReadComplete(context: ChannelHandlerContext) { + context.flush() + context.fireChannelReadComplete() + } +} + +final class RecordingFrameDelegate: NIOHTTP2FrameDelegate { + private(set) var events: CircularBuffer + + init() { + self.events = [] + } + + func wroteFrame(_ frame: HTTP2Frame) { + self.events.append(frame) + } +}