Request for More Examples for Transitioning from RxJS to Effect in docs #373

Open
dilame opened this issue Nov 12, 2023 · 3 comments

Comments

dilame commented Nov 12, 2023

Hi guys! I am a power user of RxJS; I know it very well, including all its weaknesses. I've been watching Effect for a long time and, I assume, it could look like RxJS without its weaknesses.

I am currently exploring the possibility of transitioning from RxJS to Effect for my projects. While I find Effect intriguing and promising, I am facing some challenges in finding direct equivalents for certain RxJS features and patterns. I believe that additional examples and documentation in this area could greatly benefit users like myself who are considering this transition.

Here are my specific questions and requests:

ReplaySubject alternative in Effect

In RxJS, ReplaySubject is a widely used feature that allows new subscribers to receive emitted values even if they subscribe after these values have been emitted. Is there a direct alternative or a recommended pattern in Effect that achieves similar functionality?

Alternatives for switchMap/concatMap/exhaustMap/mergeMap

These RxJS operators are essential for dealing with higher-order Observables and managing the subscription lifecycle in a controlled manner. How can these patterns be implemented in Effect, particularly in scenarios involving complex stream transformations?

Hot/Cold Observables and Multicasting in Effect

In RxJS, the concepts of Hot and Cold Observables are crucial for understanding how observables behave and how they can be shared among multiple subscribers. Cold Observables start emitting values when a subscription is made, whereas Hot Observables share a single execution path and emit values irrespective of individual subscriptions. Could you elaborate on how these concepts translate into Effect, especially regarding multicasting and sharing streams among multiple consumers? Any examples or patterns that demonstrate the equivalent of Hot/Cold Observables and multicast strategies in Effect would be greatly appreciated.

Writing SDKs for WebSocket APIs with Effect

I am particularly interested in writing SDKs for WebSocket APIs using Effect-TS. Could you provide guidance or examples on how this can be achieved using Stream in Effect (or maybe something else)? I am having difficulties with this because WebSocket is push-based by its nature, but Effect Stream is pull-based.

Your insights and examples would be invaluable for developers like me who are keen on leveraging the power of functional programming in TypeScript with Effect but need some guidance on how to replicate or adapt certain RxJS patterns in this new environment.

I would be glad to read any answers, whether in this thread or on the website. I think a lot of current RxJS users are potential Effect users.

Thank you for your time and consideration.

TylorS commented Nov 13, 2023

Hey @dilame, I don't pretend to be an expert on Effect's Streams, but I've been working with push-based reactive streams for nearly a decade, and I'm curious what you see as similar. I'm genuinely asking; it isn't obvious to me.

In my assessment though, Effect simply has no push-based semantics by itself and they are thus not truly comparable, but I will try to answer your specific questions in the meantime regardless.

ReplaySubject alternative in Effect

Effect's closest equivalent is SubscriptionRef, which is pretty much the same thing in that there is always a current state, it can be sampled, and you can subscribe to its changes over time.
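
To make the comparison concrete, here is a library-free sketch of replay semantics. It uses neither Effect nor RxJS, and `makeReplay` and its methods are hypothetical names chosen for illustration. A late subscriber first receives the buffered values and then the live ones. With a buffer of size 1 this is roughly the "current value plus changes" model described above.

```typescript
type ReplayListener<A> = (a: A) => void;

// Hypothetical helper: remembers the last `size` values and replays them to late subscribers.
function makeReplay<A>(size: number) {
  const buffer: A[] = [];
  const listeners = new Set<ReplayListener<A>>();
  return {
    next(a: A): void {
      buffer.push(a);
      if (buffer.length > size) buffer.shift(); // keep only the newest `size` values
      listeners.forEach((l) => l(a));
    },
    subscribe(l: ReplayListener<A>): () => void {
      buffer.forEach((a) => l(a)); // replay first, then deliver live values
      listeners.add(l);
      return () => {
        listeners.delete(l);
      };
    },
  };
}

const replay = makeReplay<number>(2);
replay.next(1);
replay.next(2);
replay.next(3);

const seen: number[] = [];
replay.subscribe((n) => seen.push(n)); // subscribes late, gets the last two values
replay.next(4);                        // then receives live values
```

After this runs, `seen` holds the two replayed values followed by the live one.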

Alternatives for switchMap/concatMap/exhaustMap/mergeMap

Again, there are no push semantics natively. As far as I'm aware, Effect's Stream has no exhaustMap equivalent, but it does have similar versions of the others (f is the mapping function):

Rx.mergeMap => Stream.flatMap(stream, f, { concurrency: "unbounded" })
Rx.concatMap => Stream.flatMap(stream, f, { concurrency: 1 })
Rx.switchMap => Stream.flatMap(stream, f, { concurrency: 1, switch: true })
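
The operators named here differ only in what happens when a new outer value arrives while an inner stream is still running. The sketch below models that on a timeline with plain data. It is not Effect or RxJS code, and `Timed`, `Inner`, and `flatten` are hypothetical names used only to show the four policies side by side.

```typescript
// An event with a timestamp (ms) relative to the start of its stream.
type Timed<A> = { at: number; value: A };
// An inner stream: its events plus the time at which it completes.
type Inner<B> = { events: Timed<B>[]; duration: number };
type Strategy = "merge" | "concat" | "switch" | "exhaust";

function flatten<A, B>(outer: Timed<A>[], f: (a: A) => Inner<B>, strategy: Strategy): B[] {
  const out: Timed<B>[] = [];
  let busyUntil = 0; // absolute time at which the most recently started inner stream completes
  outer.forEach((o, i) => {
    // exhaust: ignore outer values that arrive while an inner stream is running
    if (strategy === "exhaust" && o.at < busyUntil) return;
    const innerStream = f(o.value);
    // concat: wait for the previous inner stream to complete before starting
    const start = strategy === "concat" ? Math.max(o.at, busyUntil) : o.at;
    // switch: the next outer value cancels this inner stream
    const next = outer[i + 1];
    const cutoff = strategy === "switch" && next !== undefined ? next.at : Infinity;
    busyUntil = start + innerStream.duration;
    for (const e of innerStream.events) {
      if (start + e.at < cutoff) out.push({ at: start + e.at, value: e.value });
    }
  });
  return out.sort((x, y) => x.at - y.at).map((e) => e.value);
}

// Outer values arrive at 0 ms, 5 ms and 40 ms; each inner stream emits twice and takes 25 ms.
const source: Timed<string>[] = [
  { at: 0, value: "a" },
  { at: 5, value: "b" },
  { at: 40, value: "c" },
];
const toInner = (x: string): Inner<string> => ({
  events: [{ at: 10, value: x + "1" }, { at: 20, value: x + "2" }],
  duration: 25,
});

const merged = flatten(source, toInner, "merge");       // a1, b1, a2, b2, c1, c2
const concatenated = flatten(source, toInner, "concat"); // a1, a2, b1, b2, c1, c2
const switched = flatten(source, toInner, "switch");     // b1, b2, c1, c2
const exhausted = flatten(source, toInner, "exhaust");   // a1, a2, c1, c2
```

Merge interleaves, concat queues, switch drops the stream that was interrupted, and exhaust drops the value that arrived while busy.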

Hot/Cold Observables

Hot and cold have nothing to do with the abstractions you're using to consume events; they exist outside of Rx, Effect, or anything else. If the act of subscription creates a source of events, it is cold; if the act of subscription just "tunes in", it is hot. In a browser, for example, DOM events are natively hot: even if we never call addEventListener, clicks still send events to be handled by the browser.

Since hot/cold doesn't really relate to any of these abstractions, I wouldn't say there's a need to translate them to Effect.
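
The distinction can be shown without any stream library. In this minimal sketch, `coldCounter` and `HotSource` are hypothetical names: the cold source is created by the act of subscribing, while the hot source produces events whether or not anyone listens.

```typescript
type Handler<A> = (a: A) => void;

// Cold: the act of subscribing creates the source, so every subscriber gets its own run.
function coldCounter(handler: Handler<number>): void {
  for (let n = 1; n <= 3; n++) handler(n);
}

// Hot: the source exists independently; subscribing only "tunes in" to what happens from now on.
class HotSource<A> {
  private handlers = new Set<Handler<A>>();
  emit(a: A): void {
    this.handlers.forEach((h) => h(a));
  }
  subscribe(h: Handler<A>): void {
    this.handlers.add(h);
  }
}

const first: number[] = [];
const second: number[] = [];
coldCounter((n) => first.push(n));
coldCounter((n) => second.push(n)); // independent run: also sees 1, 2, 3

const hot = new HotSource<number>();
const late: number[] = [];
hot.emit(1);                        // nobody is listening; the event is simply missed
hot.subscribe((n) => late.push(n));
hot.emit(2);
hot.emit(3);
```

Both cold subscribers see the full sequence, while the late hot subscriber only sees what was emitted after it tuned in.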

Multicasting in Effect

I split this out, because there are analogs in Stream worth mentioning

I haven't actually used these at all, so I'm not readily able to give any examples with them, but I do understand them to be the multicasting equivalents for Stream.

Writing SDKs for WebSocket APIs with Effect

If it is acceptable to change semantics, you will be mostly enqueuing the incoming messages from the websocket into a queue which the Stream will allow consumers to pull from.

I haven't worked with WebSockets in a while, so don't take this as the definitive way to do it, but something like this:

import { Effect, Stream } from 'effect'

Stream.asyncEffect((emit) => Effect.gen(function*(_) {
   // This can be replaced with a function parameter or however you prefer, but we need a WebSocket somehow
   const socket = yield* _(acquireWebsocket)

   // Create stable event listeners so the same references can be removed later
   const messageListener = (event: MessageEvent) => emit.single(event.data)
   const closeListener = () => emit.end()

   // Listen to messages
   socket.addEventListener('message', messageListener)
   socket.addEventListener('close', closeListener)

   // Clean up the event listeners when the Scope closes
   yield* _(Effect.addFinalizer(() => Effect.sync(() => {
     socket.removeEventListener('message', messageListener)
     socket.removeEventListener('close', closeListener)
   })))
}))
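
The underlying idea of enqueuing pushed messages so that consumers can pull them can be sketched without Effect. `PushPullBridge` is a hypothetical class, not part of any library, and its buffer is unbounded for simplicity; a real implementation would need a policy for slow consumers.

```typescript
// Hypothetical bridge: the push side calls `offer`, the pull side calls `take`.
class PushPullBridge<A> {
  private buffer: A[] = [];
  private waiting: Array<(a: A) => void> = [];

  // Push side, e.g. called from a WebSocket "message" listener.
  offer(a: A): void {
    const consumer = this.waiting.shift();
    if (consumer) consumer(a); // a consumer already asked for a value
    else this.buffer.push(a);  // nobody is pulling yet, so buffer it
  }

  // Pull side: the consumer asks for exactly one value.
  take(onValue: (a: A) => void): void {
    if (this.buffer.length > 0) onValue(this.buffer.shift() as A);
    else this.waiting.push(onValue);
  }
}

const bridge = new PushPullBridge<string>();
const received: string[] = [];
bridge.offer("m1");                   // pushed before anyone pulls: buffered
bridge.offer("m2");
bridge.take((m) => received.push(m)); // pulls "m1"
bridge.take((m) => received.push(m)); // pulls "m2"
bridge.take((m) => received.push(m)); // no data yet: the request waits
bridge.offer("m3");                   // delivered to the waiting consumer
```

Messages are delivered in arrival order, and a pull that arrives before any data simply waits for the next push.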

Shameless plug

I'm building push-based semantics atop Effect over here. It's mostly still a WIP, though totally functional, and if you're familiar with RxJS I'd be super keen to get your feedback!

The core abstraction is Fx for the push-based streams - https://tylors.github.io/typed/fx/Fx.ts.html, and it is definitely the most complete out of everything there.

ReplaySubject

Atop my Fx abstraction, I have a RefSubject, https://tylors.github.io/typed/fx/RefSubject.ts.html, which is pretty much what you're looking for, with a much deeper integration with Effect than Rx, of course.

Higher-order streams

I have quite a few more, but to mention those you asked about

Multicasting
Websockets

It looks pretty much the same as the Stream implementation, but doesn't require the semantics shift downstream: calling any of the emitter/sink methods invokes the full sink chain, provided no combinators that add asynchrony are used.

Fx.fromEmitter((emitter) => Effect.gen(function*(_) {
   // This can be replaced with a function parameter or however you prefer, but we need a WebSocket somehow
   const socket = yield* _(acquireWebsocket)

   // Create stable event listeners so the same references can be removed later
   const messageListener = (event: MessageEvent) => emitter.succeed(event.data)
   const closeListener = () => emitter.end()

   // Listen to messages
   socket.addEventListener('message', messageListener)
   socket.addEventListener('close', closeListener)

   // Clean up the event listeners when the Scope closes
   yield* _(Effect.addFinalizer(() => Effect.sync(() => {
     socket.removeEventListener('message', messageListener)
     socket.removeEventListener('close', closeListener)
   })))
}))

dilame commented Nov 26, 2023

Thank you very much, @TylorS for your detailed answers.

Regarding the similarities between RxJS and Effect, I don't see direct commonalities in the interfaces, but I do feel that Effect could offer a more robust way to solve the same problems that I currently handle with RxJS.

I have a follow-up question regarding the implementation of a WebSocket SDK using Effect. I'm trying to understand how to best structure the SDK to maintain a single WebSocket connection while handling different types of messages. Here's a basic interface I have in mind:

interface WsSDK {
  connection(): Stream<never, never, A | B | C | D>
}

The idea is that a single call to connection() should establish exactly one connection, through which the server can send different types of messages (A, B, C, D). Each message type would be processed by different functions in various parts of the program, but it's crucial that there's only one active connection. Could you provide guidance or an example on how this can be achieved using Effect? Specifically, I'm interested in understanding how to manage and share a single WebSocket connection across multiple consumers of these message types within the application.

TylorS commented Nov 28, 2023

@dilame There are probably a number of ways to approach that, but I'd probably lean towards using a Layer and a reference.

import { Context, Effect, Layer, Option, Ref, Stream } from 'effect'

interface WsSDK {
  connection(): Stream.Stream<never, never, A | B | C | D>
}

const WsSDK = Context.Tag<WsSDK>()

const makeWsSDK = (params?) => Layer.scoped(WsSDK, Effect.gen(function*(_) {
  // optional, but Scope is helpful for managing resources you need/want interrupted later
  const scope = yield* _(Effect.scope)
  // Holds the single shared connection once it has been established
  const connection = yield* _(Ref.make(Option.none<SomeConnectionType>()))
  const sdk: WsSDK = {
    connection() {
      return Ref.get(connection).pipe(
        // Somewhere in the None case you would use Ref.set() to save the connection for later usage
        Effect.flatMap(Option.match({ onNone: ..., onSome: ... })),
        // Make a stream using whatever data you need
        Effect.map(x => makeAStreamWithinAnEffect(x)),
        // unwraps an Effect of a Stream into just a Stream
        Stream.unwrap
      )
    }
  }
  // Return the service so the Layer can provide it
  return sdk
}))
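
The "open once, then share" idea behind the reference can also be sketched without Effect. Everything here is an assumption for illustration: `SharedConnection`, the `Message` shape with a `_tag` field, and the fake transport are hypothetical and stand in for a real socket. The first subscriber opens the connection, later subscribers reuse it, and each consumer filters for the message type it cares about.

```typescript
// Hypothetical message shapes; a real SDK would define its own.
type Message = { _tag: "A"; price: number } | { _tag: "B"; text: string };

class SharedConnection<M> {
  private listeners: Array<(m: M) => void> = [];
  private connected = false;
  // `connect` opens the socket and calls `onMessage` for every decoded message.
  private connect: (onMessage: (m: M) => void) => void;

  constructor(connect: (onMessage: (m: M) => void) => void) {
    this.connect = connect;
  }

  subscribe(listener: (m: M) => void): void {
    this.listeners.push(listener);
    if (!this.connected) {
      // First subscriber: open the one and only connection.
      this.connected = true;
      this.connect((m) => this.listeners.forEach((l) => l(m)));
    }
  }
}

// A fake transport so the example runs without a server.
let connectCalls = 0;
let serverSend: (m: Message) => void = () => {};
const sdk = new SharedConnection<Message>((onMessage) => {
  connectCalls++;
  serverSend = onMessage;
});

const prices: number[] = [];
const texts: string[] = [];
sdk.subscribe((m) => { if (m._tag === "A") prices.push(m.price); });
sdk.subscribe((m) => { if (m._tag === "B") texts.push(m.text); });

serverSend({ _tag: "A", price: 42 });
serverSend({ _tag: "B", text: "hello" });
```

Two consumers are attached, but `connect` runs only once, and each consumer sees only its own message type.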
