Skip to content

Commit

Permalink
support for initializing off the system file descriptors (#285)
Browse files Browse the repository at this point in the history
Motivation:

In some contexts it is important to be able to pass file descriptors
around instead of specifying the particular host and port. This allows removing
certain windows for race conditions, and has positive security implications.

Modifications:

Added `withConnectedSocket(_:)` call to the `ClientBootstrap` that can
be used to create a new `Channel` out of existing file descriptor
representing the connected socket.

Result:

Allows initializing `Channel`s off the existing file descriptors.
  • Loading branch information
vlm authored and normanmaurer committed Apr 19, 2018
1 parent 2bce766 commit 338cfb5
Show file tree
Hide file tree
Showing 11 changed files with 254 additions and 55 deletions.
8 changes: 8 additions & 0 deletions Sources/NIO/BaseSocketChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,14 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
}
}

public final func registerAlreadyConfigured0(promise: EventLoopPromise<Void>?) {
assert(self.eventLoop.inEventLoop)
assert(self.isOpen)
assert(!self.lifecycleManager.isActive)
register0(promise: nil)
becomeActive0(promise: promise)
}

public final func triggerUserOutboundEvent0(_ event: Any, promise: EventLoopPromise<Void>?) {
promise?.succeed(result: ())
}
Expand Down
179 changes: 126 additions & 53 deletions Sources/NIO/Bootstrap.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
//
//===----------------------------------------------------------------------===//

/// A `ServerBootstrap` is an easy way to bootstrap a `ServerChannel` when creating network servers.
/// A `ServerBootstrap` is an easy way to bootstrap a `ServerSocketChannel` when creating network servers.
///
/// Example:
///
Expand Down Expand Up @@ -63,7 +63,7 @@ public final class ServerBootstrap {
/// Create a `ServerBootstrap` for the `EventLoopGroup` `group`.
///
/// - parameters:
/// - group: The `EventLoopGroup` to use for the `ServerChannel`.
/// - group: The `EventLoopGroup` to use for the `ServerSocketChannel`.
public convenience init(group: EventLoopGroup) {
self.init(group: group, childGroup: group)
}
Expand Down Expand Up @@ -132,12 +132,8 @@ public final class ServerBootstrap {
/// - host: The host to bind on.
/// - port: The port to bind on.
public func bind(host: String, port: Int) -> EventLoopFuture<Channel> {
let evGroup = group
do {
let address = try SocketAddress.newAddressResolving(host: host, port: port)
return bind0(eventLoopGroup: evGroup, to: address)
} catch let err {
return evGroup.next().newFailedFuture(error: err)
return bind0 {
return try SocketAddress.newAddressResolving(host: host, port: port)
}
}

Expand All @@ -146,54 +142,82 @@ public final class ServerBootstrap {
/// - parameters:
/// - address: The `SocketAddress` to bind on.
public func bind(to address: SocketAddress) -> EventLoopFuture<Channel> {
return bind0(eventLoopGroup: group, to: address)
return bind0 { address }
}

/// Bind the `ServerSocketChannel` to a UNIX Domain Socket.
///
/// - parameters:
/// - unixDomainSocketPath: The _Unix domain socket_ path to bind to. `unixDomainSocketPath` must not exist, it will be created by the system.
public func bind(unixDomainSocketPath: String) -> EventLoopFuture<Channel> {
let evGroup = group
return bind0 {
try SocketAddress(unixDomainSocketPath: unixDomainSocketPath)
}
}

/// Use the existing bound socket file descriptor.
///
/// - parameters:
/// - descriptor: The _Unix file descriptor_ representing the bound stream socket.
public func withBoundSocket(descriptor: CInt) -> EventLoopFuture<Channel> {
func makeChannel(_ eventLoop: SelectableEventLoop, _ childEventLoopGroup: EventLoopGroup) throws -> ServerSocketChannel {
return try ServerSocketChannel(descriptor: descriptor, eventLoop: eventLoop, group: childEventLoopGroup)
}
return bind0(makeServerChannel: makeChannel) { (eventLoop, serverChannel) in
let promise: EventLoopPromise<Void> = eventLoop.newPromise()
serverChannel.registerAlreadyConfigured0(promise: promise)
return promise.futureResult
}
}

private func bind0(_ makeSocketAddress: () throws -> SocketAddress) -> EventLoopFuture<Channel> {
let address: SocketAddress
do {
let address = try SocketAddress(unixDomainSocketPath: unixDomainSocketPath)
return bind0(eventLoopGroup: evGroup, to: address)
} catch let err {
return evGroup.next().newFailedFuture(error: err)
address = try makeSocketAddress()
} catch {
return group.next().newFailedFuture(error: error)
}
func makeChannel(_ eventLoop: SelectableEventLoop, _ childEventLoopGroup: EventLoopGroup) throws -> ServerSocketChannel {
return try ServerSocketChannel(eventLoop: eventLoop,
group: childEventLoopGroup,
protocolFamily: address.protocolFamily)
}
return bind0(makeServerChannel: makeChannel) { (eventGroup, serverChannel) in
serverChannel.registerAndDoSynchronously { serverChannel in
serverChannel.bind(to: address)
}
}
}

private func bind0(eventLoopGroup: EventLoopGroup, to address: SocketAddress) -> EventLoopFuture<Channel> {
private func bind0(makeServerChannel: (_ eventLoop: SelectableEventLoop, _ childGroup: EventLoopGroup) throws -> ServerSocketChannel, _ register: @escaping (EventLoop, ServerSocketChannel) -> EventLoopFuture<Void>) -> EventLoopFuture<Channel> {
let eventLoop = self.group.next()
let childEventLoopGroup = self.childGroup
let serverChannelOptions = self.serverChannelOptions
let eventLoop = eventLoopGroup.next()
let serverChannelInit = self.serverChannelInit ?? { _ in eventLoop.newSucceededFuture(result: ()) }
let childChannelInit = self.childChannelInit
let childChannelOptions = self.childChannelOptions

let promise: EventLoopPromise<Channel> = eventLoop.newPromise()
let future: EventLoopFuture<Channel>
do {
let serverChannel = try ServerSocketChannel(eventLoop: eventLoop as! SelectableEventLoop,
group: childEventLoopGroup,
protocolFamily: address.protocolFamily)
let serverChannel = try makeServerChannel(eventLoop as! SelectableEventLoop, childEventLoopGroup)

serverChannelInit(serverChannel).then {
future = serverChannelInit(serverChannel).then {
serverChannel.pipeline.add(handler: AcceptHandler(childChannelInitializer: childChannelInit,
childChannelOptions: childChannelOptions))
}.then {
serverChannelOptions.applyAll(channel: serverChannel)
}.then {
serverChannel.registerAndDoSynchronously { serverChannel in
serverChannel.bind(to: address)
}
register(eventLoop, serverChannel)
}.map {
serverChannel
}.cascade(promise: promise)
} catch let err {
promise.fail(error: err)
}
} catch {
let promise: EventLoopPromise<Channel> = eventLoop.newPromise()
promise.fail(error: error)
future = promise.futureResult
}

return promise.futureResult
return future
}

private class AcceptHandler: ChannelInboundHandler {
Expand Down Expand Up @@ -388,6 +412,31 @@ public final class ClientBootstrap {
}
}

/// Use the existing connected socket file descriptor.
///
/// - parameters:
/// - descriptor: The _Unix file descriptor_ representing the connected stream socket.
/// - returns: an `EventLoopFuture<Channel>` to deliver the `Channel` immediately.
public func withConnectedSocket(descriptor: CInt) -> EventLoopFuture<Channel> {
let eventLoop = group.next()
do {
let channelInitializer = self.channelInitializer ?? { _ in eventLoop.newSucceededFuture(result: ()) }
let channel = try SocketChannel(eventLoop: eventLoop as! SelectableEventLoop, descriptor: descriptor)

return channelInitializer(channel).then {
self.channelOptions.applyAll(channel: channel)
}.then {
let promise: EventLoopPromise<Void> = eventLoop.newPromise()
channel.registerAlreadyConfigured0(promise: promise)
return promise.futureResult
}.map {
channel
}
} catch {
return eventLoop.newFailedFuture(error: error)
}
}

private func execute(eventLoop: EventLoop,
protocolFamily: Int32,
_ body: @escaping (Channel) -> EventLoopFuture<Void>) -> EventLoopFuture<Channel> {
Expand Down Expand Up @@ -470,18 +519,29 @@ public final class DatagramBootstrap {
return self
}

/// Use the existing bound socket file descriptor.
///
/// - parameters:
/// - descriptor: The _Unix file descriptor_ representing the bound datagram socket.
public func withBoundSocket(descriptor: CInt) -> EventLoopFuture<Channel> {
func makeChannel(_ eventLoop: SelectableEventLoop) throws -> DatagramChannel {
return try DatagramChannel(eventLoop: eventLoop, descriptor: descriptor)
}
return bind0(makeChannel: makeChannel) { (eventLoop, channel) in
let promise: EventLoopPromise<Void> = eventLoop.newPromise()
channel.registerAlreadyConfigured0(promise: promise)
return promise.futureResult
}
}

/// Bind the `DatagramChannel` to `host` and `port`.
///
/// - parameters:
/// - host: The host to bind on.
/// - port: The port to bind on.
public func bind(host: String, port: Int) -> EventLoopFuture<Channel> {
let evGroup = group
do {
let address = try SocketAddress.newAddressResolving(host: host, port: port)
return bind0(eventLoopGroup: evGroup, to: address)
} catch let err {
return evGroup.next().newFailedFuture(error: err)
return bind0 {
return try SocketAddress.newAddressResolving(host: host, port: port)
}
}

Expand All @@ -490,47 +550,60 @@ public final class DatagramBootstrap {
/// - parameters:
/// - address: The `SocketAddress` to bind on.
public func bind(to address: SocketAddress) -> EventLoopFuture<Channel> {
return bind0(eventLoopGroup: group, to: address)
return bind0 { address }
}

/// Bind the `DatagramChannel` to a UNIX Domain Socket.
///
/// - parameters:
/// - unixDomainSocketPath: The path of the UNIX Domain Socket to bind on. `path` must not exist, it will be created by the system.
public func bind(unixDomainSocketPath: String) -> EventLoopFuture<Channel> {
let evGroup = group
return bind0 {
return try SocketAddress(unixDomainSocketPath: unixDomainSocketPath)
}
}

private func bind0(_ makeSocketAddress: () throws -> SocketAddress) -> EventLoopFuture<Channel> {
let address: SocketAddress
do {
let address = try SocketAddress(unixDomainSocketPath: unixDomainSocketPath)
return bind0(eventLoopGroup: evGroup, to: address)
} catch let err {
return evGroup.next().newFailedFuture(error: err)
address = try makeSocketAddress()
} catch {
return group.next().newFailedFuture(error: error)
}
func makeChannel(_ eventLoop: SelectableEventLoop) throws -> DatagramChannel {
return try DatagramChannel(eventLoop: eventLoop,
protocolFamily: address.protocolFamily)
}
return bind0(makeChannel: makeChannel) { (eventLoop, channel) in
channel.register().then {
_ in return channel.bind(to: address)
}
}
}

private func bind0(eventLoopGroup: EventLoopGroup, to address: SocketAddress) -> EventLoopFuture<Channel> {
let eventLoop = eventLoopGroup.next()
private func bind0(makeChannel: (_ eventLoop: SelectableEventLoop) throws -> DatagramChannel, _ registerAndBind: @escaping (EventLoop, DatagramChannel) -> EventLoopFuture<Void>) -> EventLoopFuture<Channel> {
let eventLoop = self.group.next()
let channelInitializer = self.channelInitializer ?? { _ in eventLoop.newSucceededFuture(result: ()) }
let channelOptions = self.channelOptions

let promise: EventLoopPromise<Channel> = eventLoop.newPromise()
let future: EventLoopFuture<Channel>
do {
let channel = try DatagramChannel(eventLoop: eventLoop as! SelectableEventLoop,
protocolFamily: address.protocolFamily)
let channel = try makeChannel(eventLoop as! SelectableEventLoop)

channelInitializer(channel).then {
future = channelInitializer(channel).then {
channelOptions.applyAll(channel: channel)
}.then {
channel.register()
}.then {
channel.bind(to: address)
registerAndBind(eventLoop, channel)
}.map {
channel
}.cascade(promise: promise)
} catch let err {
promise.fail(error: err)
}
} catch {
let promise: EventLoopPromise<Channel> = eventLoop.newPromise()
promise.fail(error: error)
future = promise.futureResult
}

return promise.futureResult
return future
}
}

Expand Down
9 changes: 9 additions & 0 deletions Sources/NIO/Channel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ public protocol ChannelCore: class {
/// - promise: The `EventLoopPromise` which should be notified once the operation completes, or nil if no notification should take place.
func register0(promise: EventLoopPromise<Void>?)

/// Register channel as already connected or bound socket.
/// - parameters:
/// - promise: The `EventLoopPromise` which should be notified once the operation completes, or nil if no notification should take place.
func registerAlreadyConfigured0(promise: EventLoopPromise<Void>?)

/// Bind to a `SocketAddress`.
///
/// - parameters:
Expand Down Expand Up @@ -205,6 +210,10 @@ extension Channel {
pipeline.register(promise: promise)
}

public func registerAlreadyConfigured0(promise: EventLoopPromise<Void>?) {
promise?.fail(error: ChannelError.operationUnsupported)
}

public func triggerUserOutboundEvent(_ event: Any, promise: EventLoopPromise<Void>?) {
pipeline.triggerUserOutboundEvent(event, promise: promise)
}
Expand Down
4 changes: 4 additions & 0 deletions Sources/NIO/DeadChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ private final class DeadChannelCore: ChannelCore {
promise?.fail(error: ChannelError.ioOnClosedChannel)
}

func registerAlreadyConfigured0(promise: EventLoopPromise<Void>?) {
promise?.fail(error: ChannelError.ioOnClosedChannel)
}

func bind0(to: SocketAddress, promise: EventLoopPromise<Void>?) {
promise?.fail(error: ChannelError.ioOnClosedChannel)
}
Expand Down
6 changes: 6 additions & 0 deletions Sources/NIO/Embedded.swift
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,12 @@ class EmbeddedChannelCore: ChannelCore {
pipeline.fireChannelRegistered0()
}

func registerAlreadyConfigured0(promise: EventLoopPromise<Void>?) {
isActive = true
register0(promise: promise)
pipeline.fireChannelActive0()
}

func write0(_ data: NIOAny, promise: EventLoopPromise<Void>?) {
guard let data = data.tryAsIOData() else {
promise?.fail(error: ChannelError.writeDataUnsupported)
Expand Down
13 changes: 13 additions & 0 deletions Sources/NIO/ServerSocket.swift
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,19 @@
super.init(descriptor: sock)
}

/// Create a new instance.
///
/// - parameters:
/// - descriptor: The _Unix file descriptor_ representing the bound socket.
/// - setNonBlocking: Set non-blocking mode on the socket.
/// - throws: An `IOError` if socket is invalid.
init(descriptor: CInt, setNonBlocking: Bool = false) throws {
super.init(descriptor: descriptor)
if setNonBlocking {
try self.setNonBlocking()
}
}

/// Start to listen for new connections.
///
/// - parameters:
Expand Down
15 changes: 14 additions & 1 deletion Sources/NIO/Socket.swift
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,27 @@ public typealias IOVector = iovec
super.init(descriptor: sock)
}

/// Create a new instance out of an already established socket.
///
/// - parameters:
/// - descriptor: The existing socket descriptor.
/// - setNonBlocking: Set non-blocking mode on the socket.
/// - throws: An `IOError` if could not change the socket into non-blocking
init(descriptor: CInt, setNonBlocking: Bool) throws {
super.init(descriptor: descriptor)
if setNonBlocking {
try self.setNonBlocking()
}
}

/// Create a new instance.
///
/// The ownership of the passed in descriptor is transferred to this class. A user must call `close` to close the underlying
/// file descriptor once its not needed / used anymore.
///
/// - parameters:
/// - descriptor: The file descriptor to wrap.
override init(descriptor: Int32) {
override init(descriptor: CInt) {
super.init(descriptor: descriptor)
}

Expand Down
Loading

0 comments on commit 338cfb5

Please sign in to comment.