Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add closeOnDeinit to the NIOAsyncChannel init #2592

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func runTCPEchoAsyncChannel(numberOfWrites: Int, eventLoop: EventLoop) async thr
) { channel in
channel.eventLoop.makeCompletedFuture {
return try NIOAsyncChannel(
synchronouslyWrapping: channel,
wrappingChannelSynchronously: channel,
configuration: .init(
inboundType: ByteBuffer.self,
outboundType: ByteBuffer.self
Expand All @@ -39,7 +39,7 @@ func runTCPEchoAsyncChannel(numberOfWrites: Int, eventLoop: EventLoop) async thr
) { channel in
channel.eventLoop.makeCompletedFuture {
return try NIOAsyncChannel(
synchronouslyWrapping: channel,
wrappingChannelSynchronously: channel,
configuration: .init(
inboundType: ByteBuffer.self,
outboundType: ByteBuffer.self
Expand Down
101 changes: 94 additions & 7 deletions Sources/NIOCore/AsyncChannel/AsyncChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,55 @@ public struct NIOAsyncChannel<Inbound: Sendable, Outbound: Sendable>: Sendable {
/// - channel: The ``Channel`` to wrap.
/// - configuration: The ``NIOAsyncChannel``s configuration.
@inlinable
public init(
wrappingChannelSynchronously channel: Channel,
configuration: Configuration = .init()
) throws {
channel.eventLoop.preconditionInEventLoop()
self.channel = channel
(self._inbound, self._outbound) = try channel._syncAddAsyncHandlers(
backPressureStrategy: configuration.backPressureStrategy,
isOutboundHalfClosureEnabled: configuration.isOutboundHalfClosureEnabled,
closeOnDeinit: false
)
}

/// Initializes a new ``NIOAsyncChannel`` wrapping a ``Channel`` where the outbound type is `Never`.
///
/// This initializer will finish the ``NIOAsyncChannel/outbound`` immediately.
///
/// - Important: This **must** be called on the channel's event loop otherwise this init will crash. This is necessary because
/// we must install the handlers before any other event in the pipeline happens otherwise we might drop reads.
///
/// - Parameters:
/// - channel: The ``Channel`` to wrap.
/// - configuration: The ``NIOAsyncChannel``s configuration.
@inlinable
public init(
wrappingChannelSynchronously channel: Channel,
configuration: Configuration = .init()
) throws where Outbound == Never {
channel.eventLoop.preconditionInEventLoop()
self.channel = channel
(self._inbound, self._outbound) = try channel._syncAddAsyncHandlers(
backPressureStrategy: configuration.backPressureStrategy,
isOutboundHalfClosureEnabled: configuration.isOutboundHalfClosureEnabled,
closeOnDeinit: false
)

self._outbound.finish()
}

/// Initializes a new ``NIOAsyncChannel`` wrapping a ``Channel``.
///
/// - Important: This **must** be called on the channel's event loop otherwise this init will crash. This is necessary because
/// we must install the handlers before any other event in the pipeline happens otherwise we might drop reads.
///
/// - Parameters:
/// - channel: The ``Channel`` to wrap.
/// - configuration: The ``NIOAsyncChannel``s configuration.
@available(*, deprecated, renamed: "init(wrappingChannelSynchronously:configuration:)", message: "This method has been deprecated since it defaults to deinit based resource teardown")
@inlinable
public init(
synchronouslyWrapping channel: Channel,
configuration: Configuration = .init()
Expand All @@ -108,7 +157,8 @@ public struct NIOAsyncChannel<Inbound: Sendable, Outbound: Sendable>: Sendable {
self.channel = channel
(self._inbound, self._outbound) = try channel._syncAddAsyncHandlers(
backPressureStrategy: configuration.backPressureStrategy,
isOutboundHalfClosureEnabled: configuration.isOutboundHalfClosureEnabled
isOutboundHalfClosureEnabled: configuration.isOutboundHalfClosureEnabled,
closeOnDeinit: true
)
}

Expand All @@ -123,6 +173,7 @@ public struct NIOAsyncChannel<Inbound: Sendable, Outbound: Sendable>: Sendable {
/// - channel: The ``Channel`` to wrap.
/// - configuration: The ``NIOAsyncChannel``s configuration.
@inlinable
@available(*, deprecated, renamed: "init(wrappingChannelSynchronously:configuration:)", message: "This method has been deprecated since it defaults to deinit based resource teardown")
public init(
synchronouslyWrapping channel: Channel,
configuration: Configuration = .init()
Expand All @@ -131,7 +182,8 @@ public struct NIOAsyncChannel<Inbound: Sendable, Outbound: Sendable>: Sendable {
self.channel = channel
(self._inbound, self._outbound) = try channel._syncAddAsyncHandlers(
backPressureStrategy: configuration.backPressureStrategy,
isOutboundHalfClosureEnabled: configuration.isOutboundHalfClosureEnabled
isOutboundHalfClosureEnabled: configuration.isOutboundHalfClosureEnabled,
closeOnDeinit: true
)

self._outbound.finish()
Expand All @@ -149,12 +201,12 @@ public struct NIOAsyncChannel<Inbound: Sendable, Outbound: Sendable>: Sendable {
self._outbound = outboundWriter
}


/// This method is only used from our server bootstrap to allow us to run the child channel initializer
/// at the right moment.
///
/// - Important: This is not considered stable API and should not be used.
@inlinable
@available(*, deprecated, message: "This method has been deprecated since it defaults to deinit based resource teardown")
public static func _wrapAsyncChannelWithTransformations(
synchronouslyWrapping channel: Channel,
backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil,
Expand All @@ -165,6 +217,35 @@ public struct NIOAsyncChannel<Inbound: Sendable, Outbound: Sendable>: Sendable {
let (inboundStream, outboundWriter): (NIOAsyncChannelInboundStream<Inbound>, NIOAsyncChannelOutboundWriter<Outbound>) = try channel._syncAddAsyncHandlersWithTransformations(
backPressureStrategy: backPressureStrategy,
isOutboundHalfClosureEnabled: isOutboundHalfClosureEnabled,
closeOnDeinit: true,
channelReadTransformation: channelReadTransformation
)

outboundWriter.finish()

return .init(
channel: channel,
inboundStream: inboundStream,
outboundWriter: outboundWriter
)
}

/// This method is only used from our server bootstrap to allow us to run the child channel initializer
/// at the right moment.
///
/// - Important: This is not considered stable API and should not be used.
@inlinable
public static func _wrapAsyncChannelWithTransformations(
wrappingChannelSynchronously channel: Channel,
backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil,
isOutboundHalfClosureEnabled: Bool = false,
channelReadTransformation: @Sendable @escaping (Channel) -> EventLoopFuture<Inbound>
) throws -> NIOAsyncChannel<Inbound, Outbound> where Outbound == Never {
channel.eventLoop.preconditionInEventLoop()
let (inboundStream, outboundWriter): (NIOAsyncChannelInboundStream<Inbound>, NIOAsyncChannelOutboundWriter<Outbound>) = try channel._syncAddAsyncHandlersWithTransformations(
backPressureStrategy: backPressureStrategy,
isOutboundHalfClosureEnabled: isOutboundHalfClosureEnabled,
closeOnDeinit: false,
channelReadTransformation: channelReadTransformation
)

Expand Down Expand Up @@ -229,17 +310,20 @@ extension Channel {
@inlinable
func _syncAddAsyncHandlers<Inbound: Sendable, Outbound: Sendable>(
backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark?,
isOutboundHalfClosureEnabled: Bool
isOutboundHalfClosureEnabled: Bool,
closeOnDeinit: Bool
) throws -> (NIOAsyncChannelInboundStream<Inbound>, NIOAsyncChannelOutboundWriter<Outbound>) {
self.eventLoop.assertInEventLoop()

let inboundStream = try NIOAsyncChannelInboundStream<Inbound>.makeWrappingHandler(
channel: self,
backPressureStrategy: backPressureStrategy
backPressureStrategy: backPressureStrategy,
closeOnDeinit: closeOnDeinit
)
let writer = try NIOAsyncChannelOutboundWriter<Outbound>(
channel: self,
isOutboundHalfClosureEnabled: isOutboundHalfClosureEnabled
isOutboundHalfClosureEnabled: isOutboundHalfClosureEnabled,
closeOnDeinit: closeOnDeinit
)
return (inboundStream, writer)
}
Expand All @@ -249,18 +333,21 @@ extension Channel {
func _syncAddAsyncHandlersWithTransformations<ChannelReadResult>(
backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark?,
isOutboundHalfClosureEnabled: Bool,
closeOnDeinit: Bool,
channelReadTransformation: @Sendable @escaping (Channel) -> EventLoopFuture<ChannelReadResult>
) throws -> (NIOAsyncChannelInboundStream<ChannelReadResult>, NIOAsyncChannelOutboundWriter<Never>) {
self.eventLoop.assertInEventLoop()

let inboundStream = try NIOAsyncChannelInboundStream<ChannelReadResult>.makeTransformationHandler(
channel: self,
backPressureStrategy: backPressureStrategy,
closeOnDeinit: closeOnDeinit,
channelReadTransformation: channelReadTransformation
)
let writer = try NIOAsyncChannelOutboundWriter<Never>(
channel: self,
isOutboundHalfClosureEnabled: isOutboundHalfClosureEnabled
isOutboundHalfClosureEnabled: isOutboundHalfClosureEnabled,
closeOnDeinit: closeOnDeinit
)
return (inboundStream, writer)
}
Expand Down
9 changes: 7 additions & 2 deletions Sources/NIOCore/AsyncChannel/AsyncChannelInboundStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public struct NIOAsyncChannelInboundStream<Inbound: Sendable>: Sendable {
init<HandlerInbound: Sendable>(
channel: Channel,
backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark?,
closeOnDeinit: Bool,
handler: NIOAsyncChannelInboundStreamChannelHandler<HandlerInbound, Inbound>
) throws {
channel.eventLoop.preconditionInEventLoop()
Expand All @@ -95,7 +96,7 @@ public struct NIOAsyncChannelInboundStream<Inbound: Sendable>: Sendable {

let sequence = Producer.makeSequence(
backPressureStrategy: strategy,
finishOnDeinit: false,
finishOnDeinit: closeOnDeinit,
delegate: NIOAsyncChannelInboundStreamChannelHandlerProducerDelegate(handler: handler)
)
handler.source = sequence.source
Expand All @@ -107,7 +108,8 @@ public struct NIOAsyncChannelInboundStream<Inbound: Sendable>: Sendable {
@inlinable
static func makeWrappingHandler(
channel: Channel,
backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark?
backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark?,
closeOnDeinit: Bool
) throws -> NIOAsyncChannelInboundStream {
let handler = NIOAsyncChannelInboundStreamChannelHandler<Inbound, Inbound>.makeHandler(
eventLoop: channel.eventLoop
Expand All @@ -116,6 +118,7 @@ public struct NIOAsyncChannelInboundStream<Inbound: Sendable>: Sendable {
return try .init(
channel: channel,
backPressureStrategy: backPressureStrategy,
closeOnDeinit: closeOnDeinit,
handler: handler
)
}
Expand All @@ -125,6 +128,7 @@ public struct NIOAsyncChannelInboundStream<Inbound: Sendable>: Sendable {
static func makeTransformationHandler(
channel: Channel,
backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark?,
closeOnDeinit: Bool,
channelReadTransformation: @Sendable @escaping (Channel) -> EventLoopFuture<Inbound>
) throws -> NIOAsyncChannelInboundStream {
let handler = NIOAsyncChannelInboundStreamChannelHandler<Channel, Inbound>.makeHandlerWithTransformations(
Expand All @@ -135,6 +139,7 @@ public struct NIOAsyncChannelInboundStream<Inbound: Sendable>: Sendable {
return try .init(
channel: channel,
backPressureStrategy: backPressureStrategy,
closeOnDeinit: closeOnDeinit,
handler: handler
)
}
Expand Down
5 changes: 3 additions & 2 deletions Sources/NIOCore/AsyncChannel/AsyncChannelOutboundWriter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ public struct NIOAsyncChannelOutboundWriter<OutboundOut: Sendable>: Sendable {
@inlinable
init(
channel: Channel,
isOutboundHalfClosureEnabled: Bool
isOutboundHalfClosureEnabled: Bool,
closeOnDeinit: Bool
) throws {
let handler = NIOAsyncChannelOutboundWriterHandler<OutboundOut>(
eventLoop: channel.eventLoop,
Expand All @@ -93,7 +94,7 @@ public struct NIOAsyncChannelOutboundWriter<OutboundOut: Sendable>: Sendable {
let writer = _Writer.makeWriter(
elementType: OutboundOut.self,
isWritable: true,
finishOnDeinit: false,
finishOnDeinit: closeOnDeinit,
delegate: .init(handler: handler)
)
handler.sink = writer.sink
Expand Down
6 changes: 3 additions & 3 deletions Sources/NIOCore/Docs.docc/swift-concurrency.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ the inbound data and echo it back outbound.

```swift
let channel = ...
let asyncChannel = try NIOAsyncChannel<ByteBuffer, ByteBuffer>(synchronouslyWrapping: channel)
let asyncChannel = try NIOAsyncChannel<ByteBuffer, ByteBuffer>(wrappingChannelSynchronously: channel)

try await asyncChannel.executeThenClose { inbound, outbound in
for try await inboundData in inbound {
Expand Down Expand Up @@ -186,7 +186,7 @@ let clientChannel = try await ClientBootstrap(group: eventLoopGroup)
) { channel in
channel.eventLoop.makeCompletedFuture {
return try NIOAsyncChannel<ByteBuffer, ByteBuffer>(
synchronouslyWrapping: channel
wrappingChannelSynchronously: channel
)
}
}
Expand Down Expand Up @@ -245,7 +245,7 @@ let upgradeResult: EventLoopFuture<UpgradeResult> = try await ClientBootstrap(gr
// This configures the pipeline after the websocket upgrade was successful.
// We are wrapping the pipeline in a NIOAsyncChannel.
channel.eventLoop.makeCompletedFuture {
let asyncChannel = try NIOAsyncChannel<WebSocketFrame, WebSocketFrame>(synchronouslyWrapping: channel)
let asyncChannel = try NIOAsyncChannel<WebSocketFrame, WebSocketFrame>(wrappingChannelSynchronously: channel)
return UpgradeResult.websocket(asyncChannel)
}
}
Expand Down
2 changes: 1 addition & 1 deletion Sources/NIOPosix/Bootstrap.swift
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,7 @@ extension ServerBootstrap {
)
let asyncChannel = try NIOAsyncChannel<ChannelInitializerResult, Never>
._wrapAsyncChannelWithTransformations(
synchronouslyWrapping: serverChannel,
wrappingChannelSynchronously: serverChannel,
backPressureStrategy: serverBackPressureStrategy,
channelReadTransformation: { channel -> EventLoopFuture<ChannelInitializerResult> in
// The channelReadTransformation is run on the EL of the server channel
Expand Down
2 changes: 1 addition & 1 deletion Sources/NIOTCPEchoClient/Client.swift
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ struct Client {
try channel.pipeline.syncOperations.addHandler(MessageToByteHandler(NewlineDelimiterCoder()))

return try NIOAsyncChannel(
synchronouslyWrapping: channel,
wrappingChannelSynchronously: channel,
configuration: NIOAsyncChannel.Configuration(
inboundType: String.self,
outboundType: String.self
Expand Down
2 changes: 1 addition & 1 deletion Sources/NIOTCPEchoServer/Server.swift
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ struct Server {
try channel.pipeline.syncOperations.addHandler(MessageToByteHandler(NewlineDelimiterCoder()))

return try NIOAsyncChannel(
synchronouslyWrapping: channel,
wrappingChannelSynchronously: channel,
configuration: NIOAsyncChannel.Configuration(
inboundType: String.self,
outboundType: String.self
Expand Down
2 changes: 1 addition & 1 deletion Sources/NIOWebSocketClient/Client.swift
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ struct Client {
// let upgrader = NIOTypedWebSocketClientUpgrader<UpgradeResult>(
// upgradePipelineHandler: { (channel, _) in
// channel.eventLoop.makeCompletedFuture {
// let asyncChannel = try NIOAsyncChannel<WebSocketFrame, WebSocketFrame>(synchronouslyWrapping: channel)
// let asyncChannel = try NIOAsyncChannel<WebSocketFrame, WebSocketFrame>(wrappingChannelSynchronously: channel)
// return UpgradeResult.websocket(asyncChannel)
// }
// }
Expand Down
4 changes: 2 additions & 2 deletions Sources/NIOWebSocketServer/Server.swift
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ struct Server {
// },
// upgradePipelineHandler: { (channel, _) in
// channel.eventLoop.makeCompletedFuture {
// let asyncChannel = try NIOAsyncChannel<WebSocketFrame, WebSocketFrame>(synchronouslyWrapping: channel)
// let asyncChannel = try NIOAsyncChannel<WebSocketFrame, WebSocketFrame>(wrappingChannelSynchronously: channel)
// return UpgradeResult.websocket(asyncChannel)
// }
// }
Expand All @@ -102,7 +102,7 @@ struct Server {
// notUpgradingCompletionHandler: { channel in
// channel.eventLoop.makeCompletedFuture {
// try channel.pipeline.syncOperations.addHandler(HTTPByteBufferResponsePartHandler())
// let asyncChannel = try NIOAsyncChannel<HTTPServerRequestPart, HTTPPart<HTTPResponseHead, ByteBuffer>>(synchronouslyWrapping: channel)
// let asyncChannel = try NIOAsyncChannel<HTTPServerRequestPart, HTTPPart<HTTPResponseHead, ByteBuffer>>(wrappingChannelSynchronously: channel)
// return UpgradeResult.notUpgraded(asyncChannel)
// }
// }
Expand Down
Loading