Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new ChannelOption to get the amount of buffered outbound data in the Channel #2849

Merged
merged 27 commits into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
69d8fcb
add buffered writable writes option to channel
johnnzhou Aug 10, 2024
5a06f57
add tests for pending datagram writes manager
johnnzhou Aug 15, 2024
576e04b
add tests for pending stream writes manager
johnnzhou Aug 16, 2024
5598706
add channel tests and datagram channel tests for bufferedBytes option
johnnzhou Aug 19, 2024
18b9562
Merge remote-tracking branch 'upstream/main'
johnnzhou Aug 19, 2024
f025e05
align with upstream
johnnzhou Aug 19, 2024
8e3935a
fix type inconsistency issue
johnnzhou Aug 19, 2024
5f0029d
Merge branch 'apple:main' into main
johnnzhou Aug 19, 2024
7ad22c7
skip checking snd_buf size as different platforms have different beha…
johnnzhou Aug 20, 2024
69ab6d0
rename test and add back missing asserts
johnnzhou Aug 20, 2024
756f11d
Merge branch 'main' into main
johnnzhou Aug 20, 2024
b9a3442
[WIP] align interface type to Int; support new channel option for emb…
johnnzhou Aug 21, 2024
7d8dcdc
add unit tests for embedded and async channels
johnnzhou Aug 24, 2024
2023396
Merge branch 'main' into main
johnnzhou Sep 3, 2024
f6337d9
Merge branch 'main' into main
johnnzhou Sep 3, 2024
2d7428a
Merge branch 'main' into main
johnnzhou Sep 4, 2024
10c004c
Merge branch 'main' into main
johnnzhou Sep 8, 2024
760b4e7
Merge branch 'main' into main
johnnzhou Sep 11, 2024
e963207
Merge branch 'main' into main
johnnzhou Sep 13, 2024
1460bfe
Merge branch 'main' into main
johnnzhou Sep 14, 2024
686afb8
Merge branch 'main' into main
Lukasa Sep 16, 2024
331e0ed
Merge branch 'main' into main
johnnzhou Sep 16, 2024
a248e2c
change foreach loop back to for-in loop
johnnzhou Sep 17, 2024
225e02e
Merge branch 'main' into main
johnnzhou Sep 18, 2024
eeda805
Merge branch 'main' into main
johnnzhou Sep 20, 2024
ac06d72
fix lint problem
johnnzhou Sep 23, 2024
f3ee542
Merge branch 'main' of github.com:johnnzhou/swift-nio
johnnzhou Sep 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Sources/NIOCore/BSDSocketAPI.swift
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,9 @@ extension NIOBSDSocket.Option {

/// Specifies the total per-socket buffer space reserved for receives.
public static let so_rcvbuf = Self(rawValue: SO_RCVBUF)

/// Specifies the total per-socket buffer space reserved for sends.
public static let so_sndbuf = Self(rawValue: SO_SNDBUF)

/// Specifies the receive timeout.
public static let so_rcvtimeo = Self(rawValue: SO_RCVTIMEO)
Expand Down
15 changes: 15 additions & 0 deletions Sources/NIOCore/ChannelOption.swift
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,13 @@ extension ChannelOptions {
public typealias Value = Bool
public init() {}
}

/// `BufferedWritableBytesOption` allows users to know the number of writable bytes currently buffered in the `Channel`.
public struct BufferedWritableBytesOption: ChannelOption, Sendable {
public typealias Value = Int

public init() {}
}
}
}

Expand Down Expand Up @@ -358,6 +365,9 @@ public struct ChannelOptions: Sendable {

/// - seealso: `ReceivePacketInfo`
public static let receivePacketInfo = Types.ReceivePacketInfo()

/// - seealso: `BufferedWritableBytesOption`
public static let bufferedWritableBytes = Types.BufferedWritableBytesOption()
}

/// - seealso: `SocketOption`.
Expand Down Expand Up @@ -451,6 +461,11 @@ extension ChannelOption where Self == ChannelOptions.Types.ReceivePacketInfo {
public static var receivePacketInfo: Self { .init() }
}

/// - seealso: `BufferedWritableBytesOption`
extension ChannelOption where Self == ChannelOptions.Types.BufferedWritableBytesOption {
public static var bufferedWritableBytes: Self { .init() }
}

extension ChannelOptions {
/// A type-safe storage facility for `ChannelOption`s. You will only ever need this if you implement your own
/// `Channel` that needs to store `ChannelOption`s.
Expand Down
8 changes: 8 additions & 0 deletions Sources/NIOEmbedded/AsyncTestingChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,14 @@ public final class NIOAsyncTestingChannel: Channel {
if option is ChannelOptions.Types.AllowRemoteHalfClosureOption {
return self.allowRemoteHalfClosure as! Option.Value
}
if option is ChannelOptions.Types.BufferedWritableBytesOption {
let result = self.channelcore.pendingOutboundBuffer.reduce(0) { partialResult, dataAndPromise in
let buffer = self.channelcore.unwrapData(dataAndPromise.0, as: ByteBuffer.self)
return partialResult + buffer.readableBytes
}

return result as! Option.Value
}
fatalError("option \(option) not supported")
}

Expand Down
8 changes: 8 additions & 0 deletions Sources/NIOEmbedded/Embedded.swift
Original file line number Diff line number Diff line change
Expand Up @@ -863,6 +863,14 @@ public final class EmbeddedChannel: Channel {
if option is ChannelOptions.Types.AllowRemoteHalfClosureOption {
return self.allowRemoteHalfClosure as! Option.Value
}
if option is ChannelOptions.Types.BufferedWritableBytesOption {
let result = self.channelcore.pendingOutboundBuffer.reduce(0) { partialResult, dataAndPromise in
let buffer = self.channelcore.unwrapData(dataAndPromise.0, as: ByteBuffer.self)
return partialResult + buffer.readableBytes
}

return result as! Option.Value
}
fatalError("option \(option) not supported")
}

Expand Down
2 changes: 2 additions & 0 deletions Sources/NIOPosix/BaseStreamSocketChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ class BaseStreamSocketChannel<Socket: SocketProtocol>: BaseSocketChannel<Socket>
return self.pendingWrites.writeSpinCount as! Option.Value
case _ as ChannelOptions.Types.WriteBufferWaterMarkOption:
return self.pendingWrites.waterMark as! Option.Value
case _ as ChannelOptions.Types.BufferedWritableBytesOption:
return Int(self.pendingWrites.bufferedBytes) as! Option.Value
default:
return try super.getOption0(option)
}
Expand Down
4 changes: 4 additions & 0 deletions Sources/NIOPosix/PendingDatagramWritesManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,10 @@ final class PendingDatagramWritesManager: PendingWritesManager {
var isEmpty: Bool {
self.state.isEmpty
}

var bufferedBytes: Int64 {
return self.state.bytes
}

private func add(_ pendingWrite: PendingDatagramWrite) -> Bool {
assert(self.isOpen)
Expand Down
4 changes: 4 additions & 0 deletions Sources/NIOPosix/PendingWritesManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,10 @@ final class PendingStreamWritesManager: PendingWritesManager {
var isEmpty: Bool {
self.state.isEmpty
}

var bufferedBytes: Int64 {
return self.state.bytes
}

/// Add a pending write alongside its promise.
///
Expand Down
2 changes: 2 additions & 0 deletions Sources/NIOPosix/SocketChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,8 @@ final class DatagramChannel: BaseSocketChannel<Socket> {
throw ChannelError._operationUnsupported
}
return try self.socket.getUDPReceiveOffload() as! Option.Value
case _ as ChannelOptions.Types.BufferedWritableBytesOption:
return Int(self.pendingWrites.bufferedBytes) as! Option.Value
default:
return try super.getOption0(option)
}
Expand Down
101 changes: 101 additions & 0 deletions Tests/NIOEmbeddedTests/AsyncTestingChannelTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,107 @@ class AsyncTestingChannelTests: XCTestCase {
_ = try await channel.finish()
await XCTAsyncAssertThrowsError(try await channel.finish())
}

func testWriteOutboundEmptyBufferedByte() async throws {
let channel = NIOAsyncTestingChannel()
var buffered: ChannelOptions.Types.BufferedWritableBytesOption.Value = try await channel.getOption(.bufferedWritableBytes)
XCTAssertEqual(0, buffered)

let buf = channel.allocator.buffer(capacity: 10)

channel.write(buf, promise: nil)
buffered = try await channel.getOption(.bufferedWritableBytes)
XCTAssertEqual(0, buffered)

channel.flush()
buffered = try await channel.getOption(.bufferedWritableBytes)
XCTAssertEqual(0, buffered)

try await XCTAsyncAssertEqual(buf, try await channel.waitForOutboundWrite(as: ByteBuffer.self))
try await XCTAsyncAssertTrue(try await channel.finish().isClean)
}

func testWriteOutboundBufferedBytesSingleWrite() async throws {
let channel = NIOAsyncTestingChannel()
var buf = channel.allocator.buffer(capacity: 10)
buf.writeString("hello")

channel.write(buf, promise: nil)
var buffered: ChannelOptions.Types.BufferedWritableBytesOption.Value = try await channel.getOption(.bufferedWritableBytes)
XCTAssertEqual(buf.readableBytes, buffered)
channel.flush()

buffered = try await channel.getOption(.bufferedWritableBytes).get()
XCTAssertEqual(0, buffered)
try await XCTAsyncAssertEqual(buf, try await channel.waitForOutboundWrite(as: ByteBuffer.self))
try await XCTAsyncAssertTrue(try await channel.finish().isClean)
}

func testWriteOuboundBufferedBytesMultipleWrites() async throws {
let channel = NIOAsyncTestingChannel()
var buf = channel.allocator.buffer(capacity: 10)
buf.writeString("hello")
let totalCount = 5
for _ in 0..<totalCount {
channel.write(buf, promise: nil)
}
var buffered: ChannelOptions.Types.BufferedWritableBytesOption.Value = try await channel.getOption(.bufferedWritableBytes)
XCTAssertEqual(buf.readableBytes * totalCount, buffered)

channel.flush()
buffered = try await channel.getOption(.bufferedWritableBytes)
XCTAssertEqual(0, buffered)

for _ in 0..<totalCount {
try await XCTAsyncAssertEqual(buf, try await channel.waitForOutboundWrite(as: ByteBuffer.self))
}

try await XCTAsyncAssertTrue(try await channel.finish().isClean)
}

func testWriteOuboundBufferedBytesWriteAndFlushInterleaved() async throws {
let channel = NIOAsyncTestingChannel()
var buf = channel.allocator.buffer(capacity: 10)
buf.writeString("hello")

channel.write(buf, promise: nil)
channel.write(buf, promise: nil)
channel.write(buf, promise: nil)
var buffered: ChannelOptions.Types.BufferedWritableBytesOption.Value = try await channel.getOption(.bufferedWritableBytes)
XCTAssertEqual(buf.readableBytes * 3, buffered)

channel.flush()
buffered = try await channel.getOption(.bufferedWritableBytes)
XCTAssertEqual(0, buffered)

channel.write(buf, promise: nil)
channel.write(buf, promise: nil)
buffered = try await channel.getOption(.bufferedWritableBytes)
XCTAssertEqual(buf.readableBytes * 2, buffered)
channel.flush()
buffered = try await channel.getOption(.bufferedWritableBytes)
XCTAssertEqual(0, buffered)


for _ in 0..<5 {
try await XCTAsyncAssertEqual(buf, try await channel.waitForOutboundWrite(as: ByteBuffer.self))
}

try await XCTAsyncAssertTrue(try await channel.finish().isClean)
}

func testWriteOutboundBufferedBytesWriteAndFlush() async throws {
let channel = NIOAsyncTestingChannel()
var buf = channel.allocator.buffer(capacity: 10)
buf.writeString("hello")

try await XCTAsyncAssertTrue(await channel.writeOutbound(buf).isFull)
let buffered: ChannelOptions.Types.BufferedWritableBytesOption.Value = try await channel.getOption(.bufferedWritableBytes)
XCTAssertEqual(0, buffered)

try await XCTAsyncAssertEqual(buf, try await channel.waitForOutboundWrite(as: ByteBuffer.self))
try await XCTAsyncAssertTrue(try await channel.finish().isClean)
}
}

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
Expand Down
100 changes: 100 additions & 0 deletions Tests/NIOEmbeddedTests/EmbeddedChannelTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -559,4 +559,104 @@ class EmbeddedChannelTest: XCTestCase {

XCTAssertEqual(try channel._channelCore.remoteAddress0(), remoteAddress)
}

func testWriteOutboundEmptyBufferedByte() throws {
let channel = EmbeddedChannel()
var buffered = try channel.getOption(ChannelOptions.bufferedWritableBytes).wait()
XCTAssertEqual(0, buffered)

let buf = channel.allocator.buffer(capacity: 10)

channel.write(buf, promise: nil)
buffered = try channel.getOption(ChannelOptions.bufferedWritableBytes).wait()
XCTAssertEqual(0, buffered)

channel.flush()
buffered = try channel.getOption(ChannelOptions.bufferedWritableBytes).wait()
XCTAssertEqual(0, buffered)
XCTAssertNoThrow(XCTAssertEqual(buf, try channel.readOutbound()))
XCTAssertNoThrow(XCTAssertTrue(try channel.finish().isClean))
}

func testWriteOutboundBufferedByteSingleWrite() throws {
let channel = EmbeddedChannel()
var buf = channel.allocator.buffer(capacity: 10)
buf.writeString("hello")

channel.write(buf, promise: nil)
var buffered = try channel.getOption(ChannelOptions.bufferedWritableBytes).wait()
XCTAssertEqual(buf.readableBytes, buffered)
channel.flush()

buffered = try channel.getOption(ChannelOptions.bufferedWritableBytes).wait()
XCTAssertEqual(0, buffered)
XCTAssertNoThrow(XCTAssertEqual(buf, try channel.readOutbound()))
XCTAssertNoThrow(XCTAssertTrue(try channel.finish().isClean))
}

func testWriteOuboundBufferedBytesMultipleWrites() throws {
let channel = EmbeddedChannel()
var buf = channel.allocator.buffer(capacity: 10)
buf.writeString("hello")
let totalCount = 5
for _ in 0..<totalCount {
channel.write(buf, promise: nil)
}

var buffered = try channel.getOption(ChannelOptions.bufferedWritableBytes).wait()
XCTAssertEqual(buf.readableBytes * totalCount, buffered)

channel.flush()
buffered = try channel.getOption(ChannelOptions.bufferedWritableBytes).wait()
XCTAssertEqual(0, buffered)
for _ in 0..<totalCount {
XCTAssertNoThrow(XCTAssertEqual(buf, try channel.readOutbound()))
}

XCTAssertNoThrow(XCTAssertTrue(try channel.finish().isClean))
}

func testWriteOuboundBufferedBytesWriteAndFlushInterleaved() throws {
let channel = EmbeddedChannel()
var buf = channel.allocator.buffer(capacity: 10)
buf.writeString("hello")

channel.write(buf, promise: nil)
channel.write(buf, promise: nil)
channel.write(buf, promise: nil)
var buffered = try channel.getOption(ChannelOptions.bufferedWritableBytes).wait()
XCTAssertEqual(buf.readableBytes * 3, buffered)

channel.flush()
buffered = try channel.getOption(ChannelOptions.bufferedWritableBytes).wait()
XCTAssertEqual(0, buffered)

channel.write(buf, promise: nil)
channel.write(buf, promise: nil)
buffered = try channel.getOption(ChannelOptions.bufferedWritableBytes).wait()
XCTAssertEqual(buf.readableBytes * 2, buffered)
channel.flush()
buffered = try channel.getOption(ChannelOptions.bufferedWritableBytes).wait()
XCTAssertEqual(0, buffered)


try (0..<5).forEach { _ in
XCTAssertNoThrow(XCTAssertEqual(buf, try channel.readOutbound()))
}

XCTAssertNoThrow(XCTAssertTrue(try channel.finish().isClean))
}

func testWriteOutboundBufferedBytesWriteAndFlush() throws {
let channel = EmbeddedChannel()
var buf = channel.allocator.buffer(capacity: 10)
buf.writeString("hello")

try XCTAssertTrue(channel.writeOutbound(buf).isFull)
let buffered = try channel.getOption(.bufferedWritableBytes).wait()
XCTAssertEqual(0, buffered)

XCTAssertEqual(buf, try channel.readOutbound(as: ByteBuffer.self))
XCTAssertTrue(try channel.finish().isClean)
}
}
Loading
Loading