How to handle a hot source? #292

Closed
ronag opened this issue Apr 28, 2015 · 4 comments

Comments

@ronag

ronag commented Apr 28, 2015

I'm trying to convert the following RxJS code to HighlandJS but I'm unsure how to handle a "hot" source.

i.e. how would I do the following in highland?

onChanges(options) {
    return Rx.Observable
      .create(o => {
        const changes = this.changes(options)
          .on('change', (change) => o.onNext(change))
          .on('complete', (info) => o.onCompleted())
          .on('error', (err) => o.onError(err))
        return () => changes.cancel() // Need to call this
      })
  }

It is the "changes" feed in PouchDB that I'd like to highlandify: http://pouchdb.com/api.html#changes

@ronag ronag changed the title How to dispose source? How to handle a hot source? Apr 28, 2015
@ronag ronag closed this as completed Apr 28, 2015
@ronag ronag reopened this Apr 28, 2015
@vqvu
Collaborator

vqvu commented Apr 28, 2015

Highland has no concept of a "hot" vs "cold" source. That's one of the main differences between Highland and Rx. Rx Observables can typically be subscribed to multiple times, while a Highland stream can only be consumed once. Also, all data in Highland streams is buffered by default.

There's also no way to handle unsubscriptions automatically right now. That's something I've been meaning to tackle, but I haven't had the time. See #172 for relevant discussions.

The best you can do right now is

_((push, next) => {
    // Arrow function, so `this` is still the object that owns changes().
    const changes = this.changes(options)
    changes
        // Pass the object along so you can manually cancel.
        .on('change', (change) => push(null, [changes, change]))
        .on('complete', (info) => push(null, _.nil))
        .on('error', (err) => push(err))
});

@ronag
Author

ronag commented Apr 28, 2015

Started an initial try to make a pause/resumable changes stream. pouchdb/pouchdb#3772

However, haven't found any good examples/documentation on how to create a pause/resumable highland stream.

@vqvu
Collaborator

vqvu commented Apr 28, 2015

To create a pausable stream from a hot source, you need a backpressure strategy. The default strategy is to buffer. The latest transform implements another: it keeps only the most recently seen value. A third (that isn't implemented) is onBackpressureDrop (i.e., RxJs#pausable).

You should never need to implement pause and resume manually. You would typically use the generator API for this. The semantics are:

_(function cb(push, next) {
...
});
  1. cb will be called the first time a downstream consumer wants data. It may push as much data as it wants. Extra data will be buffered.
  2. cb will not be called again until both of the following have happened:
    1. next is called one time. It is an error to call next multiple times with no call to cb in between.
    2. The downstream consumer wants data and all buffered data has been exhausted.

Thus, a backpressure-aware generator will typically

  1. Call push one time once it has data.
  2. Immediately call next.
  3. Do not call push again until the next time cb is executed.
  4. Repeat until source ends, at which point it pushes _.nil.
    • It is an error to call push or next after pushing _.nil.
    • cb will never be called again after pushing _.nil.
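
The four steps above can be sketched roughly as follows. This is only an illustration, not code from Highland: makeSource and backpressureAwareGenerator are hypothetical names, the source is assumed to be pull-based with a read(callback) method that yields null when exhausted, and nil stands for Highland's _.nil, passed in so the sketch does not depend on the library.

```javascript
// Hypothetical pull-based source: each read() hands one item to the callback,
// or null once the items are exhausted. Stands in for any real data source.
function makeSource(items) {
  let i = 0;
  return {
    read(callback) {
      callback(null, i < items.length ? items[i++] : null);
    },
  };
}

// Returns a generator function intended to be passed to Highland's _().
// `nil` is assumed to be Highland's end-of-stream marker (_.nil).
function backpressureAwareGenerator(source, nil) {
  return function cb(push, next) {
    source.read((err, item) => {
      if (err) {
        push(err);        // forward the error, then ask to be called again
        next();
      } else if (item === null) {
        push(null, nil);  // source ended: no push or next after this
      } else {
        push(null, item); // step 1: push exactly one value
        next();           // step 2: immediately call next
      }
      // step 3: nothing else is pushed until cb runs again
    });
  };
}
```

With Highland loaded as _, this would presumably be used as _(backpressureAwareGenerator(makeSource([1, 2, 3]), _.nil)).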

Note that the push and next in the consume interface work the same way.

You can look at the code for latest to see how to change the backpressure strategy after the fact. Or you can implement the desired strategy directly in the generator (I suggest doing it after the fact in a separate transform for additional composability).

For example, with an onBackpressureDrop strategy, you would

  1. Keep a flag for when data is requested. Initial value is false.
  2. Subscribe to the source.
  3. When cb is called, set the flag to true.
  4. The next time the source emits a value, push it, set your flag to false, and call next.
    • Errors should probably be passed on regardless of whether or not data has been requested.
    • If the source ends, push _.nil.
  5. Repeat.
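
A minimal sketch of those steps, done directly in the generator, might look like the following. It is untested against Highland itself: dropGenerator is a hypothetical name, the source is assumed to emit 'change', 'complete' and 'error' events like the changes feed from the question, and nil stands for Highland's _.nil, passed in so the sketch does not depend on the library.

```javascript
// Returns a generator function intended to be passed to Highland's _().
// Values that arrive while nothing is requested are dropped.
function dropGenerator(source, nil) {
  let requested = false;   // step 1: flag for "data is requested"
  let subscribed = false;
  let ended = false;
  let currentPush = null;
  let currentNext = null;

  return function cb(push, next) {
    requested = true;      // step 3: the consumer wants data
    currentPush = push;
    currentNext = next;
    if (subscribed) return;
    subscribed = true;     // step 2: subscribe once, on the first call

    source.on('change', (change) => {
      if (ended || !requested) return; // nobody is waiting: drop the value
      requested = false;               // step 4: push, clear flag, call next
      currentPush(null, change);
      currentNext();
    });
    source.on('error', (err) => {
      // Errors are passed on whether or not data was requested.
      if (!ended) currentPush(err);
    });
    source.on('complete', () => {
      if (ended) return;
      ended = true;
      currentPush(null, nil); // no push or next after this point
    });
  };
}
```

With Highland loaded as _, usage would presumably be _(dropGenerator(this.changes(options), _.nil)). Cancelling the underlying feed still has to be done manually, as noted earlier in the thread.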

@ronag
Author

ronag commented Apr 28, 2015

Thanks.

@ronag ronag closed this as completed Apr 28, 2015