Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new protocol for ChannelHandler to get buffered bytes in the channel handler #2918

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions Sources/NIOCore/ChannelHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -343,3 +343,19 @@ extension RemovableChannelHandler {
context.leavePipeline(removalToken: removalToken)
}
}

/// A `NIOOutboundByteBufferingChannelHandler` is a `ChannelHandler` that
/// reports the number of bytes buffered for outbound direction.
public protocol NIOOutboundByteBufferingChannelHandler {
/// The number of bytes buffered in the channel handler, which are queued to be sent to
/// the next outbound channel handler.
var outboundBufferedBytes: Int { get }
}

/// A `NIOInboundByteBufferingChannelHandler` is a `ChannelHandler` that
/// reports the number of bytes buffered for inbound direction.
public protocol NIOInboundByteBufferingChannelHandler {
/// The number of bytes buffered in the channel handler, which are queued to be sent to
/// the next inbound channel handler.
var inboundBufferedBytes: Int { get }
}
168 changes: 168 additions & 0 deletions Sources/NIOCore/ChannelPipeline.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2089,3 +2089,171 @@ extension ChannelPipeline: CustomDebugStringConvertible {
return handlers
}
}

extension ChannelPipeline {
FranzBusch marked this conversation as resolved.
Show resolved Hide resolved
private enum BufferingDirection: Equatable {
case inbound
case outbound
}

/// Retrieve the total number of bytes buffered for outbound.
public func outboundBufferedBytes() -> EventLoopFuture<Int> {
let future: EventLoopFuture<Int>

if self.eventLoop.inEventLoop {
future = self.eventLoop.makeSucceededFuture(countAllBufferedBytes(direction: .outbound))
} else {
future = self.eventLoop.submit {
self.countAllBufferedBytes(direction: .outbound)
}
}

return future
}

/// Retrieve the number of outbound bytes buffered in the `ChannelHandler` associated with the given `ChannelHandlerContext`.
///
/// - Parameters:
/// - in: the `ChannelHandlerContext` from which the outbound buffered bytes of the `ChannelHandler` will be retrieved.
///
/// - Returns: The `EventLoopFuture` which will be notified once the number of outbound bytes buffered in the `ChannelHandler`
/// referenced by the `ChannelHandlerContext` parameter `in` is collected.
/// If the `ChannelHandler` in the `ChannelHandlerContext` does not conform to
/// `NIOOutboundByteBufferingChannelHandler`, the future will contain`nil`.
public func outboundBufferedBytes(in context: ChannelHandlerContext) -> EventLoopFuture<Int?> {
let future: EventLoopFuture<Int?>

if self.eventLoop.inEventLoop {
future = self.eventLoop.makeSucceededFuture(countBufferedBytes(context: context, direction: .outbound))
} else {
future = self.eventLoop.submit {
self.countBufferedBytes(context: context, direction: .outbound)
}
}

return future
}

/// Retrieve the total number of bytes buffered for inbound.
public func inboundBufferedBytes() -> EventLoopFuture<Int> {
let future: EventLoopFuture<Int>

if self.eventLoop.inEventLoop {
future = self.eventLoop.makeSucceededFuture(countAllBufferedBytes(direction: .inbound))
} else {
future = self.eventLoop.submit {
self.countAllBufferedBytes(direction: .inbound)
}
}

return future
}

/// Retrieve the number of inbound bytes buffered in the `ChannelHandler` associated with the given`ChannelHandlerContext`.
///
/// - Parameters:
/// - in: the `ChannelHandlerContext` from which the inbound buffered bytes of the `ChannelHandler` will be retrieved.
///
/// - Returns: The `EventLoopFuture` which will be notified once the number of inbound bytes buffered in the `ChannelHandler`
/// referenced by the `ChannelHandlerContext` parameter `in` is collected.
/// If the `ChannelHandler` in the `ChannelHandlerContext` does not conform to
/// `NIOInboundByteBufferingChannelHandler`, the future will contain `nil`.
public func inboundBufferedBytes(in context: ChannelHandlerContext) -> EventLoopFuture<Int?> {
let future: EventLoopFuture<Int?>

if self.eventLoop.inEventLoop {
future = self.eventLoop.makeSucceededFuture(countBufferedBytes(context: context, direction: .inbound))
} else {
future = self.eventLoop.submit {
self.countBufferedBytes(context: context, direction: .inbound)
}
}

return future
}

private func countBufferedBytes(context: ChannelHandlerContext, direction: BufferingDirection) -> Int? {
switch direction {
case .inbound:
guard let handler = context.handler as? NIOInboundByteBufferingChannelHandler else {
return nil
}
return handler.inboundBufferedBytes
case .outbound:
guard let handler = context.handler as? NIOOutboundByteBufferingChannelHandler else {
return nil
}
return handler.outboundBufferedBytes
}

}

private func countAllBufferedBytes(direction: BufferingDirection) -> Int {
var total = 0
var current = self.head?.next
switch direction {
case .inbound:
while let c = current, c !== self.tail {
if let inboundHandler = c.handler as? NIOInboundByteBufferingChannelHandler {
total += inboundHandler.inboundBufferedBytes
}
current = current?.next
}
case .outbound:
while let c = current, c !== self.tail {
if let outboundHandler = c.handler as? NIOOutboundByteBufferingChannelHandler {
total += outboundHandler.outboundBufferedBytes
}
current = current?.next
}
}

return total
}
}

extension ChannelPipeline.SynchronousOperations {
/// Retrieve the total number of bytes buffered for outbound.
///
/// - Important: This *must* be called on the event loop.
public func outboundBufferedBytes() -> Int {
self.eventLoop.assertInEventLoop()
return self._pipeline.countAllBufferedBytes(direction: .outbound)
}

/// Retrieve the number of outbound bytes buffered in the `ChannelHandler` associated with the given`ChannelHandlerContext`.
///
/// - Parameters:
/// - in: the `ChannelHandlerContext` from which the outbound buffered bytes of the `ChannelHandler` will be retrieved.
/// - Important: This *must* be called on the event loop.
///
/// - Returns: The number of bytes currently buffered in the `ChannelHandler` referenced by the `ChannelHandlerContext` parameter `in`.
/// If the `ChannelHandler` in the given `ChannelHandlerContext` does not conform to
/// `NIOOutboundByteBufferingChannelHandler`, this method will return `nil`.
public func outboundBufferedBytes(in context: ChannelHandlerContext) -> Int? {
self.eventLoop.assertInEventLoop()
return self._pipeline.countBufferedBytes(context: context, direction: .outbound)
}

/// Retrieve total number of bytes buffered for inbound.
///
/// - Important: This *must* be called on the event loop.
public func inboundBufferedBytes() -> Int {
self.eventLoop.assertInEventLoop()
return self._pipeline.countAllBufferedBytes(direction: .inbound)
}

/// Retrieve the number of inbound bytes buffered in the `ChannelHandler` associated with the given `ChannelHandlerContext`.
///
/// - Parameters:
/// - in: the `ChannelHandlerContext` from which the inbound buffered bytes of the `handler` will be retrieved.
/// - Important: This *must* be called on the event loop.
///
/// - Returns: The number of bytes currently buffered in the `ChannelHandler` referenced by the `ChannelHandlerContext` parameter `in`.
/// If the `ChannelHandler` in the given `ChannelHandlerContext` does not conform to
/// `NIOInboundByteBufferingChannelHandler`, this method will return `nil`.
public func inboundBufferedBytes(in context: ChannelHandlerContext) -> Int? {
self.eventLoop.assertInEventLoop()
return self._pipeline.countBufferedBytes(context: context, direction: .inbound)
}
}
Loading