diff --git a/Sources/NIO/PendingWritesManager.swift b/Sources/NIO/PendingWritesManager.swift index e9742ab0e1..a093f61fd3 100644 --- a/Sources/NIO/PendingWritesManager.swift +++ b/Sources/NIO/PendingWritesManager.swift @@ -174,19 +174,19 @@ private struct PendingStreamWritesState { /// Indicate that a write has happened, this may be a write of multiple outstanding writes (using for example `writev`). /// - /// - warning: The closure will simply fulfill all the promises in order. If one of those promises does for example close the `Channel` we might see subsequent writes fail out of order. Example: Imagine the user issues three writes: `A`, `B` and `C`. Imagine that `A` and `B` both get successfully written in one write operation but the user closes the `Channel` in `A`'s callback. Then overall the promises will be fulfilled in this order: 1) `A`: success 2) `C`: error 3) `B`: success. Note how `B` and `C` get fulfilled out of order. + /// - warning: The promises will be returned in order. If one of those promises does for example close the `Channel` we might see subsequent writes fail out of order. Example: Imagine the user issues three writes: `A`, `B` and `C`. Imagine that `A` and `B` both get successfully written in one write operation but the user closes the `Channel` in `A`'s callback. Then overall the promises will be fulfilled in this order: 1) `A`: success 2) `C`: error 3) `B`: success. Note how `B` and `C` get fulfilled out of order. /// /// - parameters: /// - writeResult: The result of the write operation. - /// - returns: A closure that the caller _needs_ to run which will fulfill the promises of the writes and a `OneWriteOperationResult` which indicates if we could write everything or not. - public mutating func didWrite(itemCount: Int, result writeResult: IOResult) -> (() -> Void, OneWriteOperationResult) { - var promises: [EventLoopPromise] = [] - let fulfillPromises = { promises.forEach { $0.succeed(result: ()) } } - + /// - returns: A tuple of a promise and a `OneWriteResult`. The promise is the first promise that needs to be notified of the write result. + /// This promise will cascade the result to all other promises that need notifying. If no promises need to be notified, will be `nil`. + /// The write result will indicate whether we were able to write everything or not. + public mutating func didWrite(itemCount: Int, result writeResult: IOResult) -> (EventLoopPromise?, OneWriteOperationResult) { switch writeResult { case .wouldBlock(0): - return (fulfillPromises, .wouldBlock) + return (nil, .wouldBlock) case .processed(let written), .wouldBlock(let written): + var promise0: EventLoopPromise? assert(written >= 0, "allegedly written a negative amount of bytes: \(written)") var unaccountedWrites = written for _ in 0.. (() -> Void) { - var promises: [EventLoopPromise] = [] - promises.reserveCapacity(self.pendingWrites.count) + /// - returns: promise that needs to be failed, or `nil` if there were no pending writes. + public mutating func removeAll() -> EventLoopPromise? { + var promise0: EventLoopPromise? + while !self.pendingWrites.isEmpty { if let p = self.fullyWrittenFirst() { - promises.append(p) + if let promise = promise0 { + promise.futureResult.cascade(promise: p) + } else { + promise0 = p + } } } - return { promises.forEach { $0.fail(error: error) } } + return promise0 } /// Returns the best mechanism to write pending data at the current point in time. @@ -341,13 +349,13 @@ final class PendingStreamWritesManager: PendingWritesManager { /// - itemCount: The number of items we tried to write. /// - result: The result of the write operation. private func didWrite(itemCount: Int, result: IOResult) -> OneWriteOperationResult { - let (fulfillPromises, result) = self.state.didWrite(itemCount: itemCount, result: result) + let (promise, result) = self.state.didWrite(itemCount: itemCount, result: result) if self.state.bytes < waterMark.low { channelWritabilityFlag.store(true) } - fulfillPromises() + promise?.succeed(result: ()) return result } @@ -408,7 +416,7 @@ final class PendingStreamWritesManager: PendingWritesManager { self.isOpen = false } - self.state.failAll(error: error)() + self.state.removeAll()?.fail(error: error) assert(self.state.isEmpty) }