Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pooled control message storage. #2422

Merged
merged 6 commits into from
May 31, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 12 additions & 7 deletions Sources/NIOPosix/ControlMessage.swift
Original file line number Diff line number Diff line change
Expand Up @@ -25,31 +25,37 @@ import CNIOWindows
/// Supports multiple messages each with enough storage for multiple `cmsghdr`
struct UnsafeControlMessageStorage: Collection {
let bytesPerMessage: Int
let deallocateBuffer: Bool
ser-0xff marked this conversation as resolved.
Show resolved Hide resolved
var buffer: UnsafeMutableRawBufferPointer

/// Initialise which includes allocating memory
/// parameter:
/// - bytesPerMessage: How many bytes have been allocated for each supported message.
/// - buffer: The memory allocated to use for control messages.
private init(bytesPerMessage: Int, buffer: UnsafeMutableRawBufferPointer) {
init(bytesPerMessage: Int, deallocateBuffer: Bool, buffer: UnsafeMutableRawBufferPointer) {
ser-0xff marked this conversation as resolved.
Show resolved Hide resolved
self.bytesPerMessage = bytesPerMessage
self.deallocateBuffer = deallocateBuffer
self.buffer = buffer
}

// Guess that 4 Int32 payload messages is enough for anyone.
static var bytesPerMessage: Int { NIOBSDSocketControlMessage.space(payloadSize: MemoryLayout<Int32>.stride) * 4 }

/// Allocate new memory - Caller must call `deallocate` when no longer required.
/// parameter:
/// - msghdrCount: How many `msghdr` structures will be fed from this buffer - we assume 4 Int32 cmsgs for each.
static func allocate(msghdrCount: Int) -> UnsafeControlMessageStorage {
// Guess that 4 Int32 payload messages is enough for anyone.
let bytesPerMessage = NIOBSDSocketControlMessage.space(payloadSize: MemoryLayout<Int32>.stride) * 4
let bytesPerMessage = Self.bytesPerMessage
let buffer = UnsafeMutableRawBufferPointer.allocate(byteCount: bytesPerMessage * msghdrCount,
alignment: MemoryLayout<cmsghdr>.alignment)
return UnsafeControlMessageStorage(bytesPerMessage: bytesPerMessage, buffer: buffer)
return UnsafeControlMessageStorage(bytesPerMessage: bytesPerMessage, deallocateBuffer: true, buffer: buffer)
}

mutating func deallocate() {
self.buffer.deallocate()
self.buffer = UnsafeMutableRawBufferPointer(start: UnsafeMutableRawPointer(bitPattern: 0x7eadbeef), count: 0)
if self.deallocateBuffer {
ser-0xff marked this conversation as resolved.
Show resolved Hide resolved
self.buffer.deallocate()
self.buffer = UnsafeMutableRawBufferPointer(start: UnsafeMutableRawPointer(bitPattern: 0x7eadbeef), count: 0)
}
}

/// Get the part of the buffer for use with a message.
Expand All @@ -65,7 +71,6 @@ struct UnsafeControlMessageStorage: Collection {
func index(after: Int) -> Int {
return after + 1
}

}

/// Representation of a `cmsghdr` and associated data.
Expand Down
12 changes: 4 additions & 8 deletions Sources/NIOPosix/PendingDatagramWritesManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,6 @@ final class PendingDatagramWritesManager: PendingWritesManager {

private let bufferPool: Pool<PooledBuffer>
private let msgBufferPool: Pool<PooledMsgBuffer>
private let controlMessageStorage: UnsafeControlMessageStorage

private var state = PendingDatagramWritesState()

Expand All @@ -400,13 +399,10 @@ final class PendingDatagramWritesManager: PendingWritesManager {
///
/// - parameters:
/// - bufferPool: a pool of buffers to be used for IOVector and storage references
/// - msgs: A pre-allocated array of `MMsgHdr` elements
/// - addresses: A pre-allocated array of `sockaddr_storage` elements
/// - controlMessageStorage: Pre-allocated memory for storing cmsghdr data during a vector write operation.
init(bufferPool: Pool<PooledBuffer>, msgBufferPool: Pool<PooledMsgBuffer>, controlMessageStorage: UnsafeControlMessageStorage) {
/// - msgBufferPool: a pool of buffers to be usded for `MMsgHdr`, `sockaddr_storage` and cmsghdr elements
init(bufferPool: Pool<PooledBuffer>, msgBufferPool: Pool<PooledMsgBuffer>) {
self.bufferPool = bufferPool
self.msgBufferPool = msgBufferPool
self.controlMessageStorage = controlMessageStorage
}

/// Mark the flush checkpoint.
Expand Down Expand Up @@ -610,12 +606,12 @@ final class PendingDatagramWritesManager: PendingWritesManager {
let msgBuffer = self.msgBufferPool.get()
defer { self.msgBufferPool.put(msgBuffer) }

return try msgBuffer.withUnsafePointers { msgs, addresses in
return try msgBuffer.withUnsafePointers { msgs, addresses, controlMessageStorage in
return self.didWrite(try doPendingDatagramWriteVectorOperation(pending: self.state,
bufferPool: self.bufferPool,
msgs: msgs,
addresses: addresses,
controlMessageStorage: self.controlMessageStorage,
controlMessageStorage: controlMessageStorage,
{ try vectorWriteOperation($0) }),
messages: msgs)
}
Expand Down
36 changes: 24 additions & 12 deletions Sources/NIOPosix/Pool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ struct PooledMsgBuffer: PoolElement {
let count: Int
let spaceForMsgHdrs: Int
let spaceForAddresses: Int
let spaceForControlData: Int

init(count: Int) {
var spaceForMsgHdrs = MemoryLayout<MMsgHdr>.stride * count
Expand All @@ -220,13 +221,17 @@ struct PooledMsgBuffer: PoolElement {
var spaceForAddress = MemoryLayout<sockaddr_storage>.stride * count
spaceForAddress.roundUpToAlignment(for: MemorySentinel.self)

var spaceForControlData = (UnsafeControlMessageStorage.bytesPerMessage * count)
spaceForControlData.roundUpToAlignment(for: cmsghdr.self)

self.count = count
self.spaceForMsgHdrs = spaceForMsgHdrs
self.spaceForAddresses = spaceForAddress
self.spaceForControlData = spaceForControlData
}

var totalByteCount: Int {
self.spaceForMsgHdrs + self.spaceForAddresses + MemoryLayout<MemorySentinel>.size
self.spaceForMsgHdrs + self.spaceForAddresses + self.spaceForControlData + MemoryLayout<MemorySentinel>.size
}

var msgHdrsOffset: Int {
Expand All @@ -237,8 +242,12 @@ struct PooledMsgBuffer: PoolElement {
self.spaceForMsgHdrs
}

var controlDataOffset: Int {
self.spaceForMsgHdrs + self.spaceForAddresses
}

var memorySentinelOffset: Int {
return self.spaceForMsgHdrs + self.spaceForAddresses
return self.spaceForMsgHdrs + self.spaceForAddresses + self.spaceForControlData
}
}

Expand All @@ -254,18 +263,20 @@ struct PooledMsgBuffer: PoolElement {
storage.withUnsafeMutablePointers { headPointer, tailPointer in
UnsafeRawPointer(tailPointer + headPointer.pointee.msgHdrsOffset).bindMemory(to: MMsgHdr.self, capacity: count)
UnsafeRawPointer(tailPointer + headPointer.pointee.addressesOffset).bindMemory(to: sockaddr_storage.self, capacity: count)
// space for control message data not needed to be bound
UnsafeRawPointer(tailPointer + headPointer.pointee.memorySentinelOffset).bindMemory(to: MemorySentinel.self, capacity: 1)
}

return storage
}

func withUnsafeMutableTypedPointers<ReturnType>(
_ body: (UnsafeMutableBufferPointer<MMsgHdr>, UnsafeMutableBufferPointer<sockaddr_storage>, UnsafeMutablePointer<MemorySentinel>) throws -> ReturnType
_ body: (UnsafeMutableBufferPointer<MMsgHdr>, UnsafeMutableBufferPointer<sockaddr_storage>, UnsafeControlMessageStorage, UnsafeMutablePointer<MemorySentinel>) throws -> ReturnType
) rethrows -> ReturnType {
return try self.withUnsafeMutablePointers { headPointer, tailPointer in
let msgHdrsPointer = UnsafeMutableRawPointer(tailPointer + headPointer.pointee.msgHdrsOffset).assumingMemoryBound(to: MMsgHdr.self)
let addressesPointer = UnsafeMutableRawPointer(tailPointer + headPointer.pointee.addressesOffset).assumingMemoryBound(to: sockaddr_storage.self)
let controlDataPointer = UnsafeMutableRawBufferPointer(start: tailPointer + headPointer.pointee.controlDataOffset, count: headPointer.pointee.spaceForControlData)
let sentinelPointer = UnsafeMutableRawPointer(tailPointer + headPointer.pointee.memorySentinelOffset).assumingMemoryBound(to: MemorySentinel.self)

let msgHdrsBufferPointer = UnsafeMutableBufferPointer(
Expand All @@ -274,13 +285,14 @@ struct PooledMsgBuffer: PoolElement {
let addressesBufferPointer = UnsafeMutableBufferPointer(
start: addressesPointer, count: headPointer.pointee.count
)
return try body(msgHdrsBufferPointer, addressesBufferPointer, sentinelPointer)
let controlMessageStorage = UnsafeControlMessageStorage(bytesPerMessage: UnsafeControlMessageStorage.bytesPerMessage, deallocateBuffer: false, buffer: controlDataPointer)
return try body(msgHdrsBufferPointer, addressesBufferPointer, controlMessageStorage, sentinelPointer)
}
}
}

private func validateSentinel() {
self.storage.withUnsafeMutableTypedPointers { _, _, sentinelPointer in
self.storage.withUnsafeMutableTypedPointers { _, _, _, sentinelPointer in
precondition(sentinelPointer.pointee == Self.sentinelValue, "Detected memory handling error!")
}
}
Expand All @@ -289,7 +301,7 @@ struct PooledMsgBuffer: PoolElement {

init() {
self.storage = .create(count: Socket.writevLimitIOVectors)
self.storage.withUnsafeMutableTypedPointers { _, _, sentinelPointer in
self.storage.withUnsafeMutableTypedPointers { _, _, _, sentinelPointer in
sentinelPointer.pointee = Self.sentinelValue
}
}
Expand All @@ -299,22 +311,22 @@ struct PooledMsgBuffer: PoolElement {
}

func withUnsafePointers<ReturnValue>(
_ body: (UnsafeMutableBufferPointer<MMsgHdr>, UnsafeMutableBufferPointer<sockaddr_storage>) throws -> ReturnValue
_ body: (UnsafeMutableBufferPointer<MMsgHdr>, UnsafeMutableBufferPointer<sockaddr_storage>, UnsafeControlMessageStorage) throws -> ReturnValue
) rethrows -> ReturnValue {
defer {
self.validateSentinel()
}
return try self.storage.withUnsafeMutableTypedPointers { msgs, addresses, _ in
return try body(msgs, addresses)
return try self.storage.withUnsafeMutableTypedPointers { msgs, addresses, controlMessageStorage, _ in
return try body(msgs, addresses, controlMessageStorage)
}
}

func withUnsafePointersWithStorageManagement<ReturnValue>(
_ body: (UnsafeMutableBufferPointer<MMsgHdr>, UnsafeMutableBufferPointer<sockaddr_storage>, Unmanaged<AnyObject>) throws -> ReturnValue
_ body: (UnsafeMutableBufferPointer<MMsgHdr>, UnsafeMutableBufferPointer<sockaddr_storage>, UnsafeControlMessageStorage, Unmanaged<AnyObject>) throws -> ReturnValue
) rethrows -> ReturnValue {
let storageRef: Unmanaged<AnyObject> = Unmanaged.passUnretained(self.storage)
return try self.storage.withUnsafeMutableTypedPointers { msgs, addresses, _ in
try body(msgs, addresses, storageRef)
return try self.storage.withUnsafeMutableTypedPointers { msgs, addresses, controlMessageStorage, _ in
try body(msgs, addresses, controlMessageStorage, storageRef)
}
}
}
2 changes: 1 addition & 1 deletion Sources/NIOPosix/SelectableEventLoop.swift
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ Further information:
self.thread = thread
self.bufferPool = Pool<PooledBuffer>(maxSize: 16)
self.msgBufferPool = Pool<PooledMsgBuffer>(maxSize: 16)
self.controlMessageStorage = UnsafeControlMessageStorage.allocate(msghdrCount: Socket.writevLimitIOVectors)
self.controlMessageStorage = UnsafeControlMessageStorage.allocate(msghdrCount: 1)
ser-0xff marked this conversation as resolved.
Show resolved Hide resolved
// We will process 4096 tasks per while loop.
self.tasksCopy.reserveCapacity(4096)
self.canBeShutdownIndividually = canBeShutdownIndividually
Expand Down
6 changes: 2 additions & 4 deletions Sources/NIOPosix/SocketChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -424,8 +424,7 @@ final class DatagramChannel: BaseSocketChannel<Socket> {
}

self.pendingWrites = PendingDatagramWritesManager(bufferPool: eventLoop.bufferPool,
msgBufferPool: eventLoop.msgBufferPool,
controlMessageStorage: eventLoop.controlMessageStorage)
msgBufferPool: eventLoop.msgBufferPool)

try super.init(
socket: socket,
Expand All @@ -440,8 +439,7 @@ final class DatagramChannel: BaseSocketChannel<Socket> {
self.vectorReadManager = nil
try socket.setNonBlocking()
self.pendingWrites = PendingDatagramWritesManager(bufferPool: eventLoop.bufferPool,
msgBufferPool: eventLoop.msgBufferPool,
controlMessageStorage: eventLoop.controlMessageStorage)
msgBufferPool: eventLoop.msgBufferPool)
try super.init(
socket: socket,
parent: parent,
Expand Down
6 changes: 1 addition & 5 deletions Tests/NIOPosixTests/PendingDatagramWritesManagerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,7 @@ class PendingDatagramWritesManagerTests: XCTestCase {
private func withPendingDatagramWritesManager(_ body: (PendingDatagramWritesManager) throws -> Void) rethrows {
let bufferPool = Pool<PooledBuffer>(maxSize: 16)
let msgBufferPool = Pool<PooledMsgBuffer>(maxSize: 16)
var controlMessageStorage = UnsafeControlMessageStorage.allocate(msghdrCount: Socket.writevLimitIOVectors)
defer {
controlMessageStorage.deallocate()
}
let pwm = NIOPosix.PendingDatagramWritesManager(bufferPool: bufferPool, msgBufferPool: msgBufferPool, controlMessageStorage: controlMessageStorage)
let pwm = NIOPosix.PendingDatagramWritesManager(bufferPool: bufferPool, msgBufferPool: msgBufferPool)

XCTAssertTrue(pwm.isEmpty)
XCTAssertTrue(pwm.isOpen)
Expand Down