Skip to content

Commit

Permalink
Add new ChannelOption to get the amount of buffered outbound data in …
Browse files Browse the repository at this point in the history
…the Channel (#2849)

Add ability to get the amount of buffered outbound data from `Channel`

### Motivation:
Right now, SwiftNIO does not have the API to answer the question "how
much data is buffered in the Channel". Applications focusing on
performance may need to fine-tune the amount of outbound data that will
be sent to optimize data throughput, adjust sending rate to avoid
overflow, and potentially reduce latency.

SwiftNIO currently provides some backpressure mechanism. This new API
will be a good addition. By knowing how much data is buffered directly,
applications can make informed decision to adjust for optimal buffer
sizes and send rates.

### Modifications:
- Expose current buffer size through ChannelOptions so that users can
read the value out. StreamSocketChannel, DatagramSocketChannel,
EmbeddedChannel, and AsyncTestingChannel have the same API interface.
- Various modifications to the existing tests to make sure the new API
is working correctly.
- Add a new `so_sndbuf` socket option so that users can easily adjust
the send buffer size.

### Result:

Users can get the amount of outbound bytes currently buffered in the
`Channel` through the new `BufferedWritableBytesOption` channel option.

---------

Co-authored-by: Cory Benfield <lukasa@apple.com>
  • Loading branch information
johnnzhou and Lukasa authored Sep 23, 2024
1 parent f666505 commit ac6d905
Show file tree
Hide file tree
Showing 13 changed files with 566 additions and 3 deletions.
3 changes: 3 additions & 0 deletions Sources/NIOCore/BSDSocketAPI.swift
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,9 @@ extension NIOBSDSocket.Option {
/// Specifies the total per-socket buffer space reserved for receives.
public static let so_rcvbuf = Self(rawValue: SO_RCVBUF)

/// Specifies the total per-socket buffer space reserved for sends.
public static let so_sndbuf = Self(rawValue: SO_SNDBUF)

/// Specifies the receive timeout.
public static let so_rcvtimeo = Self(rawValue: SO_RCVTIMEO)

Expand Down
15 changes: 15 additions & 0 deletions Sources/NIOCore/ChannelOption.swift
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,13 @@ extension ChannelOptions {
public typealias Value = Bool
public init() {}
}

/// `BufferedWritableBytesOption` allows users to know the number of writable bytes currently buffered in the `Channel`.
public struct BufferedWritableBytesOption: ChannelOption, Sendable {
public typealias Value = Int

public init() {}
}
}
}

Expand Down Expand Up @@ -358,6 +365,9 @@ public struct ChannelOptions: Sendable {

/// - seealso: `ReceivePacketInfo`
public static let receivePacketInfo = Types.ReceivePacketInfo()

/// - seealso: `BufferedWritableBytesOption`
public static let bufferedWritableBytes = Types.BufferedWritableBytesOption()
}

/// - seealso: `SocketOption`.
Expand Down Expand Up @@ -451,6 +461,11 @@ extension ChannelOption where Self == ChannelOptions.Types.ReceivePacketInfo {
public static var receivePacketInfo: Self { .init() }
}

/// - seealso: `BufferedWritableBytesOption`
extension ChannelOption where Self == ChannelOptions.Types.BufferedWritableBytesOption {
public static var bufferedWritableBytes: Self { .init() }
}

extension ChannelOptions {
/// A type-safe storage facility for `ChannelOption`s. You will only ever need this if you implement your own
/// `Channel` that needs to store `ChannelOption`s.
Expand Down
8 changes: 8 additions & 0 deletions Sources/NIOEmbedded/AsyncTestingChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,14 @@ public final class NIOAsyncTestingChannel: Channel {
if option is ChannelOptions.Types.AllowRemoteHalfClosureOption {
return self.allowRemoteHalfClosure as! Option.Value
}
if option is ChannelOptions.Types.BufferedWritableBytesOption {
let result = self.channelcore.pendingOutboundBuffer.reduce(0) { partialResult, dataAndPromise in
let buffer = self.channelcore.unwrapData(dataAndPromise.0, as: ByteBuffer.self)
return partialResult + buffer.readableBytes
}

return result as! Option.Value
}
fatalError("option \(option) not supported")
}

Expand Down
8 changes: 8 additions & 0 deletions Sources/NIOEmbedded/Embedded.swift
Original file line number Diff line number Diff line change
Expand Up @@ -865,6 +865,14 @@ public final class EmbeddedChannel: Channel {
if option is ChannelOptions.Types.AllowRemoteHalfClosureOption {
return self.allowRemoteHalfClosure as! Option.Value
}
if option is ChannelOptions.Types.BufferedWritableBytesOption {
let result = self.channelcore.pendingOutboundBuffer.reduce(0) { partialResult, dataAndPromise in
let buffer = self.channelcore.unwrapData(dataAndPromise.0, as: ByteBuffer.self)
return partialResult + buffer.readableBytes
}

return result as! Option.Value
}
fatalError("option \(option) not supported")
}

Expand Down
2 changes: 2 additions & 0 deletions Sources/NIOPosix/BaseStreamSocketChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ class BaseStreamSocketChannel<Socket: SocketProtocol>: BaseSocketChannel<Socket>
return self.pendingWrites.writeSpinCount as! Option.Value
case _ as ChannelOptions.Types.WriteBufferWaterMarkOption:
return self.pendingWrites.waterMark as! Option.Value
case _ as ChannelOptions.Types.BufferedWritableBytesOption:
return Int(self.pendingWrites.bufferedBytes) as! Option.Value
default:
return try super.getOption0(option)
}
Expand Down
4 changes: 4 additions & 0 deletions Sources/NIOPosix/PendingDatagramWritesManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,10 @@ final class PendingDatagramWritesManager: PendingWritesManager {
self.state.isEmpty
}

var bufferedBytes: Int64 {
self.state.bytes
}

private func add(_ pendingWrite: PendingDatagramWrite) -> Bool {
assert(self.isOpen)
self.state.append(pendingWrite)
Expand Down
4 changes: 4 additions & 0 deletions Sources/NIOPosix/PendingWritesManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,10 @@ final class PendingStreamWritesManager: PendingWritesManager {
self.state.isEmpty
}

var bufferedBytes: Int64 {
self.state.bytes
}

/// Add a pending write alongside its promise.
///
/// - parameters:
Expand Down
2 changes: 2 additions & 0 deletions Sources/NIOPosix/SocketChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,8 @@ final class DatagramChannel: BaseSocketChannel<Socket> {
throw ChannelError._operationUnsupported
}
return try self.socket.getUDPReceiveOffload() as! Option.Value
case _ as ChannelOptions.Types.BufferedWritableBytesOption:
return Int(self.pendingWrites.bufferedBytes) as! Option.Value
default:
return try super.getOption0(option)
}
Expand Down
110 changes: 110 additions & 0 deletions Tests/NIOEmbeddedTests/AsyncTestingChannelTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,116 @@ class AsyncTestingChannelTests: XCTestCase {
_ = try await channel.finish()
await XCTAsyncAssertThrowsError(try await channel.finish())
}

func testWriteOutboundEmptyBufferedByte() async throws {
let channel = NIOAsyncTestingChannel()
var buffered: ChannelOptions.Types.BufferedWritableBytesOption.Value = try await channel.getOption(
.bufferedWritableBytes
)
XCTAssertEqual(0, buffered)

let buf = channel.allocator.buffer(capacity: 10)

channel.write(buf, promise: nil)
buffered = try await channel.getOption(.bufferedWritableBytes)
XCTAssertEqual(0, buffered)

channel.flush()
buffered = try await channel.getOption(.bufferedWritableBytes)
XCTAssertEqual(0, buffered)

try await XCTAsyncAssertEqual(buf, try await channel.waitForOutboundWrite(as: ByteBuffer.self))
try await XCTAsyncAssertTrue(try await channel.finish().isClean)
}

func testWriteOutboundBufferedBytesSingleWrite() async throws {
let channel = NIOAsyncTestingChannel()
var buf = channel.allocator.buffer(capacity: 10)
buf.writeString("hello")

channel.write(buf, promise: nil)
var buffered: ChannelOptions.Types.BufferedWritableBytesOption.Value = try await channel.getOption(
.bufferedWritableBytes
)
XCTAssertEqual(buf.readableBytes, buffered)
channel.flush()

buffered = try await channel.getOption(.bufferedWritableBytes).get()
XCTAssertEqual(0, buffered)
try await XCTAsyncAssertEqual(buf, try await channel.waitForOutboundWrite(as: ByteBuffer.self))
try await XCTAsyncAssertTrue(try await channel.finish().isClean)
}

func testWriteOuboundBufferedBytesMultipleWrites() async throws {
let channel = NIOAsyncTestingChannel()
var buf = channel.allocator.buffer(capacity: 10)
buf.writeString("hello")
let totalCount = 5
for _ in 0..<totalCount {
channel.write(buf, promise: nil)
}
var buffered: ChannelOptions.Types.BufferedWritableBytesOption.Value = try await channel.getOption(
.bufferedWritableBytes
)
XCTAssertEqual(buf.readableBytes * totalCount, buffered)

channel.flush()
buffered = try await channel.getOption(.bufferedWritableBytes)
XCTAssertEqual(0, buffered)

for _ in 0..<totalCount {
try await XCTAsyncAssertEqual(buf, try await channel.waitForOutboundWrite(as: ByteBuffer.self))
}

try await XCTAsyncAssertTrue(try await channel.finish().isClean)
}

func testWriteOuboundBufferedBytesWriteAndFlushInterleaved() async throws {
let channel = NIOAsyncTestingChannel()
var buf = channel.allocator.buffer(capacity: 10)
buf.writeString("hello")

channel.write(buf, promise: nil)
channel.write(buf, promise: nil)
channel.write(buf, promise: nil)
var buffered: ChannelOptions.Types.BufferedWritableBytesOption.Value = try await channel.getOption(
.bufferedWritableBytes
)
XCTAssertEqual(buf.readableBytes * 3, buffered)

channel.flush()
buffered = try await channel.getOption(.bufferedWritableBytes)
XCTAssertEqual(0, buffered)

channel.write(buf, promise: nil)
channel.write(buf, promise: nil)
buffered = try await channel.getOption(.bufferedWritableBytes)
XCTAssertEqual(buf.readableBytes * 2, buffered)
channel.flush()
buffered = try await channel.getOption(.bufferedWritableBytes)
XCTAssertEqual(0, buffered)

for _ in 0..<5 {
try await XCTAsyncAssertEqual(buf, try await channel.waitForOutboundWrite(as: ByteBuffer.self))
}

try await XCTAsyncAssertTrue(try await channel.finish().isClean)
}

func testWriteOutboundBufferedBytesWriteAndFlush() async throws {
let channel = NIOAsyncTestingChannel()
var buf = channel.allocator.buffer(capacity: 10)
buf.writeString("hello")

try await XCTAsyncAssertTrue(await channel.writeOutbound(buf).isFull)
let buffered: ChannelOptions.Types.BufferedWritableBytesOption.Value = try await channel.getOption(
.bufferedWritableBytes
)
XCTAssertEqual(0, buffered)

try await XCTAsyncAssertEqual(buf, try await channel.waitForOutboundWrite(as: ByteBuffer.self))
try await XCTAsyncAssertTrue(try await channel.finish().isClean)
}
}

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
Expand Down
99 changes: 99 additions & 0 deletions Tests/NIOEmbeddedTests/EmbeddedChannelTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -559,4 +559,103 @@ class EmbeddedChannelTest: XCTestCase {

XCTAssertEqual(try channel._channelCore.remoteAddress0(), remoteAddress)
}

func testWriteOutboundEmptyBufferedByte() throws {
let channel = EmbeddedChannel()
var buffered = try channel.getOption(ChannelOptions.bufferedWritableBytes).wait()
XCTAssertEqual(0, buffered)

let buf = channel.allocator.buffer(capacity: 10)

channel.write(buf, promise: nil)
buffered = try channel.getOption(ChannelOptions.bufferedWritableBytes).wait()
XCTAssertEqual(0, buffered)

channel.flush()
buffered = try channel.getOption(ChannelOptions.bufferedWritableBytes).wait()
XCTAssertEqual(0, buffered)
XCTAssertNoThrow(XCTAssertEqual(buf, try channel.readOutbound()))
XCTAssertNoThrow(XCTAssertTrue(try channel.finish().isClean))
}

func testWriteOutboundBufferedByteSingleWrite() throws {
let channel = EmbeddedChannel()
var buf = channel.allocator.buffer(capacity: 10)
buf.writeString("hello")

channel.write(buf, promise: nil)
var buffered = try channel.getOption(ChannelOptions.bufferedWritableBytes).wait()
XCTAssertEqual(buf.readableBytes, buffered)
channel.flush()

buffered = try channel.getOption(ChannelOptions.bufferedWritableBytes).wait()
XCTAssertEqual(0, buffered)
XCTAssertNoThrow(XCTAssertEqual(buf, try channel.readOutbound()))
XCTAssertNoThrow(XCTAssertTrue(try channel.finish().isClean))
}

func testWriteOuboundBufferedBytesMultipleWrites() throws {
let channel = EmbeddedChannel()
var buf = channel.allocator.buffer(capacity: 10)
buf.writeString("hello")
let totalCount = 5
for _ in 0..<totalCount {
channel.write(buf, promise: nil)
}

var buffered = try channel.getOption(ChannelOptions.bufferedWritableBytes).wait()
XCTAssertEqual(buf.readableBytes * totalCount, buffered)

channel.flush()
buffered = try channel.getOption(ChannelOptions.bufferedWritableBytes).wait()
XCTAssertEqual(0, buffered)
for _ in 0..<totalCount {
XCTAssertNoThrow(XCTAssertEqual(buf, try channel.readOutbound()))
}

XCTAssertNoThrow(XCTAssertTrue(try channel.finish().isClean))
}

func testWriteOuboundBufferedBytesWriteAndFlushInterleaved() throws {
let channel = EmbeddedChannel()
var buf = channel.allocator.buffer(capacity: 10)
buf.writeString("hello")

channel.write(buf, promise: nil)
channel.write(buf, promise: nil)
channel.write(buf, promise: nil)
var buffered = try channel.getOption(ChannelOptions.bufferedWritableBytes).wait()
XCTAssertEqual(buf.readableBytes * 3, buffered)

channel.flush()
buffered = try channel.getOption(ChannelOptions.bufferedWritableBytes).wait()
XCTAssertEqual(0, buffered)

channel.write(buf, promise: nil)
channel.write(buf, promise: nil)
buffered = try channel.getOption(ChannelOptions.bufferedWritableBytes).wait()
XCTAssertEqual(buf.readableBytes * 2, buffered)
channel.flush()
buffered = try channel.getOption(ChannelOptions.bufferedWritableBytes).wait()
XCTAssertEqual(0, buffered)

for _ in 0..<5 {
XCTAssertNoThrow(XCTAssertEqual(buf, try channel.readOutbound()))
}

XCTAssertNoThrow(XCTAssertTrue(try channel.finish().isClean))
}

func testWriteOutboundBufferedBytesWriteAndFlush() throws {
let channel = EmbeddedChannel()
var buf = channel.allocator.buffer(capacity: 10)
buf.writeString("hello")

try XCTAssertTrue(channel.writeOutbound(buf).isFull)
let buffered = try channel.getOption(.bufferedWritableBytes).wait()
XCTAssertEqual(0, buffered)

XCTAssertEqual(buf, try channel.readOutbound(as: ByteBuffer.self))
XCTAssertTrue(try channel.finish().isClean)
}
}
Loading

0 comments on commit ac6d905

Please sign in to comment.