Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Call NIOAsyncWriterSinkDelegate outside of the lock #2547

Merged
merged 3 commits into from
Oct 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{
"mallocCountTotal" : 1317015
}
"mallocCountTotal" : 164419
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{
"mallocCountTotal" : 1317022
"mallocCountTotal" : 164426
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{
"mallocCountTotal" : 1317015
"mallocCountTotal" : 164419
}
Original file line number Diff line number Diff line change
Expand Up @@ -170,25 +170,34 @@ extension NIOAsyncChannelOutboundWriterHandler {

@inlinable
func didYield(contentsOf sequence: Deque<OutboundOut>) {
// This always called from an async context, so we must loop-hop.
self.eventLoop.execute {
if self.eventLoop.inEventLoop {
self.handler._didYield(sequence: sequence)
} else {
self.eventLoop.execute {
self.handler._didYield(sequence: sequence)
}
}
}

@inlinable
func didYield(_ element: OutboundOut) {
// This always called from an async context, so we must loop-hop.
self.eventLoop.execute {
if self.eventLoop.inEventLoop {
self.handler._didYield(element: element)
} else {
self.eventLoop.execute {
self.handler._didYield(element: element)
}
}
}

@inlinable
func didTerminate(error: Error?) {
// This always called from an async context, so we must loop-hop.
self.eventLoop.execute {
if self.eventLoop.inEventLoop {
self.handler._didTerminate(error: error)
} else {
self.eventLoop.execute {
self.handler._didTerminate(error: error)
}
}
}
}
Expand Down
478 changes: 329 additions & 149 deletions Sources/NIOCore/AsyncSequences/NIOAsyncWriter.swift

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions Tests/NIOCoreTests/AsyncChannel/AsyncChannelTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ final class AsyncChannelTests: XCTestCase {
inboundReader = wrapped.inboundStream

try await channel.testingEventLoop.executeInContext {
XCTAssertEqual(0, closeRecorder.outboundCloses)
XCTAssertEqual(1, closeRecorder.outboundCloses)
}
}

Expand Down Expand Up @@ -159,7 +159,7 @@ final class AsyncChannelTests: XCTestCase {
inboundReader = wrapped.inboundStream

try await channel.testingEventLoop.executeInContext {
XCTAssertEqual(0, closeRecorder.allCloses)
XCTAssertEqual(1, closeRecorder.allCloses)
}
}

Expand Down
45 changes: 40 additions & 5 deletions Tests/NIOCoreTests/AsyncSequences/NIOAsyncWriterTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,32 @@
import DequeModule
import NIOCore
import XCTest
import NIOConcurrencyHelpers

private struct SomeError: Error, Hashable {}

private final class MockAsyncWriterDelegate: NIOAsyncWriterSinkDelegate, @unchecked Sendable {
typealias Element = String

var didYieldCallCount = 0
var _didYieldCallCount = NIOLockedValueBox(0)
var didYieldCallCount: Int {
self._didYieldCallCount.withLockedValue { $0 }
}
var didYieldHandler: ((Deque<String>) -> Void)?
func didYield(contentsOf sequence: Deque<String>) {
self.didYieldCallCount += 1
self._didYieldCallCount.withLockedValue { $0 += 1 }
if let didYieldHandler = self.didYieldHandler {
didYieldHandler(sequence)
}
}

var didTerminateCallCount = 0
var _didTerminateCallCount = NIOLockedValueBox(0)
var didTerminateCallCount: Int {
self._didTerminateCallCount.withLockedValue { $0 }
}
var didTerminateHandler: ((Error?) -> Void)?
func didTerminate(error: Error?) {
self.didTerminateCallCount += 1
self._didTerminateCallCount.withLockedValue { $0 += 1 }
if let didTerminateHandler = self.didTerminateHandler {
didTerminateHandler(error)
}
Expand Down Expand Up @@ -68,6 +75,8 @@ final class NIOAsyncWriterTests: XCTestCase {
}

func testMultipleConcurrentWrites() async throws {
var elements = 0
self.delegate.didYieldHandler = { elements += $0.count }
let task1 = Task { [writer] in
for i in 0...9 {
try await writer!.yield("message\(i)")
Expand All @@ -88,7 +97,33 @@ final class NIOAsyncWriterTests: XCTestCase {
try await task2.value
try await task3.value

XCTAssertEqual(self.delegate.didYieldCallCount, 30)
XCTAssertEqual(elements, 30)
}

func testMultipleConcurrentBatchWrites() async throws {
var elements = 0
self.delegate.didYieldHandler = { elements += $0.count }
let task1 = Task { [writer] in
for i in 0...9 {
try await writer!.yield(contentsOf: ["message\(i).1", "message\(i).2"])
}
}
let task2 = Task { [writer] in
for i in 10...19 {
try await writer!.yield(contentsOf: ["message\(i).1", "message\(i).2"])
}
}
let task3 = Task { [writer] in
for i in 20...29 {
try await writer!.yield(contentsOf: ["message\(i).1", "message\(i).2"])
}
}

try await task1.value
try await task2.value
try await task3.value

XCTAssertEqual(elements, 60)
}

func testWriterCoalescesWrites() async throws {
Expand Down