Skip to content

Commit

Permalink
Reintroduce Async Channel SPI
Browse files Browse the repository at this point in the history
We previously removed Async Channel SPI to allow a clean release without
adding dependency on NIO code which was subject to change, however that
code is now ready to be promoted to API.

Revert "Remove dependency on SPI (apple#419)"

This reverts commit 2140160.
  • Loading branch information
rnro committed Oct 11, 2023
1 parent d6f9a3b commit 03f1a82
Show file tree
Hide file tree
Showing 5 changed files with 830 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
//
//===----------------------------------------------------------------------===//

import NIOCore
@_spi(AsyncChannel) import NIOCore

internal struct InlineStreamMultiplexer {
private let context: ChannelHandlerContext
Expand Down Expand Up @@ -211,3 +211,36 @@ extension InlineStreamMultiplexer {
self.commonStreamMultiplexer.setChannelContinuation(streamChannels)
}
}

extension NIOHTTP2Handler {
/// A variant of `NIOHTTP2Handler.StreamMultiplexer` which creates a child channel for each HTTP/2 stream and
/// provides access to inbound HTTP/2 streams.
///
/// In general in NIO applications it is helpful to consider each HTTP/2 stream as an
/// independent stream of HTTP/2 frames. This multiplexer achieves this by creating a
/// number of in-memory `HTTP2StreamChannel` objects, one for each stream. These operate
/// on ``HTTP2Frame/FramePayload`` objects as their base communication
/// atom, as opposed to the regular NIO `SelectableChannel` objects which use `ByteBuffer`
/// and `IOData`.
///
/// Outbound stream channel objects are initialized upon creation using the supplied `streamStateInitializer` which returns a type
/// `Output`. This type may be `HTTP2Frame` or changed to any other type.
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
@_spi(AsyncChannel)
public struct AsyncStreamMultiplexer<InboundStreamOutput> {
private let inlineStreamMultiplexer: InlineStreamMultiplexer
public let inbound: NIOHTTP2InboundStreamChannels<InboundStreamOutput>

// Cannot be created by users.
internal init(_ inlineStreamMultiplexer: InlineStreamMultiplexer, continuation: any AnyContinuation, inboundStreamChannels: NIOHTTP2InboundStreamChannels<InboundStreamOutput>) {
self.inlineStreamMultiplexer = inlineStreamMultiplexer
self.inlineStreamMultiplexer.setChannelContinuation(continuation)
self.inbound = inboundStreamChannels
}

/// Create a stream channel initialized with the provided closure
public func createStreamChannel<Output: Sendable>(_ initializer: @escaping NIOChannelInitializerWithOutput<Output>) async throws -> Output {
return try await self.inlineStreamMultiplexer.createStreamChannel(initializer).get()
}
}
}
12 changes: 12 additions & 0 deletions Sources/NIOHTTP2/HTTP2ChannelHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1180,4 +1180,16 @@ extension NIOHTTP2Handler {
throw NIOHTTP2Errors.missingMultiplexer()
}
}

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
internal func syncAsyncStreamMultiplexer<Output: Sendable>(continuation: any AnyContinuation, inboundStreamChannels: NIOHTTP2InboundStreamChannels<Output>) throws -> AsyncStreamMultiplexer<Output> {
self.eventLoop!.preconditionInEventLoop()

switch self.inboundStreamMultiplexer {
case let .some(.inline(multiplexer)):
return AsyncStreamMultiplexer(multiplexer, continuation: continuation, inboundStreamChannels: inboundStreamChannels)
case .some(.legacy), .none:
throw NIOHTTP2Errors.missingMultiplexer()
}
}
}
121 changes: 121 additions & 0 deletions Sources/NIOHTTP2/HTTP2CommonInboundStreamMultiplexer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -451,3 +451,124 @@ internal protocol AnyContinuation {
func finish()
func finish(throwing error: Error)
}


/// `NIOHTTP2InboundStreamChannels` provides access to inbound stream channels as a generic `AsyncSequence`.
/// They make use of generics to allow for wrapping the stream `Channel`s, for example as `NIOAsyncChannel`s or protocol negotiation objects.
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
@_spi(AsyncChannel)
public struct NIOHTTP2InboundStreamChannels<Output>: AsyncSequence {
public struct AsyncIterator: AsyncIteratorProtocol {
public typealias Element = Output

private var iterator: AsyncThrowingStream<Output, Error>.AsyncIterator

init(_ iterator: AsyncThrowingStream<Output, Error>.AsyncIterator) {
self.iterator = iterator
}

public mutating func next() async throws -> Output? {
try await self.iterator.next()
}
}

public typealias Element = Output

private let asyncThrowingStream: AsyncThrowingStream<Output, Error>

private init(_ asyncThrowingStream: AsyncThrowingStream<Output, Error>) {
self.asyncThrowingStream = asyncThrowingStream
}

public func makeAsyncIterator() -> AsyncIterator {
AsyncIterator(self.asyncThrowingStream.makeAsyncIterator())
}
}

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
extension NIOHTTP2InboundStreamChannels {
/// `Continuation` is a wrapper for a generic `AsyncThrowingStream` to which inbound HTTP2 stream channels are yielded..
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
struct Continuation: AnyContinuation {
private var continuation: AsyncThrowingStream<Output, Error>.Continuation

internal init(
continuation: AsyncThrowingStream<Output, Error>.Continuation
) {
self.continuation = continuation
}

/// `yield` takes a channel as outputted by the stream initializer and yields the wrapped `AsyncThrowingStream`.
///
/// It takes channels as as `Any` type to allow wrapping by the stream initializer.
func yield(any: Any) {
let yieldResult = self.continuation.yield(any as! Output)
switch yieldResult {
case .enqueued:
break // success, nothing to do
case .dropped:
preconditionFailure("Attempted to yield when AsyncThrowingStream is over capacity. This shouldn't be possible for an unbounded stream.")
case .terminated:
preconditionFailure("Attempted to yield to AsyncThrowingStream in terminated state.")
default:
preconditionFailure("Attempt to yield to AsyncThrowingStream failed for unhandled reason.")
}
}

/// `finish` marks the continuation as finished.
func finish() {
self.continuation.finish()
}

/// `finish` marks the continuation as finished with the supplied error.
func finish(throwing error: Error) {
self.continuation.finish(throwing: error)
}
}


/// `initialize` creates a new `Continuation` object and returns it along with its backing `AsyncThrowingStream`.
/// The `StreamChannelContinuation` provides access to the inbound HTTP2 stream channels.
///
/// - Parameters:
/// - inboundStreamInititializer: A closure which initializes the newly-created inbound stream channel and returns a generic.
/// The returned type corresponds to the output of the channel once the operations in the initializer have been performed.
/// For example an `inboundStreamInititializer` which inserts handlers before wrapping the channel in a `NIOAsyncChannel` would
/// have a `Output` corresponding to that `NIOAsyncChannel` type. Another example is in cases where there is
/// per-stream protocol negotiation where `Output` would be some form of `NIOProtocolNegotiationResult`.
static func initialize(inboundStreamInitializerOutput: Output.Type = Output.self) -> (NIOHTTP2InboundStreamChannels<Output>, Continuation) {
let (stream, continuation) = AsyncThrowingStream.makeStream(of: Output.self)
return (.init(stream), Continuation(continuation: continuation))
}
}

#if swift(>=5.7)
// This doesn't compile on 5.6 but the omission of Sendable is sufficient in any case
@available(*, unavailable)
extension NIOHTTP2InboundStreamChannels.AsyncIterator: Sendable {}

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
extension NIOHTTP2InboundStreamChannels: Sendable where Output: Sendable {}
#else
// This wasn't marked as sendable in 5.6 however it should be fine
// https://forums.swift.org/t/so-is-asyncstream-sendable-or-not/53148/2
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
extension NIOHTTP2InboundStreamChannels: @unchecked Sendable where Output: Sendable {}
#endif


#if swift(<5.9)
// this should be available in the std lib from 5.9 onwards
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
extension AsyncThrowingStream {
public static func makeStream(
of elementType: Element.Type = Element.self,
throwing failureType: Failure.Type = Failure.self,
bufferingPolicy limit: Continuation.BufferingPolicy = .unbounded
) -> (stream: AsyncThrowingStream<Element, Failure>, continuation: AsyncThrowingStream<Element, Failure>.Continuation) where Failure == Error {
var continuation: AsyncThrowingStream<Element, Failure>.Continuation!
let stream = AsyncThrowingStream<Element, Failure>(bufferingPolicy: limit) { continuation = $0 }
return (stream: stream, continuation: continuation!)
}
}
#endif
Loading

0 comments on commit 03f1a82

Please sign in to comment.