Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pooled control message storage. #2422

Merged
merged 6 commits into from
May 31, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 22 additions & 8 deletions Sources/NIOPosix/ControlMessage.swift
Original file line number Diff line number Diff line change
Expand Up @@ -26,30 +26,45 @@ import CNIOWindows
struct UnsafeControlMessageStorage: Collection {
let bytesPerMessage: Int
var buffer: UnsafeMutableRawBufferPointer
private let deallocateBuffer: Bool

/// Initialise which includes allocating memory
/// parameter:
/// - bytesPerMessage: How many bytes have been allocated for each supported message.
/// - buffer: The memory allocated to use for control messages.
private init(bytesPerMessage: Int, buffer: UnsafeMutableRawBufferPointer) {
/// - deallocateBuffer: buffer owning indicator
private init(bytesPerMessage: Int, buffer: UnsafeMutableRawBufferPointer, deallocateBuffer: Bool) {
self.bytesPerMessage = bytesPerMessage
self.buffer = buffer
self.deallocateBuffer = deallocateBuffer
}

// Guess that 4 Int32 payload messages is enough for anyone.
static var bytesPerMessage: Int { NIOBSDSocketControlMessage.space(payloadSize: MemoryLayout<Int32>.stride) * 4 }

/// Allocate new memory - Caller must call `deallocate` when no longer required.
/// parameter:
/// - msghdrCount: How many `msghdr` structures will be fed from this buffer - we assume 4 Int32 cmsgs for each.
static func allocate(msghdrCount: Int) -> UnsafeControlMessageStorage {
// Guess that 4 Int32 payload messages is enough for anyone.
let bytesPerMessage = NIOBSDSocketControlMessage.space(payloadSize: MemoryLayout<Int32>.stride) * 4
let bytesPerMessage = Self.bytesPerMessage
let buffer = UnsafeMutableRawBufferPointer.allocate(byteCount: bytesPerMessage * msghdrCount,
alignment: MemoryLayout<cmsghdr>.alignment)
return UnsafeControlMessageStorage(bytesPerMessage: bytesPerMessage, buffer: buffer)
alignment: MemoryLayout<cmsghdr>.alignment)
return UnsafeControlMessageStorage(bytesPerMessage: bytesPerMessage, buffer: buffer, deallocateBuffer: true)
}

/// Create an instance not owning the buffer
/// parameter:
/// - bytesPerMessage: How many bytes have been allocated for each supported message.
/// - buffer: The memory allocated to use for control messages.
static func makeNotOwning(bytesPerMessage: Int, buffer: UnsafeMutableRawBufferPointer) -> UnsafeControlMessageStorage {
return UnsafeControlMessageStorage(bytesPerMessage: bytesPerMessage, buffer: buffer, deallocateBuffer: false)
ser-0xff marked this conversation as resolved.
Show resolved Hide resolved
}

mutating func deallocate() {
self.buffer.deallocate()
self.buffer = UnsafeMutableRawBufferPointer(start: UnsafeMutableRawPointer(bitPattern: 0x7eadbeef), count: 0)
if self.deallocateBuffer {
ser-0xff marked this conversation as resolved.
Show resolved Hide resolved
self.buffer.deallocate()
self.buffer = UnsafeMutableRawBufferPointer(start: UnsafeMutableRawPointer(bitPattern: 0x7eadbeef), count: 0)
}
}

/// Get the part of the buffer for use with a message.
Expand All @@ -65,7 +80,6 @@ struct UnsafeControlMessageStorage: Collection {
func index(after: Int) -> Int {
return after + 1
}

}

/// Representation of a `cmsghdr` and associated data.
Expand Down
12 changes: 4 additions & 8 deletions Sources/NIOPosix/PendingDatagramWritesManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,6 @@ final class PendingDatagramWritesManager: PendingWritesManager {

private let bufferPool: Pool<PooledBuffer>
private let msgBufferPool: Pool<PooledMsgBuffer>
private let controlMessageStorage: UnsafeControlMessageStorage

private var state = PendingDatagramWritesState()

Expand All @@ -400,13 +399,10 @@ final class PendingDatagramWritesManager: PendingWritesManager {
///
/// - parameters:
/// - bufferPool: a pool of buffers to be used for IOVector and storage references
/// - msgs: A pre-allocated array of `MMsgHdr` elements
/// - addresses: A pre-allocated array of `sockaddr_storage` elements
/// - controlMessageStorage: Pre-allocated memory for storing cmsghdr data during a vector write operation.
init(bufferPool: Pool<PooledBuffer>, msgBufferPool: Pool<PooledMsgBuffer>, controlMessageStorage: UnsafeControlMessageStorage) {
/// - msgBufferPool: a pool of buffers to be usded for `MMsgHdr`, `sockaddr_storage` and cmsghdr elements
init(bufferPool: Pool<PooledBuffer>, msgBufferPool: Pool<PooledMsgBuffer>) {
self.bufferPool = bufferPool
self.msgBufferPool = msgBufferPool
self.controlMessageStorage = controlMessageStorage
}

/// Mark the flush checkpoint.
Expand Down Expand Up @@ -610,12 +606,12 @@ final class PendingDatagramWritesManager: PendingWritesManager {
let msgBuffer = self.msgBufferPool.get()
defer { self.msgBufferPool.put(msgBuffer) }

return try msgBuffer.withUnsafePointers { msgs, addresses in
return try msgBuffer.withUnsafePointers { msgs, addresses, controlMessageStorage in
return self.didWrite(try doPendingDatagramWriteVectorOperation(pending: self.state,
bufferPool: self.bufferPool,
msgs: msgs,
addresses: addresses,
controlMessageStorage: self.controlMessageStorage,
controlMessageStorage: controlMessageStorage,
{ try vectorWriteOperation($0) }),
messages: msgs)
}
Expand Down
38 changes: 26 additions & 12 deletions Sources/NIOPosix/Pool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ struct PooledMsgBuffer: PoolElement {
let count: Int
let spaceForMsgHdrs: Int
let spaceForAddresses: Int
let spaceForControlData: Int

init(count: Int) {
var spaceForMsgHdrs = MemoryLayout<MMsgHdr>.stride * count
Expand All @@ -220,13 +221,17 @@ struct PooledMsgBuffer: PoolElement {
var spaceForAddress = MemoryLayout<sockaddr_storage>.stride * count
spaceForAddress.roundUpToAlignment(for: MemorySentinel.self)

var spaceForControlData = (UnsafeControlMessageStorage.bytesPerMessage * count)
spaceForControlData.roundUpToAlignment(for: cmsghdr.self)

self.count = count
self.spaceForMsgHdrs = spaceForMsgHdrs
self.spaceForAddresses = spaceForAddress
self.spaceForControlData = spaceForControlData
}

var totalByteCount: Int {
self.spaceForMsgHdrs + self.spaceForAddresses + MemoryLayout<MemorySentinel>.size
self.spaceForMsgHdrs + self.spaceForAddresses + self.spaceForControlData + MemoryLayout<MemorySentinel>.size
}

var msgHdrsOffset: Int {
Expand All @@ -237,8 +242,12 @@ struct PooledMsgBuffer: PoolElement {
self.spaceForMsgHdrs
}

var controlDataOffset: Int {
self.spaceForMsgHdrs + self.spaceForAddresses
}

var memorySentinelOffset: Int {
return self.spaceForMsgHdrs + self.spaceForAddresses
return self.spaceForMsgHdrs + self.spaceForAddresses + self.spaceForControlData
}
}

Expand All @@ -254,18 +263,20 @@ struct PooledMsgBuffer: PoolElement {
storage.withUnsafeMutablePointers { headPointer, tailPointer in
UnsafeRawPointer(tailPointer + headPointer.pointee.msgHdrsOffset).bindMemory(to: MMsgHdr.self, capacity: count)
UnsafeRawPointer(tailPointer + headPointer.pointee.addressesOffset).bindMemory(to: sockaddr_storage.self, capacity: count)
// space for control message data not needed to be bound
UnsafeRawPointer(tailPointer + headPointer.pointee.memorySentinelOffset).bindMemory(to: MemorySentinel.self, capacity: 1)
}

return storage
}

func withUnsafeMutableTypedPointers<ReturnType>(
_ body: (UnsafeMutableBufferPointer<MMsgHdr>, UnsafeMutableBufferPointer<sockaddr_storage>, UnsafeMutablePointer<MemorySentinel>) throws -> ReturnType
_ body: (UnsafeMutableBufferPointer<MMsgHdr>, UnsafeMutableBufferPointer<sockaddr_storage>, UnsafeControlMessageStorage, UnsafeMutablePointer<MemorySentinel>) throws -> ReturnType
) rethrows -> ReturnType {
return try self.withUnsafeMutablePointers { headPointer, tailPointer in
let msgHdrsPointer = UnsafeMutableRawPointer(tailPointer + headPointer.pointee.msgHdrsOffset).assumingMemoryBound(to: MMsgHdr.self)
let addressesPointer = UnsafeMutableRawPointer(tailPointer + headPointer.pointee.addressesOffset).assumingMemoryBound(to: sockaddr_storage.self)
let controlDataPointer = UnsafeMutableRawBufferPointer(start: tailPointer + headPointer.pointee.controlDataOffset, count: headPointer.pointee.spaceForControlData)
let sentinelPointer = UnsafeMutableRawPointer(tailPointer + headPointer.pointee.memorySentinelOffset).assumingMemoryBound(to: MemorySentinel.self)

let msgHdrsBufferPointer = UnsafeMutableBufferPointer(
Expand All @@ -274,13 +285,16 @@ struct PooledMsgBuffer: PoolElement {
let addressesBufferPointer = UnsafeMutableBufferPointer(
start: addressesPointer, count: headPointer.pointee.count
)
return try body(msgHdrsBufferPointer, addressesBufferPointer, sentinelPointer)
let controlMessageStorage = UnsafeControlMessageStorage.makeNotOwning(
bytesPerMessage: UnsafeControlMessageStorage.bytesPerMessage,
buffer: controlDataPointer)
return try body(msgHdrsBufferPointer, addressesBufferPointer, controlMessageStorage, sentinelPointer)
}
}
}

private func validateSentinel() {
self.storage.withUnsafeMutableTypedPointers { _, _, sentinelPointer in
self.storage.withUnsafeMutableTypedPointers { _, _, _, sentinelPointer in
precondition(sentinelPointer.pointee == Self.sentinelValue, "Detected memory handling error!")
}
}
Expand All @@ -289,7 +303,7 @@ struct PooledMsgBuffer: PoolElement {

init() {
self.storage = .create(count: Socket.writevLimitIOVectors)
self.storage.withUnsafeMutableTypedPointers { _, _, sentinelPointer in
self.storage.withUnsafeMutableTypedPointers { _, _, _, sentinelPointer in
sentinelPointer.pointee = Self.sentinelValue
}
}
Expand All @@ -299,22 +313,22 @@ struct PooledMsgBuffer: PoolElement {
}

func withUnsafePointers<ReturnValue>(
_ body: (UnsafeMutableBufferPointer<MMsgHdr>, UnsafeMutableBufferPointer<sockaddr_storage>) throws -> ReturnValue
_ body: (UnsafeMutableBufferPointer<MMsgHdr>, UnsafeMutableBufferPointer<sockaddr_storage>, UnsafeControlMessageStorage) throws -> ReturnValue
) rethrows -> ReturnValue {
defer {
self.validateSentinel()
}
return try self.storage.withUnsafeMutableTypedPointers { msgs, addresses, _ in
return try body(msgs, addresses)
return try self.storage.withUnsafeMutableTypedPointers { msgs, addresses, controlMessageStorage, _ in
return try body(msgs, addresses, controlMessageStorage)
}
}

func withUnsafePointersWithStorageManagement<ReturnValue>(
_ body: (UnsafeMutableBufferPointer<MMsgHdr>, UnsafeMutableBufferPointer<sockaddr_storage>, Unmanaged<AnyObject>) throws -> ReturnValue
_ body: (UnsafeMutableBufferPointer<MMsgHdr>, UnsafeMutableBufferPointer<sockaddr_storage>, UnsafeControlMessageStorage, Unmanaged<AnyObject>) throws -> ReturnValue
) rethrows -> ReturnValue {
let storageRef: Unmanaged<AnyObject> = Unmanaged.passUnretained(self.storage)
return try self.storage.withUnsafeMutableTypedPointers { msgs, addresses, _ in
try body(msgs, addresses, storageRef)
return try self.storage.withUnsafeMutableTypedPointers { msgs, addresses, controlMessageStorage, _ in
try body(msgs, addresses, controlMessageStorage, storageRef)
}
}
}
5 changes: 0 additions & 5 deletions Sources/NIOPosix/SelectableEventLoop.swift
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,6 @@ internal final class SelectableEventLoop: EventLoop {
let bufferPool: Pool<PooledBuffer>
let msgBufferPool: Pool<PooledMsgBuffer>

// Used for UDP control messages.
private(set) var controlMessageStorage: UnsafeControlMessageStorage

// The `_parentGroup` will always be set unless this is a thread takeover or we shut down.
@usableFromInline
internal var _parentGroup: Optional<MultiThreadedEventLoopGroup>
Expand Down Expand Up @@ -185,7 +182,6 @@ Further information:
self.thread = thread
self.bufferPool = Pool<PooledBuffer>(maxSize: 16)
self.msgBufferPool = Pool<PooledMsgBuffer>(maxSize: 16)
self.controlMessageStorage = UnsafeControlMessageStorage.allocate(msghdrCount: Socket.writevLimitIOVectors)
// We will process 4096 tasks per while loop.
self.tasksCopy.reserveCapacity(4096)
self.canBeShutdownIndividually = canBeShutdownIndividually
Expand All @@ -202,7 +198,6 @@ Further information:
"illegal internal state on deinit: \(self.internalState)")
assert(self.externalState == .resourcesReclaimed,
"illegal external state on shutdown: \(self.externalState)")
self.controlMessageStorage.deallocate()
}

/// Is this `SelectableEventLoop` still open (ie. not shutting down or shut down)
Expand Down
46 changes: 21 additions & 25 deletions Sources/NIOPosix/SocketChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -424,8 +424,7 @@ final class DatagramChannel: BaseSocketChannel<Socket> {
}

self.pendingWrites = PendingDatagramWritesManager(bufferPool: eventLoop.bufferPool,
msgBufferPool: eventLoop.msgBufferPool,
controlMessageStorage: eventLoop.controlMessageStorage)
msgBufferPool: eventLoop.msgBufferPool)

try super.init(
socket: socket,
Expand All @@ -440,8 +439,7 @@ final class DatagramChannel: BaseSocketChannel<Socket> {
self.vectorReadManager = nil
try socket.setNonBlocking()
self.pendingWrites = PendingDatagramWritesManager(bufferPool: eventLoop.bufferPool,
msgBufferPool: eventLoop.msgBufferPool,
controlMessageStorage: eventLoop.controlMessageStorage)
msgBufferPool: eventLoop.msgBufferPool)
try super.init(
socket: socket,
parent: parent,
Expand Down Expand Up @@ -607,24 +605,22 @@ final class DatagramChannel: BaseSocketChannel<Socket> {
override func readFromSocket() throws -> ReadResult {
if self.vectorReadManager != nil {
return try self.vectorReadFromSocket()
} else if self.reportExplicitCongestionNotifications || self.receivePacketInfo {
let pooledMsgBuffer = self.selectableEventLoop.msgBufferPool.get()
defer { self.selectableEventLoop.msgBufferPool.put(pooledMsgBuffer) }
return try pooledMsgBuffer.withUnsafePointers { _, _, controlMessageStorage in
return try self.singleReadFromSocket(controlBytesBuffer: controlMessageStorage[0])
}
} else {
return try self.singleReadFromSocket()
return try self.singleReadFromSocket(controlBytesBuffer: UnsafeMutableRawBufferPointer(start: nil, count: 0))
}
}

private func singleReadFromSocket() throws -> ReadResult {
private func singleReadFromSocket(controlBytesBuffer: UnsafeMutableRawBufferPointer) throws -> ReadResult {
var rawAddress = sockaddr_storage()
var rawAddressLength = socklen_t(MemoryLayout<sockaddr_storage>.size)
var readResult = ReadResult.none

// These control bytes must not escape the current call stack
let controlBytesBuffer: UnsafeMutableRawBufferPointer
if self.reportExplicitCongestionNotifications || self.receivePacketInfo {
controlBytesBuffer = self.selectableEventLoop.controlMessageStorage[0]
} else {
controlBytesBuffer = UnsafeMutableRawBufferPointer(start: nil, count: 0)
}

for _ in 1...self.maxMessagesPerRead {
guard self.isOpen else {
throw ChannelError.eof
Expand Down Expand Up @@ -804,16 +800,17 @@ final class DatagramChannel: BaseSocketChannel<Socket> {
override func writeToSocket() throws -> OverallWriteResult {
let result = try self.pendingWrites.triggerAppropriateWriteOperations(
scalarWriteOperation: { (ptr, destinationPtr, destinationSize, metadata) in
// normal write
// Control bytes must not escape current stack.
var controlBytes = UnsafeOutboundControlBytes(
controlBytes: self.selectableEventLoop.controlMessageStorage[0])
controlBytes.appendExplicitCongestionState(metadata: metadata,
protocolFamily: self.localAddress?.protocol)
return try self.socket.sendmsg(pointer: ptr,
destinationPtr: destinationPtr,
destinationSize: destinationSize,
controlBytes: controlBytes.validControlBytes)
let msgBuffer = self.selectableEventLoop.msgBufferPool.get()
defer { self.selectableEventLoop.msgBufferPool.put(msgBuffer) }
return try msgBuffer.withUnsafePointers { _, _, controlMessageStorage in
var controlBytes = UnsafeOutboundControlBytes(controlBytes: controlMessageStorage[0])
controlBytes.appendExplicitCongestionState(metadata: metadata,
protocolFamily: self.localAddress?.protocol)
return try self.socket.sendmsg(pointer: ptr,
destinationPtr: destinationPtr,
destinationSize: destinationSize,
controlBytes: controlBytes.validControlBytes)
}
},
vectorWriteOperation: { msgs in
return try self.socket.sendmmsg(msgs: msgs)
Expand All @@ -822,7 +819,6 @@ final class DatagramChannel: BaseSocketChannel<Socket> {
return result
}


// MARK: Datagram Channel overrides not required by BaseSocketChannel

override func bind0(to address: SocketAddress, promise: EventLoopPromise<Void>?) {
Expand Down
6 changes: 1 addition & 5 deletions Tests/NIOPosixTests/PendingDatagramWritesManagerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,7 @@ class PendingDatagramWritesManagerTests: XCTestCase {
private func withPendingDatagramWritesManager(_ body: (PendingDatagramWritesManager) throws -> Void) rethrows {
let bufferPool = Pool<PooledBuffer>(maxSize: 16)
let msgBufferPool = Pool<PooledMsgBuffer>(maxSize: 16)
var controlMessageStorage = UnsafeControlMessageStorage.allocate(msghdrCount: Socket.writevLimitIOVectors)
defer {
controlMessageStorage.deallocate()
}
let pwm = NIOPosix.PendingDatagramWritesManager(bufferPool: bufferPool, msgBufferPool: msgBufferPool, controlMessageStorage: controlMessageStorage)
let pwm = NIOPosix.PendingDatagramWritesManager(bufferPool: bufferPool, msgBufferPool: msgBufferPool)

XCTAssertTrue(pwm.isEmpty)
XCTAssertTrue(pwm.isOpen)
Expand Down