Skip to content

Commit

Permalink
Add closeOnDeinit to the NIOAsyncChannel init (#2592)
Browse files Browse the repository at this point in the history
* Add `closeOnDeinit` to the `NIOAsyncChannel` init

# Motivation
In my previous PR, I already did the work to add `finishOnDeinit` configuration to the `NIOAsyncWriter` and `NIOAsyncSequenceProducer`. This PR also automatically migrated the `NIOAsyncChanell` to set the `finishOnDeinit = false`. This was intentional since we really want users to not use the deinit based cleanup; however, it also broke all current adopters of this API semantically and they might now run into the preconditions.

# Modification
This PR reverts the change in `NIOAsyncChannel` and does the usual deprecate + new init dance to provide users to configure this behaviour while still nudging them to check that this is really what they want.

# Result
Easier migration without semantically breaking current adopters of `NIOAsyncChannel`.

* Rename to `wrappingChannelSynchronously`
  • Loading branch information
FranzBusch authored Nov 15, 2023
1 parent 118de50 commit 1040927
Show file tree
Hide file tree
Showing 12 changed files with 145 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func runTCPEchoAsyncChannel(numberOfWrites: Int, eventLoop: EventLoop) async thr
) { channel in
channel.eventLoop.makeCompletedFuture {
return try NIOAsyncChannel(
synchronouslyWrapping: channel,
wrappingChannelSynchronously: channel,
configuration: .init(
inboundType: ByteBuffer.self,
outboundType: ByteBuffer.self
Expand All @@ -39,7 +39,7 @@ func runTCPEchoAsyncChannel(numberOfWrites: Int, eventLoop: EventLoop) async thr
) { channel in
channel.eventLoop.makeCompletedFuture {
return try NIOAsyncChannel(
synchronouslyWrapping: channel,
wrappingChannelSynchronously: channel,
configuration: .init(
inboundType: ByteBuffer.self,
outboundType: ByteBuffer.self
Expand Down
101 changes: 94 additions & 7 deletions Sources/NIOCore/AsyncChannel/AsyncChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,55 @@ public struct NIOAsyncChannel<Inbound: Sendable, Outbound: Sendable>: Sendable {
/// - channel: The ``Channel`` to wrap.
/// - configuration: The ``NIOAsyncChannel``s configuration.
@inlinable
public init(
wrappingChannelSynchronously channel: Channel,
configuration: Configuration = .init()
) throws {
channel.eventLoop.preconditionInEventLoop()
self.channel = channel
(self._inbound, self._outbound) = try channel._syncAddAsyncHandlers(
backPressureStrategy: configuration.backPressureStrategy,
isOutboundHalfClosureEnabled: configuration.isOutboundHalfClosureEnabled,
closeOnDeinit: false
)
}

/// Initializes a new ``NIOAsyncChannel`` wrapping a ``Channel`` where the outbound type is `Never`.
///
/// This initializer will finish the ``NIOAsyncChannel/outbound`` immediately.
///
/// - Important: This **must** be called on the channel's event loop otherwise this init will crash. This is necessary because
/// we must install the handlers before any other event in the pipeline happens otherwise we might drop reads.
///
/// - Parameters:
/// - channel: The ``Channel`` to wrap.
/// - configuration: The ``NIOAsyncChannel``s configuration.
@inlinable
public init(
wrappingChannelSynchronously channel: Channel,
configuration: Configuration = .init()
) throws where Outbound == Never {
channel.eventLoop.preconditionInEventLoop()
self.channel = channel
(self._inbound, self._outbound) = try channel._syncAddAsyncHandlers(
backPressureStrategy: configuration.backPressureStrategy,
isOutboundHalfClosureEnabled: configuration.isOutboundHalfClosureEnabled,
closeOnDeinit: false
)

self._outbound.finish()
}

/// Initializes a new ``NIOAsyncChannel`` wrapping a ``Channel``.
///
/// - Important: This **must** be called on the channel's event loop otherwise this init will crash. This is necessary because
/// we must install the handlers before any other event in the pipeline happens otherwise we might drop reads.
///
/// - Parameters:
/// - channel: The ``Channel`` to wrap.
/// - configuration: The ``NIOAsyncChannel``s configuration.
@available(*, deprecated, renamed: "init(wrappingChannelSynchronously:configuration:)", message: "This method has been deprecated since it defaults to deinit based resource teardown")
@inlinable
public init(
synchronouslyWrapping channel: Channel,
configuration: Configuration = .init()
Expand All @@ -108,7 +157,8 @@ public struct NIOAsyncChannel<Inbound: Sendable, Outbound: Sendable>: Sendable {
self.channel = channel
(self._inbound, self._outbound) = try channel._syncAddAsyncHandlers(
backPressureStrategy: configuration.backPressureStrategy,
isOutboundHalfClosureEnabled: configuration.isOutboundHalfClosureEnabled
isOutboundHalfClosureEnabled: configuration.isOutboundHalfClosureEnabled,
closeOnDeinit: true
)
}

Expand All @@ -123,6 +173,7 @@ public struct NIOAsyncChannel<Inbound: Sendable, Outbound: Sendable>: Sendable {
/// - channel: The ``Channel`` to wrap.
/// - configuration: The ``NIOAsyncChannel``s configuration.
@inlinable
@available(*, deprecated, renamed: "init(wrappingChannelSynchronously:configuration:)", message: "This method has been deprecated since it defaults to deinit based resource teardown")
public init(
synchronouslyWrapping channel: Channel,
configuration: Configuration = .init()
Expand All @@ -131,7 +182,8 @@ public struct NIOAsyncChannel<Inbound: Sendable, Outbound: Sendable>: Sendable {
self.channel = channel
(self._inbound, self._outbound) = try channel._syncAddAsyncHandlers(
backPressureStrategy: configuration.backPressureStrategy,
isOutboundHalfClosureEnabled: configuration.isOutboundHalfClosureEnabled
isOutboundHalfClosureEnabled: configuration.isOutboundHalfClosureEnabled,
closeOnDeinit: true
)

self._outbound.finish()
Expand All @@ -149,12 +201,12 @@ public struct NIOAsyncChannel<Inbound: Sendable, Outbound: Sendable>: Sendable {
self._outbound = outboundWriter
}


/// This method is only used from our server bootstrap to allow us to run the child channel initializer
/// at the right moment.
///
/// - Important: This is not considered stable API and should not be used.
@inlinable
@available(*, deprecated, message: "This method has been deprecated since it defaults to deinit based resource teardown")
public static func _wrapAsyncChannelWithTransformations(
synchronouslyWrapping channel: Channel,
backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil,
Expand All @@ -165,6 +217,35 @@ public struct NIOAsyncChannel<Inbound: Sendable, Outbound: Sendable>: Sendable {
let (inboundStream, outboundWriter): (NIOAsyncChannelInboundStream<Inbound>, NIOAsyncChannelOutboundWriter<Outbound>) = try channel._syncAddAsyncHandlersWithTransformations(
backPressureStrategy: backPressureStrategy,
isOutboundHalfClosureEnabled: isOutboundHalfClosureEnabled,
closeOnDeinit: true,
channelReadTransformation: channelReadTransformation
)

outboundWriter.finish()

return .init(
channel: channel,
inboundStream: inboundStream,
outboundWriter: outboundWriter
)
}

/// This method is only used from our server bootstrap to allow us to run the child channel initializer
/// at the right moment.
///
/// - Important: This is not considered stable API and should not be used.
@inlinable
public static func _wrapAsyncChannelWithTransformations(
wrappingChannelSynchronously channel: Channel,
backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil,
isOutboundHalfClosureEnabled: Bool = false,
channelReadTransformation: @Sendable @escaping (Channel) -> EventLoopFuture<Inbound>
) throws -> NIOAsyncChannel<Inbound, Outbound> where Outbound == Never {
channel.eventLoop.preconditionInEventLoop()
let (inboundStream, outboundWriter): (NIOAsyncChannelInboundStream<Inbound>, NIOAsyncChannelOutboundWriter<Outbound>) = try channel._syncAddAsyncHandlersWithTransformations(
backPressureStrategy: backPressureStrategy,
isOutboundHalfClosureEnabled: isOutboundHalfClosureEnabled,
closeOnDeinit: false,
channelReadTransformation: channelReadTransformation
)

Expand Down Expand Up @@ -229,17 +310,20 @@ extension Channel {
@inlinable
func _syncAddAsyncHandlers<Inbound: Sendable, Outbound: Sendable>(
backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark?,
isOutboundHalfClosureEnabled: Bool
isOutboundHalfClosureEnabled: Bool,
closeOnDeinit: Bool
) throws -> (NIOAsyncChannelInboundStream<Inbound>, NIOAsyncChannelOutboundWriter<Outbound>) {
self.eventLoop.assertInEventLoop()

let inboundStream = try NIOAsyncChannelInboundStream<Inbound>.makeWrappingHandler(
channel: self,
backPressureStrategy: backPressureStrategy
backPressureStrategy: backPressureStrategy,
closeOnDeinit: closeOnDeinit
)
let writer = try NIOAsyncChannelOutboundWriter<Outbound>(
channel: self,
isOutboundHalfClosureEnabled: isOutboundHalfClosureEnabled
isOutboundHalfClosureEnabled: isOutboundHalfClosureEnabled,
closeOnDeinit: closeOnDeinit
)
return (inboundStream, writer)
}
Expand All @@ -249,18 +333,21 @@ extension Channel {
func _syncAddAsyncHandlersWithTransformations<ChannelReadResult>(
backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark?,
isOutboundHalfClosureEnabled: Bool,
closeOnDeinit: Bool,
channelReadTransformation: @Sendable @escaping (Channel) -> EventLoopFuture<ChannelReadResult>
) throws -> (NIOAsyncChannelInboundStream<ChannelReadResult>, NIOAsyncChannelOutboundWriter<Never>) {
self.eventLoop.assertInEventLoop()

let inboundStream = try NIOAsyncChannelInboundStream<ChannelReadResult>.makeTransformationHandler(
channel: self,
backPressureStrategy: backPressureStrategy,
closeOnDeinit: closeOnDeinit,
channelReadTransformation: channelReadTransformation
)
let writer = try NIOAsyncChannelOutboundWriter<Never>(
channel: self,
isOutboundHalfClosureEnabled: isOutboundHalfClosureEnabled
isOutboundHalfClosureEnabled: isOutboundHalfClosureEnabled,
closeOnDeinit: closeOnDeinit
)
return (inboundStream, writer)
}
Expand Down
9 changes: 7 additions & 2 deletions Sources/NIOCore/AsyncChannel/AsyncChannelInboundStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public struct NIOAsyncChannelInboundStream<Inbound: Sendable>: Sendable {
init<HandlerInbound: Sendable>(
channel: Channel,
backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark?,
closeOnDeinit: Bool,
handler: NIOAsyncChannelInboundStreamChannelHandler<HandlerInbound, Inbound>
) throws {
channel.eventLoop.preconditionInEventLoop()
Expand All @@ -95,7 +96,7 @@ public struct NIOAsyncChannelInboundStream<Inbound: Sendable>: Sendable {

let sequence = Producer.makeSequence(
backPressureStrategy: strategy,
finishOnDeinit: false,
finishOnDeinit: closeOnDeinit,
delegate: NIOAsyncChannelInboundStreamChannelHandlerProducerDelegate(handler: handler)
)
handler.source = sequence.source
Expand All @@ -107,7 +108,8 @@ public struct NIOAsyncChannelInboundStream<Inbound: Sendable>: Sendable {
@inlinable
static func makeWrappingHandler(
channel: Channel,
backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark?
backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark?,
closeOnDeinit: Bool
) throws -> NIOAsyncChannelInboundStream {
let handler = NIOAsyncChannelInboundStreamChannelHandler<Inbound, Inbound>.makeHandler(
eventLoop: channel.eventLoop
Expand All @@ -116,6 +118,7 @@ public struct NIOAsyncChannelInboundStream<Inbound: Sendable>: Sendable {
return try .init(
channel: channel,
backPressureStrategy: backPressureStrategy,
closeOnDeinit: closeOnDeinit,
handler: handler
)
}
Expand All @@ -125,6 +128,7 @@ public struct NIOAsyncChannelInboundStream<Inbound: Sendable>: Sendable {
static func makeTransformationHandler(
channel: Channel,
backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark?,
closeOnDeinit: Bool,
channelReadTransformation: @Sendable @escaping (Channel) -> EventLoopFuture<Inbound>
) throws -> NIOAsyncChannelInboundStream {
let handler = NIOAsyncChannelInboundStreamChannelHandler<Channel, Inbound>.makeHandlerWithTransformations(
Expand All @@ -135,6 +139,7 @@ public struct NIOAsyncChannelInboundStream<Inbound: Sendable>: Sendable {
return try .init(
channel: channel,
backPressureStrategy: backPressureStrategy,
closeOnDeinit: closeOnDeinit,
handler: handler
)
}
Expand Down
5 changes: 3 additions & 2 deletions Sources/NIOCore/AsyncChannel/AsyncChannelOutboundWriter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ public struct NIOAsyncChannelOutboundWriter<OutboundOut: Sendable>: Sendable {
@inlinable
init(
channel: Channel,
isOutboundHalfClosureEnabled: Bool
isOutboundHalfClosureEnabled: Bool,
closeOnDeinit: Bool
) throws {
let handler = NIOAsyncChannelOutboundWriterHandler<OutboundOut>(
eventLoop: channel.eventLoop,
Expand All @@ -93,7 +94,7 @@ public struct NIOAsyncChannelOutboundWriter<OutboundOut: Sendable>: Sendable {
let writer = _Writer.makeWriter(
elementType: OutboundOut.self,
isWritable: true,
finishOnDeinit: false,
finishOnDeinit: closeOnDeinit,
delegate: .init(handler: handler)
)
handler.sink = writer.sink
Expand Down
6 changes: 3 additions & 3 deletions Sources/NIOCore/Docs.docc/swift-concurrency.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ the inbound data and echo it back outbound.

```swift
let channel = ...
let asyncChannel = try NIOAsyncChannel<ByteBuffer, ByteBuffer>(synchronouslyWrapping: channel)
let asyncChannel = try NIOAsyncChannel<ByteBuffer, ByteBuffer>(wrappingChannelSynchronously: channel)

try await asyncChannel.executeThenClose { inbound, outbound in
for try await inboundData in inbound {
Expand Down Expand Up @@ -186,7 +186,7 @@ let clientChannel = try await ClientBootstrap(group: eventLoopGroup)
) { channel in
channel.eventLoop.makeCompletedFuture {
return try NIOAsyncChannel<ByteBuffer, ByteBuffer>(
synchronouslyWrapping: channel
wrappingChannelSynchronously: channel
)
}
}
Expand Down Expand Up @@ -245,7 +245,7 @@ let upgradeResult: EventLoopFuture<UpgradeResult> = try await ClientBootstrap(gr
// This configures the pipeline after the websocket upgrade was successful.
// We are wrapping the pipeline in a NIOAsyncChannel.
channel.eventLoop.makeCompletedFuture {
let asyncChannel = try NIOAsyncChannel<WebSocketFrame, WebSocketFrame>(synchronouslyWrapping: channel)
let asyncChannel = try NIOAsyncChannel<WebSocketFrame, WebSocketFrame>(wrappingChannelSynchronously: channel)
return UpgradeResult.websocket(asyncChannel)
}
}
Expand Down
2 changes: 1 addition & 1 deletion Sources/NIOPosix/Bootstrap.swift
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,7 @@ extension ServerBootstrap {
)
let asyncChannel = try NIOAsyncChannel<ChannelInitializerResult, Never>
._wrapAsyncChannelWithTransformations(
synchronouslyWrapping: serverChannel,
wrappingChannelSynchronously: serverChannel,
backPressureStrategy: serverBackPressureStrategy,
channelReadTransformation: { channel -> EventLoopFuture<ChannelInitializerResult> in
// The channelReadTransformation is run on the EL of the server channel
Expand Down
2 changes: 1 addition & 1 deletion Sources/NIOTCPEchoClient/Client.swift
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ struct Client {
try channel.pipeline.syncOperations.addHandler(MessageToByteHandler(NewlineDelimiterCoder()))

return try NIOAsyncChannel(
synchronouslyWrapping: channel,
wrappingChannelSynchronously: channel,
configuration: NIOAsyncChannel.Configuration(
inboundType: String.self,
outboundType: String.self
Expand Down
2 changes: 1 addition & 1 deletion Sources/NIOTCPEchoServer/Server.swift
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ struct Server {
try channel.pipeline.syncOperations.addHandler(MessageToByteHandler(NewlineDelimiterCoder()))

return try NIOAsyncChannel(
synchronouslyWrapping: channel,
wrappingChannelSynchronously: channel,
configuration: NIOAsyncChannel.Configuration(
inboundType: String.self,
outboundType: String.self
Expand Down
2 changes: 1 addition & 1 deletion Sources/NIOWebSocketClient/Client.swift
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ struct Client {
// let upgrader = NIOTypedWebSocketClientUpgrader<UpgradeResult>(
// upgradePipelineHandler: { (channel, _) in
// channel.eventLoop.makeCompletedFuture {
// let asyncChannel = try NIOAsyncChannel<WebSocketFrame, WebSocketFrame>(synchronouslyWrapping: channel)
// let asyncChannel = try NIOAsyncChannel<WebSocketFrame, WebSocketFrame>(wrappingChannelSynchronously: channel)
// return UpgradeResult.websocket(asyncChannel)
// }
// }
Expand Down
4 changes: 2 additions & 2 deletions Sources/NIOWebSocketServer/Server.swift
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ struct Server {
// },
// upgradePipelineHandler: { (channel, _) in
// channel.eventLoop.makeCompletedFuture {
// let asyncChannel = try NIOAsyncChannel<WebSocketFrame, WebSocketFrame>(synchronouslyWrapping: channel)
// let asyncChannel = try NIOAsyncChannel<WebSocketFrame, WebSocketFrame>(wrappingChannelSynchronously: channel)
// return UpgradeResult.websocket(asyncChannel)
// }
// }
Expand All @@ -102,7 +102,7 @@ struct Server {
// notUpgradingCompletionHandler: { channel in
// channel.eventLoop.makeCompletedFuture {
// try channel.pipeline.syncOperations.addHandler(HTTPByteBufferResponsePartHandler())
// let asyncChannel = try NIOAsyncChannel<HTTPServerRequestPart, HTTPPart<HTTPResponseHead, ByteBuffer>>(synchronouslyWrapping: channel)
// let asyncChannel = try NIOAsyncChannel<HTTPServerRequestPart, HTTPPart<HTTPResponseHead, ByteBuffer>>(wrappingChannelSynchronously: channel)
// return UpgradeResult.notUpgraded(asyncChannel)
// }
// }
Expand Down
Loading

0 comments on commit 1040927

Please sign in to comment.