Skip to content

Commit 9c080d1

Browse files
committed
Add AddressedEnvelope to PendingWritesManager.
1 parent 28a2333 commit 9c080d1

File tree

3 files changed

+32
-34
lines changed

3 files changed

+32
-34
lines changed

Sources/NIOCore/AddressedEnvelope.swift

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,15 +39,24 @@ public struct AddressedEnvelope<DataType> {
3939
/// Details of any congestion state.
4040
public var ecnState: NIOExplicitCongestionNotificationState
4141
public var packetInfo: NIOPacketInfo?
42+
public var fd: Int?
4243

4344
public init(ecnState: NIOExplicitCongestionNotificationState) {
4445
self.ecnState = ecnState
4546
self.packetInfo = nil
47+
self.fd = nil
4648
}
4749

4850
public init(ecnState: NIOExplicitCongestionNotificationState, packetInfo: NIOPacketInfo?) {
4951
self.ecnState = ecnState
5052
self.packetInfo = packetInfo
53+
self.fd = nil
54+
}
55+
56+
public init(ecnState: NIOExplicitCongestionNotificationState, fd: Int?) {
57+
self.ecnState = ecnState
58+
self.packetInfo = nil
59+
self.fd = fd
5160
}
5261
}
5362
}

Sources/NIOPosix/BaseStreamSocketChannel.swift

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,9 @@ class BaseStreamSocketChannel<Socket: SocketProtocol>: BaseSocketChannel<Socket>
161161
}
162162

163163
final override func writeToSocket() throws -> OverallWriteResult {
164+
print(#fileID, #function)
165+
// TODO: self.socket.sendmmsg(msgs: )
166+
// TODO: self.socket.sendmsg(pointer: , destinationPtr: UnsafePointer<sockaddr>?, destinationSize: socklen_t, controlBytes: UnsafeMutableRawBufferPointer)
164167
let result = try self.pendingWrites.triggerAppropriateWriteOperations(
165168
scalarBufferWriteOperation: { ptr in
166169
guard ptr.count > 0 else {
@@ -294,6 +297,13 @@ class BaseStreamSocketChannel<Socket: SocketProtocol>: BaseSocketChannel<Socket>
294297
return
295298
}
296299

300+
if let envelope = self.tryUnwrapData(data, as: AddressedEnvelope<ByteBuffer>.self) {
301+
if self.pendingWrites.add(envelope: envelope, promise: promise) {
302+
self.pipeline.syncOperations.fireChannelWritabilityChanged()
303+
}
304+
return
305+
}
306+
297307
let data = self.unwrapData(data, as: IOData.self)
298308

299309
if !self.pendingWrites.add(data: data, promise: promise) {

Sources/NIOPosix/PendingWritesManager.swift

Lines changed: 13 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -16,43 +16,10 @@ import Atomics
1616
import CNIOLinux
1717
import NIOCore
1818

19-
public struct MetadataEnvelope {
20-
public var data: ByteBuffer
21-
public var controlData: MessageMetadata? = nil
22-
23-
public init(data: ByteBuffer) {
24-
self.data = data
25-
}
26-
27-
public init(data: ByteBuffer, controlData: MessageMetadata?) {
28-
self.data = data
29-
self.controlData = controlData
30-
}
31-
32-
/// Any metadata associated with an
33-
public struct MessageMetadata: Hashable, Sendable {
34-
init() {
35-
var m: msghdr = .init()
36-
// Create buffer for file descriptor
37-
let fake_fd = 6666
38-
let fd_pointer = UnsafeMutableRawPointer(bitPattern: fake_fd)
39-
m.msg_control = fd_pointer
40-
m.msg_controllen = MemoryLayout.size(ofValue: fake_fd)
41-
// TODO other platoforms
42-
let cm: UnsafeMutablePointer<cmsghdr> = CNIOLinux_CMSG_FIRSTHDR(&m)
43-
cm.pointee.cmsg_level = SOL_SOCKET
44-
cm.pointee.cmsg_type = Int32(SCM_RIGHTS)
45-
cm.pointee.cmsg_len = MemoryLayout.size(ofValue: fake_fd)
46-
47-
// TODO send `m`
48-
}
49-
}
50-
}
51-
5219
private struct PendingStreamWrite {
5320
var data: IOData
5421
var promise: Optional<EventLoopPromise<Void>>
55-
var metadata: MetadataEnvelope?
22+
var metadata: AddressedEnvelope<ByteBuffer>.Metadata?
5623
}
5724

5825
/// Write result is `.couldNotWriteEverything` but we have no more writes to perform.
@@ -404,7 +371,10 @@ final class PendingStreamWritesManager: PendingWritesManager {
404371
func add(data: IOData, promise: EventLoopPromise<Void>?) -> Bool {
405372
assert(self.isOpen)
406373
self.state.append(PendingStreamWrite(data: data, promise: promise))
374+
return _add()
375+
}
407376

377+
private func _add() -> Bool {
408378
if self.state.bytes > waterMark.high
409379
&& channelWritabilityFlag.compareExchange(expected: true, desired: false, ordering: .relaxed).exchanged
410380
{
@@ -415,6 +385,14 @@ final class PendingStreamWritesManager: PendingWritesManager {
415385
return true
416386
}
417387

388+
func add(envelope: AddressedEnvelope<ByteBuffer>, promise: EventLoopPromise<Void>?) -> Bool {
389+
assert(self.isOpen)
390+
self.state.append(
391+
PendingStreamWrite(data: .byteBuffer(envelope.data), promise: promise, metadata: envelope.metadata)
392+
)
393+
return _add()
394+
}
395+
418396
/// Returns the best mechanism to write pending data at the current point in time.
419397
var currentBestWriteMechanism: WriteMechanism {
420398
self.state.currentBestWriteMechanism
@@ -434,6 +412,7 @@ final class PendingStreamWritesManager: PendingWritesManager {
434412
scalarFileWriteOperation: (CInt, Int, Int) throws -> IOResult<Int>
435413
) throws -> OverallWriteResult {
436414
try self.triggerWriteOperations { writeMechanism in
415+
print(#function, writeMechanism)
437416
// TODO: add with metadata calls.
438417
switch writeMechanism {
439418
case .scalarBufferWrite:

0 commit comments

Comments
 (0)