Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PubSub and Broadcast #45

Merged
merged 17 commits into from
Jun 22, 2022
Merged

PubSub and Broadcast #45

merged 17 commits into from
Jun 22, 2022

Conversation

d-exclaimation
Copy link
Owner

@d-exclaimation d-exclaimation commented Jun 20, 2022

Motivation

PubSub

By exporting the protocol, PubSub, it allow different implementation of AsyncPubSub notably implementation backed by popular event-publishing systems (i.e. Redis) with similar API which allow user of this library to prototype with the in memory AsyncPubSub and easily migrate to a distributed PubSub implementation without very little changes.

Broadcast

Additionally, common client libraries for popular event-publishing systems usually only provide a function that to subscribe to a specific publisher, but no option of unsubscribing without closing the publisher entirely and sometimes only allow 1 subscriber for each publisher (usually because subscription is its own new network connection).

1st proposal In this case, the protocol, `Broadcast`, is exported which allow a `PubSub` to broadcast the subscription values to multiple different downstream where each downstream share the same upstream and can be unsubscribed / disposed (to prevent leaks) without closing the upstream and publisher. Any `PubSub` can be optionally conform to this protocol which will need to implementation a different sets of functions, but will be provided with the required functions to conforms to `PubSub`.

An example of this is AsyncPubSub itself, which store 1 source upstream for each topic but for each subscription, it will create a new downstream and broadcast the messages from the upstream to all of the downstream. This way, there can multiple subscriber for each upstream (topic) and each subscriber downstream can be disposed without closing the source stream (which will close all subscriber for that topic).

2nd proposal

In this case, the observer actor, Broadcast, is provided which can broadcast any subscription values to multiple different downstream where each downstream share the same upstream and can be unsubscribed / disposed (to prevent leaks) without closing the upstream and publisher.

For instance

let broadcast = Broadcast<Message>()
try await app.redis.subscribe(
    to: "some-channel",
    messageReceiver: { _, value in 
       Task  {
            let message = convertMessage(value)
            await broadcast.publish(message)
        }
    },
    onUnsubscribe: { _, _ in 
        Task {
            await broadcast.close()
        }
    }
).get()

// All of these downstream are getting all messages from the upstream 
let downstream0 = await broadcast.downstream().stream
let downstream1 = await broadcast.downstream().stream
let downstream2 = await broadcast.downstream().stream
let downstream3 = await broadcast.downstream().stream

app.redis.publish(..., to: "some-channel")

// Dispose a downstream without effecting the others
let task3 = Task {
    for await msg in downstream3 {
        // ...
    }
}

task3.cancel()


// Shutdown all downstreams
try await app.redis.unsubscribe(from:  "some-channel").get()
await broadcast.close()

Optional

Both of these are optional, you can still implement a Pub/Sub without utilizing both of them. As long as that Pub/Sub can return an AsyncSequence, it will work with Pioneer.

Tasks

  • Added a protocol, PubSub, that define a method specification for AsyncPubSub
  • Added a protocol, Broadcast that define a specification for PubSub that require broadcasting capabilities with actor
  • Added a protocol that define a actor specification for Broadcast
  • Updated Broadcast to be a streaming middleware actor that handle distribution of downstream(s)
  • Updated EventStream part of documentation
  • Updated Reference part of documentation to include PubSub and Broadcast
  • Added PubSub section in either EventStream or Features to give clear explanation for making custom PubSub implementation
  • Test code on macOS
  • Test code on Linux

@d-exclaimation d-exclaimation added the enhancement New feature or request label Jun 20, 2022
@d-exclaimation d-exclaimation self-assigned this Jun 20, 2022
@d-exclaimation d-exclaimation linked an issue Jun 20, 2022 that may be closed by this pull request
@d-exclaimation
Copy link
Owner Author

d-exclaimation commented Jun 20, 2022

A bit busy for now, so I'll continue working on this later the week

@d-exclaimation d-exclaimation changed the title PubSub, BroadcastHub, Broadcast PubSub and Broadcast Jun 20, 2022
@cshadek
Copy link

cshadek commented Jun 20, 2022

This approach makes a lot of sense to me. It should be relatively simple to then create custom implementations of PubSub. I like the idea of including an example in the documentation. I also agree that making a separate package for these implementations makes sense. Nice work!

@d-exclaimation d-exclaimation marked this pull request as draft June 21, 2022 08:14
@d-exclaimation
Copy link
Owner Author

I made some changes to simply things a bit.

I changed Broadcast from a protocol conformed by PubSub to an actor that act as broadcaster to multiple consumer.

                                                              +------------+
                                                         +--> | Downstream |
                                                         |    +------------+
                                                         |                  
                                                         |    +------------+
+-----------+                        +-----------+       +--> | Downstream |
| Upstream  | ------ Message ------> | Broadcast |-------+    +------------+
+-----------+                        +-----------+       |                  
                                                         |    +------------+
                                                         +--> | Downstream |
                                                         |    +------------+
                                                         |                  
                                                         |    +------------+
                                                         +--  | Downstream |
                                                              +------------+

Basically, it applied on any upstream / subscription values to create downstream(s) and distribute messages into all downstream(s), where:

  • A single stream can be duplicated into multiple ones (usually to prevent making multiple network connections)
  • Different consumer can subscribe to the same upstream and all of them get the same messages
  • Downstream(s) can be disposed, stopped, or cancelled individually to prevent leaks
  • Closing downstream does not close other downstream(s), broadcast, and upstream
  • Closing broadcast dispose all downstream(s), but not necessarily the upstream
  • If implemented properly, closing upstream close the broadcast and dispose all downstream

I think I should stop here to not add any more bloat and increase the surface area where bug can occur in the library.

@d-exclaimation
Copy link
Owner Author

The constraint on what type can an AsyncStream be when calling asyncStream is a bit limiting, I was thinking of adding Codable as another constraint which would allow for better decoding and encoding

@d-exclaimation d-exclaimation marked this pull request as ready for review June 22, 2022 05:34
@d-exclaimation d-exclaimation merged commit 7d2fa36 into main Jun 22, 2022
@d-exclaimation d-exclaimation deleted the pubsub-protocol branch June 22, 2022 06:21
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
Status: Done
Development

Successfully merging this pull request may close these issues.

Redis backed AsyncPubSub
2 participants