Skip to content

Commit

Permalink
Support for datagram (UDP, message) sockets
Browse files Browse the repository at this point in the history
Fixes: #128
  • Loading branch information
lhoward committed Nov 11, 2024
1 parent af1345a commit 5835cea
Show file tree
Hide file tree
Showing 7 changed files with 144 additions and 0 deletions.
47 changes: 47 additions & 0 deletions FlyingSocks/Sources/AsyncSocket.swift
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,20 @@ public struct AsyncSocket: Sendable {
return buffer
}

public func receive(atMost length: Int = 4096) async throws -> (sockaddr_storage, [UInt8]) {
try Task.checkCancellation()

repeat {
do {
return try socket.receive(length: length)
} catch SocketError.blocked {
try await pool.suspendSocket(socket, untilReadyFor: .read)
} catch {
throw error
}
} while true
}

Check warning on line 144 in FlyingSocks/Sources/AsyncSocket.swift

View check run for this annotation

Codecov / codecov/patch

FlyingSocks/Sources/AsyncSocket.swift#L132-L144

Added lines #L132 - L144 were not covered by tests

/// Reads bytes from the socket up to by not over/
/// - Parameter bytes: The max number of bytes to read
/// - Returns: an array of the read bytes capped to the number of bytes provided.
Expand Down Expand Up @@ -163,6 +177,19 @@ public struct AsyncSocket: Sendable {
}
}

public func send(_ data: [UInt8], to address: some SocketAddress) async throws {
let sent = try await pool.loopUntilReady(for: .write, on: socket) {
try socket.send(data, to: address)
}
guard sent == data.count else {
throw SocketError.disconnected
}
}

Check warning on line 187 in FlyingSocks/Sources/AsyncSocket.swift

View check run for this annotation

Codecov / codecov/patch

FlyingSocks/Sources/AsyncSocket.swift#L180-L187

Added lines #L180 - L187 were not covered by tests

public func send(_ data: Data, to address: some SocketAddress) async throws {
try await send(Array(data), to: address)
}

Check warning on line 191 in FlyingSocks/Sources/AsyncSocket.swift

View check run for this annotation

Codecov / codecov/patch

FlyingSocks/Sources/AsyncSocket.swift#L189-L191

Added lines #L189 - L191 were not covered by tests

public func close() throws {
try socket.close()
}
Expand All @@ -174,6 +201,10 @@ public struct AsyncSocket: Sendable {
public var sockets: AsyncSocketSequence {
AsyncSocketSequence(socket: self)
}

public var messages: AsyncSocketMessageSequence {
AsyncSocketMessageSequence(socket: self)
}

Check warning on line 207 in FlyingSocks/Sources/AsyncSocket.swift

View check run for this annotation

Codecov / codecov/patch

FlyingSocks/Sources/AsyncSocket.swift#L205-L207

Added lines #L205 - L207 were not covered by tests
}

package extension AsyncSocket {
Expand Down Expand Up @@ -237,6 +268,22 @@ public struct AsyncSocketSequence: AsyncSequence, AsyncIteratorProtocol, Sendabl
}
}

public struct AsyncSocketMessageSequence: AsyncSequence, AsyncIteratorProtocol, Sendable {
public typealias Element = (sockaddr_storage, [UInt8])

let socket: AsyncSocket

public func makeAsyncIterator() -> AsyncSocketMessageSequence { self }

Check warning on line 276 in FlyingSocks/Sources/AsyncSocket.swift

View check run for this annotation

Codecov / codecov/patch

FlyingSocks/Sources/AsyncSocket.swift#L276

Added line #L276 was not covered by tests

public mutating func next() async throws -> Element? {
return try await socket.receive()
}

Check warning on line 280 in FlyingSocks/Sources/AsyncSocket.swift

View check run for this annotation

Codecov / codecov/patch

FlyingSocks/Sources/AsyncSocket.swift#L278-L280

Added lines #L278 - L280 were not covered by tests

public func nextBuffer(suggested count: Int) async throws -> Element? {
try await socket.receive(atMost: count)
}

Check warning on line 284 in FlyingSocks/Sources/AsyncSocket.swift

View check run for this annotation

Codecov / codecov/patch

FlyingSocks/Sources/AsyncSocket.swift#L282-L284

Added lines #L282 - L284 were not covered by tests
}

private actor ClientPoolLoader {
static let shared = ClientPoolLoader()

Expand Down
9 changes: 9 additions & 0 deletions FlyingSocks/Sources/Socket+Android.swift
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ extension Socket.FileDescriptor {

extension Socket {
static let stream = Int32(SOCK_STREAM)
static let datagram = Int32(SOCK_DGRAM)
static let in_addr_any = Android.in_addr(s_addr: Android.in_addr_t(0))

static func makeAddressINET(port: UInt16) -> Android.sockaddr_in {
Expand Down Expand Up @@ -175,6 +176,14 @@ extension Socket {
static func pollfd(fd: FileDescriptorType, events: Int16, revents: Int16) -> Android.pollfd {
Android.pollfd(fd: fd, events: events, revents: revents)
}

static func recvfrom(_ fd: FileDescriptorType, _ buffer: UnsafeMutableRawPointer!, _ nbyte: Int, _ flags: Int32, _ addr: UnsafeMutablePointer<sockaddr>!, _ len: UnsafeMutablePointer<socklen_t>!) -> Int {
Android.recvfrom(fd, buffer, nbyte, flags, addr, len)
}

static func sendto(_ fd: FileDescriptorType, _ buffer: UnsafeRawPointer!, _ nbyte: Int, _ flags: Int32, _ destaddr: UnsafePointer<sockaddr>!, _ destlen: socklen_t) -> Int {
Android.sendto(fd, buffer, nbyte, flags, destaddr, destlen)
}
}

#endif
9 changes: 9 additions & 0 deletions FlyingSocks/Sources/Socket+Darwin.swift
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ extension Socket.FileDescriptor {

extension Socket {
static let stream = Int32(SOCK_STREAM)
static let datagram = Int32(SOCK_DGRAM)
static let in_addr_any = Darwin.in_addr(s_addr: Darwin.in_addr_t(0))

static func makeAddressINET(port: UInt16) -> Darwin.sockaddr_in {
Expand Down Expand Up @@ -176,6 +177,14 @@ extension Socket {
static func pollfd(fd: FileDescriptorType, events: Int16, revents: Int16) -> Darwin.pollfd {
Darwin.pollfd(fd: fd, events: events, revents: revents)
}

static func recvfrom(_ fd: FileDescriptorType, _ buffer: UnsafeMutableRawPointer!, _ nbyte: Int, _ flags: Int32, _ addr: UnsafeMutablePointer<sockaddr>!, _ len: UnsafeMutablePointer<socklen_t>!) -> Int {
Darwin.recvfrom(fd, buffer, nbyte, flags, addr, len)
}

Check warning on line 183 in FlyingSocks/Sources/Socket+Darwin.swift

View check run for this annotation

Codecov / codecov/patch

FlyingSocks/Sources/Socket+Darwin.swift#L181-L183

Added lines #L181 - L183 were not covered by tests

static func sendto(_ fd: FileDescriptorType, _ buffer: UnsafeRawPointer!, _ nbyte: Int, _ flags: Int32, _ destaddr: UnsafePointer<sockaddr>!, _ destlen: socklen_t) -> Int {
Darwin.sendto(fd, buffer, nbyte, flags, destaddr, destlen)
}

Check warning on line 187 in FlyingSocks/Sources/Socket+Darwin.swift

View check run for this annotation

Codecov / codecov/patch

FlyingSocks/Sources/Socket+Darwin.swift#L185-L187

Added lines #L185 - L187 were not covered by tests
}

#endif
9 changes: 9 additions & 0 deletions FlyingSocks/Sources/Socket+Glibc.swift
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ extension Socket.FileDescriptor {

extension Socket {
static let stream = Int32(SOCK_STREAM.rawValue)
static let datagram = Int32(SOCK_DGRAM.rawValue)
static let in_addr_any = Glibc.in_addr(s_addr: Glibc.in_addr_t(0))

static func makeAddressINET(port: UInt16) -> Glibc.sockaddr_in {
Expand Down Expand Up @@ -172,6 +173,14 @@ extension Socket {
static func pollfd(fd: FileDescriptorType, events: Int16, revents: Int16) -> Glibc.pollfd {
Glibc.pollfd(fd: fd, events: events, revents: revents)
}

static func recvfrom(_ fd: FileDescriptorType, _ buffer: UnsafeMutableRawPointer!, _ nbyte: Int, _ flags: Int32, _ addr: UnsafeMutablePointer<sockaddr>!, _ len: UnsafeMutablePointer<socklen_t>!) -> Int {
Glibc.recvfrom(fd, buffer, nbyte, flags, addr, len)
}

static func sendto(_ fd: FileDescriptorType, _ buffer: UnsafeRawPointer!, _ nbyte: Int, _ flags: Int32, _ destaddr: UnsafePointer<sockaddr>!, _ destlen: socklen_t) -> Int {
Glibc.sendto(fd, buffer, nbyte, flags, destaddr, destlen)
}
}

#endif
9 changes: 9 additions & 0 deletions FlyingSocks/Sources/Socket+Musl.swift
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ extension Socket.FileDescriptor {

extension Socket {
static let stream = Int32(SOCK_STREAM)
static let datagram = Int32(SOCK_DGRAM)
static let in_addr_any = Musl.in_addr(s_addr: Musl.in_addr_t(0))

static func makeAddressINET(port: UInt16) -> Musl.sockaddr_in {
Expand Down Expand Up @@ -172,6 +173,14 @@ extension Socket {
static func pollfd(fd: FileDescriptorType, events: Int16, revents: Int16) -> Musl.pollfd {
Musl.pollfd(fd: fd, events: events, revents: revents)
}

static func recvfrom(_ fd: FileDescriptorType, _ buffer: UnsafeMutableRawPointer!, _ nbyte: Int, _ flags: Int32, _ addr: UnsafeMutablePointer<sockaddr>!, _ len: UnsafeMutablePointer<socklen_t>!) -> Int {
Musl.recvfrom(fd, buffer, nbyte, flags, addr, len)
}

static func sendto(_ fd: FileDescriptorType, _ buffer: UnsafeRawPointer!, _ nbyte: Int, _ flags: Int32, _ destaddr: UnsafePointer<sockaddr>!, _ destlen: socklen_t) -> Int {
Musl.sendto(fd, buffer, nbyte, flags, destaddr, destlen)
}
}

#endif
9 changes: 9 additions & 0 deletions FlyingSocks/Sources/Socket+WinSock2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ extension Socket.FileDescriptor {

extension Socket {
static let stream = Int32(SOCK_STREAM)
static let datagram = Int32(SOCK_DGRAM)
static let in_addr_any = WinSDK.in_addr()

static func makeAddressINET(port: UInt16) -> WinSDK.sockaddr_in {
Expand Down Expand Up @@ -184,6 +185,14 @@ extension Socket {
static func pollfd(fd: FileDescriptorType, events: Int16, revents: Int16) -> WinSDK.WSAPOLLFD {
WinSDK.WSAPOLLFD(fd: fd, events: events, revents: revents)
}

static func recvfrom(_ fd: FileDescriptorType, _ buffer: UnsafeMutableRawPointer!, _ nbyte: Int, _ flags: Int32, _ addr: UnsafeMutablePointer<sockaddr>!, _ len: UnsafeMutablePointer<socklen_t>!) -> Int {
WinSDK.recvfrom(fd, buffer, nbyte, flags, addr, len)
}

static func sendto(_ fd: FileDescriptorType, _ buffer: UnsafeRawPointer!, _ nbyte: Int, _ flags: Int32, _ destaddr: UnsafePointer<sockaddr>!, _ destlen: socklen_t) -> Int {
WinSDK.sendto(fd, buffer, nbyte, flags, destaddr, destlen)
}
}

#endif
52 changes: 52 additions & 0 deletions FlyingSocks/Sources/Socket.swift
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,35 @@ public struct Socket: Sendable, Hashable {
return count
}

public func receive(length: Int) throws -> (sockaddr_storage, [UInt8]) {
var address: sockaddr_storage?
let bytes = try [UInt8](unsafeUninitializedCapacity: length) { buffer, count in
(address, count) = try receive(into: buffer.baseAddress!, length: length)
}

return (address!, bytes)
}

Check warning on line 224 in FlyingSocks/Sources/Socket.swift

View check run for this annotation

Codecov / codecov/patch

FlyingSocks/Sources/Socket.swift#L217-L224

Added lines #L217 - L224 were not covered by tests

private func receive(into buffer: UnsafeMutablePointer<UInt8>, length: Int) throws -> (sockaddr_storage, Int) {
var addr = sockaddr_storage()
var size = socklen_t(MemoryLayout<sockaddr_storage>.size)
let count = withUnsafeMutablePointer(to: &addr) {
$0.withMemoryRebound(to: sockaddr.self, capacity: 1) {
Socket.recvfrom(file.rawValue, buffer, length, 0, $0, &size)
}
}
guard count > 0 else {
if errno == EWOULDBLOCK {
throw SocketError.blocked
} else if errno == EBADF || count == 0 {
throw SocketError.disconnected
} else {
throw SocketError.makeFailed("RecvFrom")
}
}
return (addr, count)
}

Check warning on line 244 in FlyingSocks/Sources/Socket.swift

View check run for this annotation

Codecov / codecov/patch

FlyingSocks/Sources/Socket.swift#L226-L244

Added lines #L226 - L244 were not covered by tests

public func write(_ data: Data, from index: Data.Index = 0) throws -> Data.Index {
precondition(index >= 0)
guard index < data.endIndex else { return data.endIndex }
Expand All @@ -237,6 +266,29 @@ public struct Socket: Sendable, Hashable {
return sent
}

public func send(_ bytes: [UInt8], to address: some SocketAddress) throws -> Int {
try bytes.withUnsafeBytes { buffer in
try send(buffer.baseAddress!, length: bytes.count, to: address)
}
}

Check warning on line 273 in FlyingSocks/Sources/Socket.swift

View check run for this annotation

Codecov / codecov/patch

FlyingSocks/Sources/Socket.swift#L269-L273

Added lines #L269 - L273 were not covered by tests

private func send<A: SocketAddress>(_ pointer: UnsafeRawPointer, length: Int, to address: A) throws -> Int {
var addr = address
let sent = withUnsafePointer(to: &addr) {
$0.withMemoryRebound(to: sockaddr.self, capacity: 1) {
Socket.sendto(file.rawValue, pointer, length, 0, $0, socklen_t(MemoryLayout<A>.size))
}
}
guard sent >= 0 || errno == EISCONN else {
if errno == EINPROGRESS {
throw SocketError.blocked
} else {
throw SocketError.makeFailed("SendTo")
}
}
return sent
}

Check warning on line 290 in FlyingSocks/Sources/Socket.swift

View check run for this annotation

Codecov / codecov/patch

FlyingSocks/Sources/Socket.swift#L275-L290

Added lines #L275 - L290 were not covered by tests

public func close() throws {
if Socket.close(file.rawValue) == -1 {
throw SocketError.makeFailed("Close")
Expand Down

0 comments on commit 5835cea

Please sign in to comment.