@@ -62,6 +62,25 @@ public extension AsyncSocketPool where Self == SocketPool<Poll> {
62
62
63
63
public struct AsyncSocket : Sendable {
64
64
65
+ public struct Message : Sendable {
66
+ public let peerAddress : any SocketAddress
67
+ public let bytes : [ UInt8 ]
68
+ public let interfaceIndex : UInt32 ?
69
+ public let localAddress : ( any SocketAddress ) ?
70
+
71
+ public init (
72
+ peerAddress: any SocketAddress ,
73
+ bytes: [ UInt8 ] ,
74
+ interfaceIndex: UInt32 ? = nil ,
75
+ localAddress: ( any SocketAddress ) ? = nil
76
+ ) {
77
+ self . peerAddress = peerAddress
78
+ self . bytes = bytes
79
+ self . interfaceIndex = interfaceIndex
80
+ self . localAddress = localAddress
81
+ }
82
+ }
83
+
65
84
public let socket : Socket
66
85
let pool : any AsyncSocketPool
67
86
@@ -83,7 +102,7 @@ public struct AsyncSocket: Sendable {
83
102
pool: some AsyncSocketPool ,
84
103
timeout: TimeInterval = 5 ) async throws -> Self {
85
104
try await withThrowingTimeout ( seconds: timeout) {
86
- let socket = try Socket ( domain: Int32 ( type ( of: address) . family) , type: Socket . stream)
105
+ let socket = try Socket ( domain: Int32 ( type ( of: address) . family) , type: . stream)
87
106
let asyncSocket = try AsyncSocket ( socket: socket, pool: pool)
88
107
try await asyncSocket. connect ( to: address)
89
108
return asyncSocket
@@ -129,6 +148,37 @@ public struct AsyncSocket: Sendable {
129
148
return buffer
130
149
}
131
150
151
+ public func receive( atMost length: Int = 4096 ) async throws -> ( any SocketAddress , [ UInt8 ] ) {
152
+ try Task . checkCancellation ( )
153
+
154
+ repeat {
155
+ do {
156
+ return try socket. receive ( length: length)
157
+ } catch SocketError . blocked {
158
+ try await pool. suspendSocket ( socket, untilReadyFor: . read)
159
+ } catch {
160
+ throw error
161
+ }
162
+ } while true
163
+ }
164
+
165
+ #if !canImport(WinSDK)
166
+ public func receive( atMost length: Int ) async throws -> Message {
167
+ try Task . checkCancellation ( )
168
+
169
+ repeat {
170
+ do {
171
+ let ( peerAddress, bytes, interfaceIndex, localAddress) = try socket. receive ( length: length)
172
+ return Message ( peerAddress: peerAddress, bytes: bytes, interfaceIndex: interfaceIndex, localAddress: localAddress)
173
+ } catch SocketError . blocked {
174
+ try await pool. suspendSocket ( socket, untilReadyFor: . read)
175
+ } catch {
176
+ throw error
177
+ }
178
+ } while true
179
+ }
180
+ #endif
181
+
132
182
/// Reads bytes from the socket up to by not over/
133
183
/// - Parameter bytes: The max number of bytes to read
134
184
/// - Returns: an array of the read bytes capped to the number of bytes provided.
@@ -163,6 +213,61 @@ public struct AsyncSocket: Sendable {
163
213
}
164
214
}
165
215
216
+ public func send( _ data: [ UInt8 ] , to address: some SocketAddress ) async throws {
217
+ let sent = try await pool. loopUntilReady ( for: . write, on: socket) {
218
+ try socket. send ( data, to: address)
219
+ }
220
+ guard sent == data. count else {
221
+ throw SocketError . disconnected
222
+ }
223
+ }
224
+
225
+ public func send( _ data: Data , to address: some SocketAddress ) async throws {
226
+ try await send ( Array ( data) , to: address)
227
+ }
228
+
229
+ #if !canImport(WinSDK)
230
+ public func send(
231
+ message: [ UInt8 ] ,
232
+ to peerAddress: some SocketAddress ,
233
+ interfaceIndex: UInt32 ? = nil ,
234
+ from localAddress: ( some SocketAddress ) ? = nil
235
+ ) async throws {
236
+ let sent = try await pool. loopUntilReady ( for: . write, on: socket) {
237
+ try socket. send ( message: message, to: peerAddress, interfaceIndex: interfaceIndex, from: localAddress)
238
+ }
239
+ guard sent == message. count else {
240
+ throw SocketError . disconnected
241
+ }
242
+ }
243
+
244
+ public func send(
245
+ message: Data ,
246
+ to peerAddress: some SocketAddress ,
247
+ interfaceIndex: UInt32 ? = nil ,
248
+ from localAddress: ( some SocketAddress ) ? = nil
249
+ ) async throws {
250
+ try await send ( message: Array ( message) , to: peerAddress, interfaceIndex: interfaceIndex, from: localAddress)
251
+ }
252
+
253
+ public func send( message: Message ) async throws {
254
+ let localAddress : AnySocketAddress ?
255
+
256
+ if let unwrappedLocalAddress = message. localAddress {
257
+ localAddress = AnySocketAddress ( unwrappedLocalAddress)
258
+ } else {
259
+ localAddress = nil
260
+ }
261
+
262
+ try await send (
263
+ message: message. bytes,
264
+ to: AnySocketAddress ( message. peerAddress) ,
265
+ interfaceIndex: message. interfaceIndex,
266
+ from: localAddress
267
+ )
268
+ }
269
+ #endif
270
+
166
271
public func close( ) throws {
167
272
try socket. close ( )
168
273
}
@@ -174,12 +279,20 @@ public struct AsyncSocket: Sendable {
174
279
public var sockets : AsyncSocketSequence {
175
280
AsyncSocketSequence ( socket: self )
176
281
}
282
+
283
+ public var messages : AsyncSocketMessageSequence {
284
+ AsyncSocketMessageSequence ( socket: self )
285
+ }
286
+
287
+ public func messages( maxMessageLength: Int ) -> AsyncSocketMessageSequence {
288
+ AsyncSocketMessageSequence ( socket: self , maxMessageLength: maxMessageLength)
289
+ }
177
290
}
178
291
179
292
package extension AsyncSocket {
180
293
181
- static func makePair( pool: some AsyncSocketPool ) throws -> ( AsyncSocket , AsyncSocket ) {
182
- let ( s1, s2) = try Socket . makePair ( )
294
+ static func makePair( pool: some AsyncSocketPool , type : SocketType = . stream ) throws -> ( AsyncSocket , AsyncSocket ) {
295
+ let ( s1, s2) = try Socket . makePair ( type : type )
183
296
let a1 = try AsyncSocket ( socket: s1, pool: pool)
184
297
let a2 = try AsyncSocket ( socket: s2, pool: pool)
185
298
return ( a1, a2)
@@ -237,6 +350,35 @@ public struct AsyncSocketSequence: AsyncSequence, AsyncIteratorProtocol, Sendabl
237
350
}
238
351
}
239
352
353
+ public struct AsyncSocketMessageSequence : AsyncSequence , AsyncIteratorProtocol , Sendable {
354
+ public static let DefaultMaxMessageLength : Int = 1500
355
+
356
+ // Windows has a different recvmsg() API signature which is presently unsupported
357
+ public typealias Element = AsyncSocket . Message
358
+
359
+ private let socket : AsyncSocket
360
+ private let maxMessageLength : Int
361
+
362
+ public func makeAsyncIterator( ) -> AsyncSocketMessageSequence { self }
363
+
364
+ init ( socket: AsyncSocket , maxMessageLength: Int = Self . DefaultMaxMessageLength) {
365
+ self . socket = socket
366
+ self . maxMessageLength = maxMessageLength
367
+ }
368
+
369
+ public mutating func next( ) async throws -> Element ? {
370
+ #if !canImport(WinSDK)
371
+ try await socket. receive ( atMost: maxMessageLength)
372
+ #else
373
+ let peerAddress : any SocketAddress
374
+ let bytes : [ UInt8 ]
375
+
376
+ ( peerAddress, bytes) = try await socket. receive ( atMost: maxMessageLength)
377
+ return AsyncSocket . Message ( peerAddress: peerAddress, bytes: bytes)
378
+ #endif
379
+ }
380
+ }
381
+
240
382
private actor ClientPoolLoader {
241
383
static let shared = ClientPoolLoader ( )
242
384
0 commit comments