Skip to content

Commit

Permalink
Reduce allocations when notify promises related to pending writes. (#308
Browse files Browse the repository at this point in the history
)

Motivation:

We returned an callback which needed to be executed by the caller. This will cause extra allocations and is not needed. We can just return a EventLoopPromise which will cascade the notification.

Modification:

Not return a closure but promise.

Result:

Less allocations.
  • Loading branch information
normanmaurer authored Apr 13, 2018
1 parent c9c43b5 commit 7758140
Showing 1 changed file with 28 additions and 20 deletions.
48 changes: 28 additions & 20 deletions Sources/NIO/PendingWritesManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -174,19 +174,19 @@ private struct PendingStreamWritesState {

/// Indicate that a write has happened, this may be a write of multiple outstanding writes (using for example `writev`).
///
/// - warning: The closure will simply fulfill all the promises in order. If one of those promises does for example close the `Channel` we might see subsequent writes fail out of order. Example: Imagine the user issues three writes: `A`, `B` and `C`. Imagine that `A` and `B` both get successfully written in one write operation but the user closes the `Channel` in `A`'s callback. Then overall the promises will be fulfilled in this order: 1) `A`: success 2) `C`: error 3) `B`: success. Note how `B` and `C` get fulfilled out of order.
/// - warning: The promises will be returned in order. If one of those promises does for example close the `Channel` we might see subsequent writes fail out of order. Example: Imagine the user issues three writes: `A`, `B` and `C`. Imagine that `A` and `B` both get successfully written in one write operation but the user closes the `Channel` in `A`'s callback. Then overall the promises will be fulfilled in this order: 1) `A`: success 2) `C`: error 3) `B`: success. Note how `B` and `C` get fulfilled out of order.
///
/// - parameters:
/// - writeResult: The result of the write operation.
/// - returns: A closure that the caller _needs_ to run which will fulfill the promises of the writes and a `OneWriteOperationResult` which indicates if we could write everything or not.
public mutating func didWrite(itemCount: Int, result writeResult: IOResult<Int>) -> (() -> Void, OneWriteOperationResult) {
var promises: [EventLoopPromise<Void>] = []
let fulfillPromises = { promises.forEach { $0.succeed(result: ()) } }

/// - returns: A tuple of a promise and a `OneWriteResult`. The promise is the first promise that needs to be notified of the write result.
/// This promise will cascade the result to all other promises that need notifying. If no promises need to be notified, will be `nil`.
/// The write result will indicate whether we were able to write everything or not.
public mutating func didWrite(itemCount: Int, result writeResult: IOResult<Int>) -> (EventLoopPromise<Void>?, OneWriteOperationResult) {
switch writeResult {
case .wouldBlock(0):
return (fulfillPromises, .wouldBlock)
return (nil, .wouldBlock)
case .processed(let written), .wouldBlock(let written):
var promise0: EventLoopPromise<Void>?
assert(written >= 0, "allegedly written a negative amount of bytes: \(written)")
var unaccountedWrites = written
for _ in 0..<itemCount {
Expand All @@ -195,18 +195,22 @@ private struct PendingStreamWritesState {
unaccountedWrites -= headItemReadableBytes
/* we wrote at least the whole head item, so drop it and succeed the promise */
if let promise = self.fullyWrittenFirst() {
promises.append(promise)
if let p = promise0 {
p.futureResult.cascade(promise: promise)
} else {
promise0 = promise
}
}
} else {
/* we could only write a part of the head item, so don't drop it but remember what we wrote */
self.partiallyWrittenFirst(bytes: unaccountedWrites)

// may try again depending on the writeSpinCount
return (fulfillPromises, .writtenPartially)
return (promise0, .writtenPartially)
}
}
assert(unaccountedWrites == 0, "after doing all the accounting for the byte written, \(unaccountedWrites) bytes of unaccounted writes remain.")
return (fulfillPromises, .writtenCompletely)
return (promise0, .writtenCompletely)
}
}

Expand All @@ -215,20 +219,24 @@ private struct PendingStreamWritesState {
return self.pendingWrites.hasMark()
}

/// Fail all the outstanding writes.
/// Remove all pending writes and return a `EventLoopPromise` which will cascade notifications to all.
///
/// - warning: See the warning for `didWrite`.
///
/// - returns: A closure that the caller _needs_ to run which will fulfill the promises.
public mutating func failAll(error: Error) -> (() -> Void) {
var promises: [EventLoopPromise<Void>] = []
promises.reserveCapacity(self.pendingWrites.count)
/// - returns: promise that needs to be failed, or `nil` if there were no pending writes.
public mutating func removeAll() -> EventLoopPromise<Void>? {
var promise0: EventLoopPromise<Void>?

while !self.pendingWrites.isEmpty {
if let p = self.fullyWrittenFirst() {
promises.append(p)
if let promise = promise0 {
promise.futureResult.cascade(promise: p)
} else {
promise0 = p
}
}
}
return { promises.forEach { $0.fail(error: error) } }
return promise0
}

/// Returns the best mechanism to write pending data at the current point in time.
Expand Down Expand Up @@ -341,13 +349,13 @@ final class PendingStreamWritesManager: PendingWritesManager {
/// - itemCount: The number of items we tried to write.
/// - result: The result of the write operation.
private func didWrite(itemCount: Int, result: IOResult<Int>) -> OneWriteOperationResult {
let (fulfillPromises, result) = self.state.didWrite(itemCount: itemCount, result: result)
let (promise, result) = self.state.didWrite(itemCount: itemCount, result: result)

if self.state.bytes < waterMark.low {
channelWritabilityFlag.store(true)
}

fulfillPromises()
promise?.succeed(result: ())
return result
}

Expand Down Expand Up @@ -408,7 +416,7 @@ final class PendingStreamWritesManager: PendingWritesManager {
self.isOpen = false
}

self.state.failAll(error: error)()
self.state.removeAll()?.fail(error: error)

assert(self.state.isEmpty)
}
Expand Down

0 comments on commit 7758140

Please sign in to comment.