Skip to content
Eugene Ghanizadeh edited this page Oct 19, 2021 · 9 revisions

Streamlets are utilities for handling pullable, listenable, sync and async streams of data and event. For example, imagine you want to log elements of an array waiting one second between each log. Without streamlets, it would look like this:

const log = arr => {
  let cursor = 0
  const interval = setInterval(() => {
    if (cursor >= arr.length) {
      clearInterval(interval)
    } else {
      console.log(arr[cursor++])
    }
  })
}

With streamlets, it looks like this:

import { pipe, iterable, pullrate, tap, iterate } from 'streamlets'

const log = arr => pipe(
  iterable(arr),
  pullrate(1000),
  tap(console.log),
  iterate
)

As a bonus, you would be able to pause / resume this stream as well:

const iteration = log([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

setTimeout(() => iteration.stop(), 3000)
setTimeout(() => iteration.start(), 5000)

Real World Example

In this example, we use streamlets to create tool for searching in PokéAPI. This code will automatically fetch the data when user has finished typing without spamming the API server, cancels mid-flight requests if the user input changes, and handle errors (e.g. wrong Pokémon name).

HTML Code
<h1>Poké-Search</h1>
<input type="text" />
<pre></pre>
const input = document.querySelector('input')
const pre = document.querySelector('pre')
  
pipe(
  event(input, 'input'),                                       // 👉  When the user inputs something ...
  map(() => input.value.toLowerCase()),                        // ... make it lower case ...
  tap(i => pre.textContent = !!i ? 'LOADING ...' : ''),        // ... display "LOADING ..." ...
  debounce(500),                                               // ... wait 500ms so they have finished typing ...
  filter(i => !!i),                                            // ... ignore empty strings ...
  map(i => fetch(`https://pokeapi.co/api/v2/pokemon/${i}`)),   // ... fetch data from API ...
  flatten,                                                     // ... flatten the stream (cancels mid-flight requests for new ones) ...
  map(r => promise(r.json())),                                 // ... convert the response to JSON ...
  flatten,                                                     // ... flatten again ...
  map(v => JSON.stringify(v, null, 2)),                        // ... make the JSON print-able ...
  tap(v => pre.textContent = v),                               // ... display the JSON.
  finalize(() => pre.textContent = 'COULD NOT LOAD'),          // ⛔  display error when error happens (e.g. wrong name) ...
  retry,                                                       // ... but don't give up (maybe next time user types the correct name).
  observe,                                                     // 👀  observe this stream (otherwise, it'll do nothing).
)

Try in Sandbox