- Enhance any object into an event stream
- Consistent paradigm across callbacks, promises and observables
- Supports unique listener per namespace and wildcard event (
*
) - Tiny (~700b min+gzip no dependencies)
- Small and intuitive API:
.on
,.once
,.off
,.emit
, etc
const foo = emitterify()
foo.on('event', result => result == 'foo')
foo.once('event', result => result == 'foo')
foo.emit('event', 'foo')
If you omit the callback in .on
/.once
, you can await
the same result you normally get in the callback:
result = await foo.on('event')
result = await foo.once('event')
If you are interested in more than a single event, you can omit the callback and use .map
/.filter
/.reduce
:
foo
.on('event')
.map(result => ...)
.filter(result => ...)
.reduce(result => ...)
It can be really useful to mix approaches too, for example to manipulate a stream of events until a certain condition is fulfilled:
await cluster.on('changes').filter(d => peers.every(isStable))
// cluster now stabilised
await node.on('click').reduce(acc => ++acc).filter(d => d == 10)
// node clicked 10 times
When you do emitter.on(event)
it returns a channel through which events flow. The channel itself is emitterified so it can be used to communicate and respond to various events. There are few common ones such as start
, stop
for lazy subscriptions, but in contrast to other implementations, these are not hardcoded, which means you can also communicate other signals, such as the completion progress
of a channel, whether the original request for a stream of server responses has been sent
yet, etc. This affords much greater flexibility without any extra API, and the design becomes obvious when you realise that these are all qualities of the channel itself, and since they are events, they have the same reactive interface as other stream of events.
foo
.on('value')
.on('start', () => {}) // stream of meta-events, orthogonal to original channel ---->
.on('stop', () => {}) // stream of meta-events, orthogonal to original channel ---->
.on('progress', () => {}) // stream of meta-events, orthogonal to original channel ---->
// |
// ↓
// values flow down through the main channel
Observables (FRP) "push" values, whereas streams with backpressure or channels (CSP) will generally allow the consumer to process values according to it's own pace or "pull" values. This is important for certain types of operations where you don't want to overwhelm consumers. This is a natural fit here too, since channels implement the async iterator Symbol and so allow producers to respond to consumers that are already waiting and push new values only as they are requested. This is done via the stream of pull
events on the channel. Using the canonical example, let's say we create chan = o.on('foo')
and then pass this to the following threads:
async producer(chan, i = 0) => {
for await (const d of chan.on('pull'))
chan.next(++i)
}
async consumer (chan) => {
for await (const d of chan)
if (results.push(d) == 10) break
}
These two functions work co-operatively: the consumer is "pulling" values from the channel, whilst the producer is pushing them when requested.
The only observable operators included are .map
/.filter
/.reduce
, everything else can be implemented as a separate operator outside the core - similar to how utilise complements the native Array operators in contrast to lodash which attempts to replace them. Below is a few example operators. They are so small, that's it probably not even worth publishing these to npm.
-
sum = (acc = 0) => ++acc events.reduce(sum)
We just increment
acc
on every event and return it, which becomes the value ofacc
on the next event. -
Flatmap (26 chars) - see full test example
flatten = (d, i, n) => { d.map(n.next) } events.filter(flatten)
We don't return anything so the stream stops at this operator. However, we can use
n.next
to continue directly publishing events. Instead of continuing with the current value which we expect to be a stream, we subscribe to it instead to unwrap the values of the stream and then publish those. -
Switchmap (2 lines) - see full test example
See this description for an exaplanation of switchmap.
latest = (d, i, n) => { if (n.prev) n.prev.off(n.next) ;(n.prev = d).map(n.next) } events.filter(latest)
This is similar to the previous flatmap case, however when we receive a new stream, we stop emitting values from the previous stream. This is why we store a reference to the stream in
n.prev
so we can unsubscribe from it next time. -
See the ES6 Observable repo for an explanation of how you might want to use two infinite streams to create a single finite stream.
This operator is now built-in so you can do things like:
node .on('click') .map(..) .filter(...) .until(node.once('removed'))
-
Instead of abusing the
.filter
operator, if your custom operator needs to do things on startup, a better way to write them would be (all the following are equivalent):buffer(stream) stream |> buffer stream.pipe(buffer)
This essentially takes in a stream, and returns a new stream, controlling the flow between them (e.g. buffering) and proxying signals back upstream as necessary (e.g. unsubscription). This is the best way to write operators, especially if you also want to encapsulate the underlying input stream. The built-in operators (
map
/filter
/reduce
) are written in the same way so you could also writestream.pipe(map(...)).pipe(filter(...)).pipe(reduce(...))
if you prefer.
The events in the following libraries all use this interface:
- Remote Resources in browser with ripple:
ripple('resource').on('change').map(...)
- DOM Elements with once:
element.on('click').map(...)
- Distributed Services with fero:
await peer.on('connected')
,peer.on('change').map(...)
-
emitter =
emitterify
(something = {})
Enhances the specified object with the hidden (non-enumerable) methods
.on
,.once
,.emit
,.off
. -
emitter
.on
('event'[,
callback
])
Subscribe the
callback
function to the specifiedevent
type.If there is no callback, it returns a stream object which you can either use as a promise or the map/filter/reduce operators.
If the event has a namespace (e.g.
foo.bar
), the callback will uniquely replace any previous callback on that namespace. -
emitter
.once
('event'[,
callback
])
Subscribe the
callback
function to the specifiedevent
type which is then unsubscribed after it is invoked.If there is no callback, it returns a stream object which you can either use as a promise or the map/filter/reduce operators.
If the event type has a namespace (e.g.
foo.bar
), the callback will uniquely replace any previous callback on that namespace. -
emitter
.emit
('event'[,
value
[,
filter
]])
Emits an event on some type with an optional value. If the value is an array, they will be spread on the callback function and passed directly to promises/streams.
filter
is a function that allows filtering of handlers with namespaces that should be invoked. This has been useful in some advanced cases to prevent positive feedback loops. -
emitter
.off
('event'[,
callback
])
Unsubscribes the
callback
function from the specifiedevent
type.If there is no callback, all callbacks are unsubscribed.
-
stream
.map
((d, i, n) => ...)
Creates a new stream object, using the specified function to transform the input stream of values. The return value of the function is the input to any listeners.
d
- The input valuei
- The index of the current eventn
- This gives you access to the current stream object for complete control to implement your own operators
-
stream
.filter
((d, i, n) => ...)
Creates a new stream object, using the specified function to determine whether to emit the current value.
d
- The input valuei
- The index of the current eventn
- This gives you access to the current stream object for complete control to implement your own operators
-
stream
.reduce
((acc, d, i, n) => ..., seed = 0)
Creates a new stream object, using the specified function to transform the input stream of values, with an additional variable (
acc
) for stateful computations. The return value of the function becomes the next value ofacc
, which is initially set toseed
.acc
- A variable that can be used for stateful computations across eventsd
- The input valuei
- The index of the current eventn
- This gives you access to the current stream object for complete control to implement your own operators
-
stream
.next
(value)
Invoke all listeners with the specified value. Returns the same
stream
. -
stream
.off
([
stream
])
Unsubscribe the specified stream object. If no stream is specified, unsubscribe all listeners. Returns the same
stream
. -
stream
.unsubscribe
()
Unsubscribes the stream from the parent stream.
-
stream
.source
This refers to the root source, so if you for example build a pipeline like
pipeline = foo.on('event').map(d => d).map(d => d).map(d => d)
, thenpipeline.source
is equal to that initialfoo.on('event')
. This is convenient for tearing down an entire pipeline by doingpipeline.source.unsubscribe()
.