Skip to content

Commit

Permalink
Merge pull request #34 from Zewo/fd
Browse files Browse the repository at this point in the history
Remove handle and add write and read to fd
  • Loading branch information
paulofaria authored May 20, 2017
2 parents e81c3f7 + 9f8a358 commit 8aedb4f
Show file tree
Hide file tree
Showing 53 changed files with 1,558 additions and 3,088 deletions.
4 changes: 0 additions & 4 deletions .jazzy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,6 @@ xcodebuild_arguments:
- Venice

custom_categories:
- name: Handles
children:
- Handle

- name: Coroutines
children:
- Coroutine
Expand Down
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import PackageDescription

let package = Package(
dependencies: [
.Package(url: "https://github.com/Zewo/Venice.git", majorVersion: 0, minor: 17)
.Package(url: "https://github.com/Zewo/Venice.git", majorVersion: 0, minor: 18)
]
)
```
Expand All @@ -54,7 +54,7 @@ What you end up with is a tree of coroutines rooted in the `main` function. This

![call-tree](http://libdill.org/index3.jpeg "Call Tree")

Venice implements structured concurrency by allowing you to close a running coroutine.
Venice implements structured concurrency by allowing you to cancel a running coroutine.

```swift
let coroutine = try Coroutine {
Expand All @@ -71,12 +71,12 @@ let coroutine = try Coroutine {
}

try Coroutine.wakeUp(1.second.fromNow())
try coroutine.close()
coroutine.cancel()
```

When a coroutine is being closed all blocking calls will start to throw `VeniceError.canceled`. On one hand, this forces the function to finish quickly (there's not much you can do without blocking functions); on the other hand, it provides an opportunity for cleanup.
When a coroutine is being canceled all coroutine-blocking calls will start to throw `VeniceError.canceledCoroutine`. On one hand, this forces the function to finish quickly (there's not much you can do without coroutine-blocking functions); on the other hand, it provides an opportunity for cleanup.

In the example above, when `coroutine.close` is called the call to `Coroutine.wakeUp` inside the coroutine will throw `VeniceError.canceled` and then the `defer` statement will run, thus releasing the memory allocated for `resource`.
In the example above, when `coroutine.cancel` is called the call to `Coroutine.wakeUp` inside the coroutine will throw `VeniceError.canceledCoroutine` and then the `defer` statement will run, thus releasing the memory allocated for `resource`.

# Threads

Expand Down
156 changes: 105 additions & 51 deletions Sources/Venice/Channel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,6 @@ import CLibdill

/// A channel is a synchronization primitive.
///
/// # Threads
///
/// You can use Venice in multi-threaded programs.
/// However, individual threads are strictly separated.
/// You may think of each thread as a separate process.
///
/// In particular, a coroutine created in a thread will
/// be executed in that same thread, and it will never
/// migrate to a different one.
///
/// In a similar manner, a handle, such as a channel or
/// a coroutine handle, created in one thread cannot be
/// used in a different thread.
///
/// ## Example:
///
/// ```swift
Expand All @@ -32,9 +18,10 @@ import CLibdill
/// }
///
/// let theAnswer = try channel.receive(deadline: 1.second.fromNow())
/// try coroutine.close()
/// ```
public final class Channel<Type> : Handle {
public final class Channel<Type> {
private typealias Handle = Int32

private enum ChannelResult<Type> {
case value(Type)
case error(Error)
Expand All @@ -49,6 +36,7 @@ public final class Channel<Type> : Handle {
}
}

private let handle: Handle
private var buffer = List<ChannelResult<Type>>()

/// Creates a channel
Expand All @@ -58,39 +46,36 @@ public final class Channel<Type> : Handle {
/// It doesn't store any items.
///
/// - Throws: The following errors might be thrown:
/// #### VeniceError.canceled
/// Thrown when the operation is performed within a closed coroutine.
/// #### VeniceError.canceledCoroutine
/// Thrown when the operation is performed within a canceled coroutine.
/// #### VeniceError.outOfMemory
/// Thrown when the system doesn't have enough memory to perform the operation.
/// #### VeniceError.unexpectedError
/// Thrown when an unexpected error occurs.
/// This should never happen in the regular flow of an application.
/// Thrown when the system doesn't have enough memory to create a new channel.
public init() throws {
let result = chmake(0)

guard result != -1 else {
switch errno {
case ECANCELED:
throw VeniceError.canceled
throw VeniceError.canceledCoroutine
case ENOMEM:
throw VeniceError.outOfMemory
default:
throw VeniceError.unexpectedError
}
}

super.init(handle: result)
handle = result
}

deinit {
try? close()
hclose(handle)
}

/// Reference to the channel which can only send.
public lazy var sendOnly: SendOnly = SendOnly(self)
public lazy var sending: Sending = Sending(self)

/// Reference to the channel which can only receive.
public lazy var receiveOnly: ReceiveOnly = ReceiveOnly(self)
public lazy var receiving: Receiving = Receiving(self)

/// Sends a value to the channel.
public func send(_ value: Type, deadline: Deadline) throws {
Expand All @@ -108,12 +93,10 @@ public final class Channel<Type> : Handle {

guard result == 0 else {
switch errno {
case EBADF:
throw VeniceError.invalidHandle
case ECANCELED:
throw VeniceError.canceled
throw VeniceError.canceledCoroutine
case EPIPE:
throw VeniceError.handleIsDone
throw VeniceError.doneChannel
case ETIMEDOUT:
buffer.remove(node)
throw VeniceError.deadlineReached
Expand All @@ -129,12 +112,10 @@ public final class Channel<Type> : Handle {

guard result == 0 else {
switch errno {
case EBADF:
throw VeniceError.invalidHandle
case ECANCELED:
throw VeniceError.canceled
throw VeniceError.canceledCoroutine
case EPIPE:
throw VeniceError.handleIsDone
throw VeniceError.doneChannel
case ETIMEDOUT:
throw VeniceError.deadlineReached
default:
Expand All @@ -145,36 +126,50 @@ public final class Channel<Type> : Handle {
return try buffer.removeFirst().getValue()
}

/// This function is used to inform the channel that no more `send` or `receive` should be
/// performed on the channel.
///
/// - Warning:
/// After `done` is called on a channel, any attempts to `send` or `receive`
/// will result in a `VeniceError.doneChannel` error.
public func done() {
hdone(handle, 0)
}

/// Send-only reference to an existing channel.
///
/// ## Example:
///
/// ```swift
/// let channel = Channel<Int>()
///
/// func send(to channel: Channel<Int>.SendOnly) throws {
/// func send(to channel: Channel<Int>.Sending) throws {
/// try channel.send(42, deadline: 1.second.fromNow())
/// }
///
/// try send(to: channel.sendOnly)
/// try send(to: channel.sending)
/// ```
public final class SendOnly : Handle {
public final class Sending {
private let channel: Channel<Type>

fileprivate init(_ channel: Channel<Type>) {
self.channel = channel
super.init(handle: channel.handle)
}

/// Sends a value to the channel.
/// :nodoc:
public func send(_ value: Type, deadline: Deadline) throws {
try channel.send(value, deadline: deadline)
}

/// Sends an error to the channel.
/// :nodoc:
public func send(_ error: Error, deadline: Deadline) throws {
try channel.send(error, deadline: deadline)
}

/// :nodoc:
public func done() {
channel.done()
}
}

/// Receive-only reference to an existing channel.
Expand All @@ -184,41 +179,100 @@ public final class Channel<Type> : Handle {
/// ```swift
/// let channel = Channel<Int>()
///
/// func receive(from channel: Channel<Int>.ReceiveOnly) throws {
/// func receive(from channel: Channel<Int>.Receiving) throws {
/// let value = try channel.receive(deadline: 1.second.fromNow())
/// }
///
/// try receive(from: channel.receiveOnly)
/// try receive(from: channel.receiving)
/// ```
public final class ReceiveOnly : Handle {
public final class Receiving {
private let channel: Channel<Type>

fileprivate init(_ channel: Channel<Type>) {
self.channel = channel
super.init(handle: channel.handle)
}

/// Receives a value from channel.
/// :nodoc:
@discardableResult public func receive(deadline: Deadline) throws -> Type {
return try channel.receive(deadline: deadline)
}

/// :nodoc:
public func done() {
channel.done()
}
}
}

extension Channel where Type == Void {
/// Sends to the channel.
///
/// :nodoc:
public func send(deadline: Deadline) throws {
try send((), deadline: deadline)
}
}

extension Channel.SendOnly where Type == Void {
/// Sends to the channel.
///
extension Channel.Sending where Type == Void {
/// :nodoc:
public func send(deadline: Deadline) throws {
try send((), deadline: deadline)
}
}

class Node<T> {
var value: T
var next: Node<T>?
weak var previous: Node<T>?

init(value: T) {
self.value = value
}
}

fileprivate class List<T> {
private var head: Node<T>?
private var tail: Node<T>?

@discardableResult fileprivate func append(_ value: T) -> Node<T> {
let newNode = Node(value: value)

if let tailNode = tail {
newNode.previous = tailNode
tailNode.next = newNode
} else {
head = newNode
}

tail = newNode
return newNode
}

@discardableResult fileprivate func remove(_ node: Node<T>) -> T {
let prev = node.previous
let next = node.next

if let prev = prev {
prev.next = next
} else {
head = next
}

next?.previous = prev

if next == nil {
tail = prev
}

node.previous = nil
node.next = nil

return node.value
}

@discardableResult fileprivate func removeFirst() throws -> T {
guard let head = head else {
throw VeniceError.unexpectedError
}

return remove(head)
}
}
Loading

0 comments on commit 8aedb4f

Please sign in to comment.