Skip to content

Commit 2e50c68

Browse files
committed
progress
1 parent 9c080d1 commit 2e50c68

File tree

4 files changed

+42
-9
lines changed

4 files changed

+42
-9
lines changed

Sources/NIOPosix/BaseStreamSocketChannel.swift

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -162,9 +162,23 @@ class BaseStreamSocketChannel<Socket: SocketProtocol>: BaseSocketChannel<Socket>
162162

163163
final override func writeToSocket() throws -> OverallWriteResult {
164164
print(#fileID, #function)
165-
// TODO: self.socket.sendmmsg(msgs: )
166-
// TODO: self.socket.sendmsg(pointer: , destinationPtr: UnsafePointer<sockaddr>?, destinationSize: socklen_t, controlBytes: UnsafeMutableRawBufferPointer)
167165
let result = try self.pendingWrites.triggerAppropriateWriteOperations(
166+
writeMessage: { ptr, destinationPtr, destinationSize, controlBytes in
167+
fatalError("writeMessage")
168+
guard let ctrlBytes = controlBytes else {
169+
return .processed(0)
170+
}
171+
var s = UnsafeControlMessageStorage.allocate(msghdrCount: 1)
172+
var cb = UnsafeOutboundControlBytes(controlBytes: s[0])
173+
cb.appendExplicitCongestionState(metadata: ctrlBytes, protocolFamily: .unix)
174+
let controlMessageBytePointer = cb.validControlBytes
175+
return try self.socket.sendmsg(
176+
pointer: ptr,
177+
destinationPtr: destinationPtr,
178+
destinationSize: destinationSize,
179+
controlBytes: controlMessageBytePointer
180+
)
181+
},
168182
scalarBufferWriteOperation: { ptr in
169183
guard ptr.count > 0 else {
170184
// No need to call write if the buffer is empty.

Sources/NIOPosix/PendingDatagramWritesManager.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -365,7 +365,7 @@ private struct PendingDatagramWritesState {
365365
case .some(let e):
366366
// The compiler can't prove this, but it must be so.
367367
assert(self.pendingWrites.distance(from: e, to: self.pendingWrites.startIndex) == 0)
368-
return .scalarBufferWrite
368+
return .scalarBufferWrite(withMetaData: self.pendingWrites.first!.metadata == nil)
369369
default:
370370
return .nothingToBeWritten
371371
}

Sources/NIOPosix/PendingWritesManager.swift

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,7 @@ private struct PendingStreamWritesState {
302302
case 1:
303303
switch self.pendingWrites.first!.data {
304304
case .byteBuffer:
305-
return .scalarBufferWrite
305+
return .scalarBufferWrite(withMetaData: self.pendingWrites.first!.metadata != nil)
306306
case .fileRegion:
307307
return .scalarFileWrite
308308
}
@@ -315,7 +315,7 @@ private struct PendingStreamWritesState {
315315
case (.byteBuffer, .byteBuffer):
316316
return .vectorBufferWrite
317317
case (.byteBuffer, .fileRegion):
318-
return .scalarBufferWrite
318+
return .scalarBufferWrite(withMetaData: self.pendingWrites.first!.metadata != nil)
319319
case (.fileRegion, _):
320320
return .scalarFileWrite
321321
}
@@ -387,6 +387,7 @@ final class PendingStreamWritesManager: PendingWritesManager {
387387

388388
func add(envelope: AddressedEnvelope<ByteBuffer>, promise: EventLoopPromise<Void>?) -> Bool {
389389
assert(self.isOpen)
390+
print(#function, envelope.metadata)
390391
self.state.append(
391392
PendingStreamWrite(data: .byteBuffer(envelope.data), promise: promise, metadata: envelope.metadata)
392393
)
@@ -407,16 +408,22 @@ final class PendingStreamWritesManager: PendingWritesManager {
407408
/// - scalarFileWriteOperation: An operation that writes a region of a file descriptor (usually `sendfile`).
408409
/// - Returns: The `OneWriteOperationResult` and whether the `Channel` is now writable.
409410
func triggerAppropriateWriteOperations(
411+
writeMessage: (
412+
UnsafeRawBufferPointer, UnsafePointer<sockaddr>?, socklen_t, AddressedEnvelope<ByteBuffer>.Metadata?
413+
) throws -> IOResult<Int>,
410414
scalarBufferWriteOperation: (UnsafeRawBufferPointer) throws -> IOResult<Int>,
411415
vectorBufferWriteOperation: (UnsafeBufferPointer<IOVector>) throws -> IOResult<Int>,
412416
scalarFileWriteOperation: (CInt, Int, Int) throws -> IOResult<Int>
413417
) throws -> OverallWriteResult {
414418
try self.triggerWriteOperations { writeMechanism in
415419
print(#function, writeMechanism)
416-
// TODO: add with metadata calls.
417420
switch writeMechanism {
418-
case .scalarBufferWrite:
419-
return try triggerScalarBufferWrite({ try scalarBufferWriteOperation($0) })
421+
case .scalarBufferWrite(let metaData):
422+
if metaData {
423+
return try _triggerScalarBufferWrite(scalarWriteOperation: { try writeMessage($0, $1, $2, $3) })
424+
} else {
425+
return try triggerScalarBufferWrite({ try scalarBufferWriteOperation($0) })
426+
}
420427
case .vectorBufferWrite:
421428
return try triggerVectorBufferWrite({ try vectorBufferWriteOperation($0) })
422429
case .scalarFileWrite:
@@ -465,6 +472,15 @@ final class PendingStreamWritesManager: PendingWritesManager {
465472
}
466473
}
467474

475+
private func _triggerScalarBufferWrite(
476+
scalarWriteOperation: (
477+
UnsafeRawBufferPointer, UnsafePointer<sockaddr>?, socklen_t, AddressedEnvelope<ByteBuffer>.Metadata?
478+
) throws -> IOResult<Int>
479+
) rethrows -> OneWriteOperationResult {
480+
fatalError("\(#function) We made it")
481+
self.didWrite(itemCount: 1, result: .processed(1))
482+
}
483+
468484
/// Trigger a write of a single `FileRegion` (usually using `sendfile(2)`).
469485
///
470486
/// - Parameters:
@@ -621,7 +637,7 @@ final class PendingStreamWritesManager: PendingWritesManager {
621637
}
622638

623639
internal enum WriteMechanism {
624-
case scalarBufferWrite
640+
case scalarBufferWrite(withMetaData: Bool)
625641
case vectorBufferWrite
626642
case scalarFileWrite
627643
case nothingToBeWritten

Tests/NIOPosixTests/ChannelTests.swift

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,9 @@ final class ChannelTests: XCTestCase {
293293
var multiState = 0
294294
var fileState = 0
295295
let result = try pwm.triggerAppropriateWriteOperations(
296+
writeMessage: { (_, _, _, _) in
297+
return .processed(0)
298+
},
296299
scalarBufferWriteOperation: { buf in
297300
defer {
298301
singleState += 1

0 commit comments

Comments
 (0)