
Handling Backpressure


A source can produce data faster than the sink can consume it. For example, here the sink (the body of the for loop) takes longer to process each value than the source takes to produce the next one:

// the source produces data every 500ms
const src = interval(500)

// the sink takes 1s to process each data point
for await (const i of next(src)) {
  console.log(i)
  await sleep(1000)
}
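
These snippets assume interval() and next() come from the streamlets package (as in the later examples), and that sleep() is a small promise-based delay helper along these lines (a minimal sketch, not part of the library):

// hypothetical helper: resolves after the given number of milliseconds
const sleep = ms => new Promise(resolve => setTimeout(resolve, ms))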

In these situations, we need to manage the excess flow by applying backpressure. For example, next() handles backpressure by being lossy by default, i.e. it drops values emitted while the sink is still processing previous ones:

const src = interval(500)
for await (const i of next(src)) {
  console.log(i)
  await sleep(1000)
}

// > 0
// > 3
// > 5
// ...

Shared and non-shared sources

A non-shared source produces data specifically for each sink, which means that, in theory, it can slow down to match the needs of that sink. Specifically, the source can be paused whenever it overwhelms the sink, and resumed when the sink is able to accept more data (provided the source supports pausing and resuming).

This is what the backpress() transform does. It pauses the source whenever it pushes data before the sink has requested any:

// apply back pressure to the interval
const src = backpress(interval(500))

for await (const i of next(src)) {
  console.log(i)
  await sleep(1000)
}

// > 0
// > 1
// > 2
// > 3
// ...

Try in Sandbox

This strategy does not work for shared sources. A shared source is already producing the same data for other sinks, so pausing its stream for one sink simply means that sink misses some data (imagine pausing a live sports match: you simply miss whatever happens during the pause).


Lossy Backpressure

To stop the sink from being overwhelmed, one solution is to drop excess data points. For example, next() drops all values until the sink is free again. Similarly, backpress() will miss values pushed by shared sources while it has them paused (again, because the sink hasn't yet requested more data).

There are lots of use cases for lossy backpressure. For example, when the user is typing into a text input and you want to auto-fetch data based on their query, you can simply debounce() the input stream, dropping its values until its rate falls below a certain threshold (i.e. the user has stopped typing):

HTML Code
<button>Click Me Rapidly!</button>
<div></div>
import { pipe, event, debounce, observe, tap } from 'streamlets'

const button = document.querySelector('button')
const div = document.querySelector('div')

pipe(
  event(button, 'click'),
  tap(() => div.textContent = 'Clicking ...'),

  // data doesn't get past this point as long as the button
  // is clicked faster than once per 500ms
  debounce(500),

  tap(() => div.textContent = 'Tired of clicking?'),
  observe
)

Try in Sandbox

Alternatively, you can throttle() the stream, letting data through at a maximum specified rate and dropping values in between:

HTML Code
<button>Click Me Rapidly!</button>
<div></div>
import { pipe, event, observe, tap, scan, throttle } from 'streamlets'

const button = document.querySelector('button')
const div = document.querySelector('div')

pipe(
  event(button, 'click'),

  // no matter how fast you click on the button,
  // the counter won't increase faster than once per second.
  throttle(1000),

  scan(c => c + 1, 0),
  tap(c => div.textContent = c),
  observe
)

Try in Sandbox

Lossless Backpressure

As mentioned above, for non-shared sources you can apply lossless backpressure using the backpress() transform. For shared sources, the only lossless solution is to buffer() the excess data so that the sink can consume it at its own pace:

// buffer excess values from src
// which can be pulled by the sink at its own pace
const src = buffer(interval(500))

for await (const i of next(src)) {
  console.log(i)
  await sleep(1000)
}

// > 0
// > 1
// > 2
// > 3
// ...

Try in Sandbox

The downside of this approach is that if the source consistently produces data faster than the sink can consume it, the buffer will grow larger and larger, consuming an ever-increasing amount of memory. You could put a limit on the buffer (e.g. buffer(src, 128)), but then you would start losing data when the buffer overflows, and you would be back to a lossy solution.
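
A bounded buffer might then look something like this (a minimal sketch, assuming the two-argument buffer(src, limit) form mentioned above):

// assuming buffer() accepts a size limit as its second argument,
// at most 128 excess values are kept; data is lost once the buffer overflows
const src = buffer(interval(500), 128)

for await (const i of next(src)) {
  console.log(i)
  await sleep(1000)
}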