Skip to content

Commit

Permalink
Merge pull request #139 from d-exclaimation/async-throwing-pubsub
Browse files Browse the repository at this point in the history
Async throwing pubsub
  • Loading branch information
d-exclaimation authored Oct 22, 2023
2 parents 06eddfa + 6ec9148 commit be1925c
Show file tree
Hide file tree
Showing 9 changed files with 111 additions and 57 deletions.
55 changes: 32 additions & 23 deletions Documentation/pages/docs/fundamentals/subscriptions.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -334,25 +334,25 @@ struct RedisPubSub: PubSub {
}

/// Get a downstream from the broadcast for the channel given
func downstream(to channel: String) async -> AsyncStream<Data> {
let broadcast = await subscribe(to: channel)
func downstream(to channel: String) async throws -> AsyncStream<Data> {
let broadcast = try await subscribe(to: channel)
let downstream = await broadcast.downstream()
return downstream.stream
}

/// Get the broadcast for the channel if exist, otherwise make a new one
private func subscribe(to channel: String) async -> Broadcast<Data> {
private func subscribe(to channel: String) async throws -> Broadcast<Data> {
if let broadcast = broadcasting[channel] {
return broadcast
}
let broadcast = Broadcast<Data>()
broadcasting[channel] = broadcast
await apply(from: .init(channel), to: broadcast)
try await apply(from: .init(channel), to: broadcast)
return broadcast
}

/// Apply broadcasting to the Redis channel subscription
private func apply(from channel: RedisChannelName, to broadcast: Broadcast<Data>) async {
private func apply(from channel: RedisChannelName, to broadcast: Broadcast<Data>) async throws {
do {
try await redis.subscribe(
to: channel,
Expand All @@ -372,46 +372,55 @@ struct RedisPubSub: PubSub {
.get()
} catch {
await broadcast.close()
throw error
}
}

/// Pubblish the data (which is RESPValueConvertible) to the specific redis channel
func publish(for channel: String, _ value: Data) async {
let _ = try? await redis.publish(value, to: .init(channel)).get()
func publish(for channel: String, _ value: Data) async throws {
let _ = try await redis.publish(value, to: .init(channel)).get()
}

/// Close the redis channel subscription and all of the downstreams
func close(for channel: String) async {
try? await redis.unsubscribe(from: .init(channel)).get()
func close(for channel: String) async throws {
try await redis.unsubscribe(from: .init(channel)).get()
await broadcasting[channel]?.close()
}
}

// MARK: -- Protocol required methods

public func asyncStream<DataType: Sendable & Decodable>(_ type: DataType.Type = DataType.self, for trigger: String) -> AsyncStream<DataType> {
AsyncStream<DataType> { con in
public func asyncStream<DataType: Sendable & Decodable>(_ type: DataType.Type = DataType.self, for trigger: String) -> AsyncThrowingStream<DataType, Error> {
AsyncThrowingStream<DataType, Error> { con in
let task = Task {
let stream = await dispatcher.downstream(to: trigger)
for await data in stream {
guard let event = try? JSONDecoder().decode(DataType.self, data) else { continue }
con.yield(event)
do {
let stream = try await dispatcher.downstream(to: trigger)
for await data in stream {
do {
let event = try JSONDecoder().decode(DataType.self, data)
con.yield(event)
} catch {
con.finish(throwing: error)
}
}
con.finish()
} catch {
con.finish(throwing: error)
}
con.finish()
}
con.onTermination = { @Sendable _ in
task.cancel()
}
}
}

public func publish<DataType: Sendable & Encodable>(for trigger: String, payload: DataType) async {
guard let data = try? JSONEncoder().encode(payload) else { return }
await dispatcher.publish(for: trigger, data)
public func publish<DataType: Sendable & Encodable>(for trigger: String, payload: DataType) async throws {
let data = try JSONEncoder().encode(payload)
try await dispatcher.publish(for: trigger, data)
}

public func close(for trigger: String) async {
await dispatcher.close(for: trigger)
public func close(for trigger: String) async throws {
try await dispatcher.close(for: trigger)
}

// MARK: - Properties
Expand All @@ -434,9 +443,9 @@ struct Message: Sendable, Codable { ... }
struct Resolver {
let pubsub: PubSub = app.environment.isRelease ? RedisPubSub(app.redis) : AsyncPubSub()

func create(ctx: Context, _: NoArguments) async -> Message {
func create(ctx: Context, _: NoArguments) async throws -> Message {
let message = ...
await pubsub.publish(message)
try await pubsub.publish(message)
return message
}

Expand Down
2 changes: 1 addition & 1 deletion Documentation/pages/docs/getting-started.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ struct Resolver {
guard let book else {
throw Abort(.internalServerError)
}
await pubsub.publish(for: trigger, payload: book)
try await pubsub.publish(for: trigger, payload: book)
return book
}

Expand Down
16 changes: 8 additions & 8 deletions Sources/Pioneer/GraphQL/Extensions/Field+AsyncAwait.swift
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public extension Graphiti.Field where FieldType: Encodable {
) {
let resolve: AsyncResolve<ObjectType, Context, Arguments, FieldType> = { type in
{ context, arguments, eventLoopGroup in
eventLoopGroup.performWithTask {
eventLoopGroup.makeFutureWithTask {
await function(type)(context, arguments, eventLoopGroup)
}
}
Expand All @@ -39,7 +39,7 @@ public extension Graphiti.Field where FieldType: Encodable {
) where Arguments == NoArguments {
let resolve: AsyncResolve<ObjectType, Context, Arguments, FieldType> = { type in
{ context, arguments, eventLoopGroup in
eventLoopGroup.performWithTask {
eventLoopGroup.makeFutureWithTask {
await function(type)(context, arguments, eventLoopGroup)
}
}
Expand All @@ -56,7 +56,7 @@ public extension Graphiti.Field where FieldType: Encodable {
) {
let resolve: AsyncResolve<ObjectType, Context, Arguments, FieldType> = { type in
{ context, arguments, eventLoopGroup in
eventLoopGroup.performWithTask {
eventLoopGroup.makeFutureWithTask {
try await function(type)(context, arguments, eventLoopGroup)
}
}
Expand All @@ -70,7 +70,7 @@ public extension Graphiti.Field where FieldType: Encodable {
) where Arguments == NoArguments {
let resolve: AsyncResolve<ObjectType, Context, Arguments, FieldType> = { type in
{ context, arguments, eventLoopGroup in
eventLoopGroup.performWithTask {
eventLoopGroup.makeFutureWithTask {
try await function(type)(context, arguments, eventLoopGroup)
}
}
Expand All @@ -90,7 +90,7 @@ public extension Graphiti.Field {
) {
let resolve: AsyncResolve<ObjectType, Context, Arguments, ResolveType> = { type in
{ context, arguments, eventLoopGroup in
eventLoopGroup.performWithTask {
eventLoopGroup.makeFutureWithTask {
await function(type)(context, arguments, eventLoopGroup)
}
}
Expand All @@ -105,7 +105,7 @@ public extension Graphiti.Field {
) where Arguments == NoArguments {
let resolve: AsyncResolve<ObjectType, Context, Arguments, ResolveType> = { type in
{ context, arguments, eventLoopGroup in
eventLoopGroup.performWithTask {
eventLoopGroup.makeFutureWithTask {
await function(type)(context, arguments, eventLoopGroup)
}
}
Expand All @@ -123,7 +123,7 @@ public extension Graphiti.Field {
) {
let resolve: AsyncResolve<ObjectType, Context, Arguments, ResolveType> = { type in
{ context, arguments, eventLoopGroup in
eventLoopGroup.performWithTask {
eventLoopGroup.makeFutureWithTask {
try await function(type)(context, arguments, eventLoopGroup)
}
}
Expand All @@ -138,7 +138,7 @@ public extension Graphiti.Field {
) where Arguments == NoArguments {
let resolve: AsyncResolve<ObjectType, Context, Arguments, ResolveType> = { type in
{ context, arguments, eventLoopGroup in
eventLoopGroup.performWithTask {
eventLoopGroup.makeFutureWithTask {
try await function(type)(context, arguments, eventLoopGroup)
}
}
Expand Down
15 changes: 9 additions & 6 deletions Sources/Pioneer/Streaming/AsyncPubSub.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public struct AsyncPubSub: PubSub, Sendable {
/// Async stream return a new AsyncStream that is connected to the emitter that is assigned to the given key
/// - Parameter key: The string topic / key used to find the emitter
/// - Returns: An async stream that is linked to an emitter
internal func asyncStream(for key: String) async -> AsyncStream<Sendable> {
internal func asyncStream(for key: String) async -> AsyncThrowingStream<Sendable, Error> {
let emitter = await subscribe(for: key)
let downstream = await emitter.downstream()
return downstream.stream
Expand Down Expand Up @@ -58,12 +58,15 @@ public struct AsyncPubSub: PubSub, Sendable {
/// - Parameters:
/// - type: DataType of this AsyncStream
/// - trigger: The topic string used to differentiate what data should this stream be accepting
public func asyncStream<DataType: Sendable & Decodable>(_: DataType.Type = DataType.self, for trigger: String) -> AsyncStream<DataType> {
AsyncStream<DataType> { con in
public func asyncStream<DataType: Sendable & Decodable>(_: DataType.Type = DataType.self, for trigger: String) -> AsyncThrowingStream<DataType, Error> {
AsyncThrowingStream<DataType, Error> { con in
let task = Task {
let pipe = await dispatcher.asyncStream(for: trigger)
for await untyped in pipe {
guard let typed = untyped as? DataType else { continue }
for try await untyped in pipe {
guard let typed = untyped as? DataType else {
con.finish(throwing: PubSubConversionError(reason: "Failed to convert \(untyped) to \(DataType.self)"))
return
}
con.yield(typed)
}
con.finish()
Expand All @@ -78,7 +81,7 @@ public struct AsyncPubSub: PubSub, Sendable {
/// - Parameters:
/// - trigger: The trigger this data will be published to
/// - payload: The data being emitted
public func publish<DataType: Sendable & Encodable>(for trigger: String, payload: DataType) async {
public func publish<DataType: Sendable & Encodable>(for trigger: String, payload: DataType) async {
await dispatcher.publish(for: trigger, payload)
}

Expand Down
12 changes: 6 additions & 6 deletions Sources/Pioneer/Streaming/Broadcast.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import struct Foundation.UUID

/// An actor to broadcast messages to multiple downstream from a single upstream
public actor Broadcast<MessageType: Sendable> {
public typealias Consumer = AsyncStream<MessageType>.Continuation
public typealias Consumer = AsyncThrowingStream<MessageType, Error>.Continuation

private var consumers: [UUID: Consumer] = [:]

Expand Down Expand Up @@ -84,21 +84,21 @@ public struct Downstream<Element: Sendable>: AsyncSequence {
/// The id of the stream
public let id: UUID
/// The stream itself
public let stream: AsyncStream<Element>
public let stream: AsyncThrowingStream<Element, Error>

public init(
_ elementType: Element.Type = Element.self,
bufferingPolicy limit: AsyncStream<Element>.Continuation.BufferingPolicy = .unbounded,
_ build: @escaping (UUID, AsyncStream<Element>.Continuation) -> Void
bufferingPolicy limit: AsyncThrowingStream<Element, Error>.Continuation.BufferingPolicy = .unbounded,
_ build: @escaping (UUID, AsyncThrowingStream<Element, Error>.Continuation) -> Void
) {
let id = UUID()
self.id = id
self.stream = AsyncStream<Element>(elementType, bufferingPolicy: limit) { con in
self.stream = AsyncThrowingStream<Element, Error>(elementType, bufferingPolicy: limit) { con in
build(id, con)
}
}

public func makeAsyncIterator() -> AsyncStream<Element>.AsyncIterator {
public func makeAsyncIterator() -> AsyncThrowingStream<Element, Error>.AsyncIterator {
stream.makeAsyncIterator()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
// Created by d-exclaimation on 3:38 PM.
//

import NIOCore
import class GraphQL.ConcurrentEventStream
import class GraphQL.Future
import struct GraphQL.GraphQLResult
Expand Down
21 changes: 18 additions & 3 deletions Sources/Pioneer/Streaming/PubSub.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,30 @@ public protocol PubSub {
/// - Parameters:
/// - type: DataType of this AsyncStream
/// - trigger: The topic string used to differentiate what data should this stream be accepting
func asyncStream<DataType: Sendable & Decodable>(_ type: DataType.Type, for trigger: String) -> AsyncStream<DataType>
func asyncStream<DataType: Sendable & Decodable>(_ type: DataType.Type, for trigger: String) -> AsyncThrowingStream<DataType, Error>

/// Publish a new data into the pubsub for a specific trigger.
/// - Parameters:
/// - trigger: The trigger this data will be published to
/// - payload: The data being emitted
func publish<DataType: Sendable & Encodable>(for trigger: String, payload: DataType) async
func publish<DataType: Sendable & Encodable>(for trigger: String, payload: DataType) async throws

/// Close a specific trigger and deallocate every consumer of that trigger
/// - Parameter trigger: The trigger this call takes effect on
func close(for trigger: String) async
func close(for trigger: String) async throws
}


/// Type Conversion Error for the PubSub
public struct PubSubConversionError: Error, @unchecked Sendable {
/// The detailed reasoning why conversion failed
public var reason: String

public init(reason: String) {
self.reason = reason
}

public var localizedDescription: String {
"PubSubConversionError: \"\(reason)\""
}
}
Loading

0 comments on commit be1925c

Please sign in to comment.