Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce allocations when notify promises related to pending writes. #308

Merged
merged 2 commits into from
Apr 13, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 28 additions & 20 deletions Sources/NIO/PendingWritesManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -174,19 +174,19 @@ private struct PendingStreamWritesState {

/// Indicate that a write has happened, this may be a write of multiple outstanding writes (using for example `writev`).
///
/// - warning: The closure will simply fulfill all the promises in order. If one of those promises does for example close the `Channel` we might see subsequent writes fail out of order. Example: Imagine the user issues three writes: `A`, `B` and `C`. Imagine that `A` and `B` both get successfully written in one write operation but the user closes the `Channel` in `A`'s callback. Then overall the promises will be fulfilled in this order: 1) `A`: success 2) `C`: error 3) `B`: success. Note how `B` and `C` get fulfilled out of order.
/// - warning: The promises will be returned in order. If one of those promises does for example close the `Channel` we might see subsequent writes fail out of order. Example: Imagine the user issues three writes: `A`, `B` and `C`. Imagine that `A` and `B` both get successfully written in one write operation but the user closes the `Channel` in `A`'s callback. Then overall the promises will be fulfilled in this order: 1) `A`: success 2) `C`: error 3) `B`: success. Note how `B` and `C` get fulfilled out of order.
///
/// - parameters:
/// - writeResult: The result of the write operation.
/// - returns: A closure that the caller _needs_ to run which will fulfill the promises of the writes and a `OneWriteOperationResult` which indicates if we could write everything or not.
public mutating func didWrite(itemCount: Int, result writeResult: IOResult<Int>) -> (() -> Void, OneWriteOperationResult) {
var promises: [EventLoopPromise<Void>] = []
let fulfillPromises = { promises.forEach { $0.succeed(result: ()) } }

/// - returns: A tuple of a promise and a `OneWriteResult`. The promise is the first promise that needs to be notified of the write result.
/// This promise will cascade the result to all other promises that need notifying. If no promises need to be notified, will be `nil`.
/// The write result will indicate whether we were able to write everything or not.
public mutating func didWrite(itemCount: Int, result writeResult: IOResult<Int>) -> (EventLoopPromise<Void>?, OneWriteOperationResult) {
switch writeResult {
case .wouldBlock(0):
return (fulfillPromises, .wouldBlock)
return (nil, .wouldBlock)
case .processed(let written), .wouldBlock(let written):
var promise0: EventLoopPromise<Void>?
assert(written >= 0, "allegedly written a negative amount of bytes: \(written)")
var unaccountedWrites = written
for _ in 0..<itemCount {
Expand All @@ -195,18 +195,22 @@ private struct PendingStreamWritesState {
unaccountedWrites -= headItemReadableBytes
/* we wrote at least the whole head item, so drop it and succeed the promise */
if let promise = self.fullyWrittenFirst() {
promises.append(promise)
if let p = promise0 {
p.futureResult.cascade(promise: promise)
} else {
promise0 = promise
}
}
} else {
/* we could only write a part of the head item, so don't drop it but remember what we wrote */
self.partiallyWrittenFirst(bytes: unaccountedWrites)

// may try again depending on the writeSpinCount
return (fulfillPromises, .writtenPartially)
return (promise0, .writtenPartially)
}
}
assert(unaccountedWrites == 0, "after doing all the accounting for the byte written, \(unaccountedWrites) bytes of unaccounted writes remain.")
return (fulfillPromises, .writtenCompletely)
return (promise0, .writtenCompletely)
}
}

Expand All @@ -215,20 +219,24 @@ private struct PendingStreamWritesState {
return self.pendingWrites.hasMark()
}

/// Fail all the outstanding writes.
/// Remove all pending writes and return a `EventLoopPromise` which will cascade notifications to all.
///
/// - warning: See the warning for `didWrite`.
///
/// - returns: A closure that the caller _needs_ to run which will fulfill the promises.
public mutating func failAll(error: Error) -> (() -> Void) {
var promises: [EventLoopPromise<Void>] = []
promises.reserveCapacity(self.pendingWrites.count)
/// - returns: promise that needs to be failed, or `nil` if there were no pending writes.
public mutating func removeAll() -> EventLoopPromise<Void>? {
var promise0: EventLoopPromise<Void>?

while !self.pendingWrites.isEmpty {
if let p = self.fullyWrittenFirst() {
promises.append(p)
if let promise = promise0 {
promise.futureResult.cascade(promise: p)
} else {
promise0 = p
}
}
}
return { promises.forEach { $0.fail(error: error) } }
return promise0
}

/// Returns the best mechanism to write pending data at the current point in time.
Expand Down Expand Up @@ -341,13 +349,13 @@ final class PendingStreamWritesManager: PendingWritesManager {
/// - itemCount: The number of items we tried to write.
/// - result: The result of the write operation.
private func didWrite(itemCount: Int, result: IOResult<Int>) -> OneWriteOperationResult {
let (fulfillPromises, result) = self.state.didWrite(itemCount: itemCount, result: result)
let (promise, result) = self.state.didWrite(itemCount: itemCount, result: result)

if self.state.bytes < waterMark.low {
channelWritabilityFlag.store(true)
}

fulfillPromises()
promise?.succeed(result: ())
return result
}

Expand Down Expand Up @@ -408,7 +416,7 @@ final class PendingStreamWritesManager: PendingWritesManager {
self.isOpen = false
}

self.state.failAll(error: error)()
self.state.removeAll()?.fail(error: error)

assert(self.state.isEmpty)
}
Expand Down