Skip to content

Pump fork allowing multiple readers and writers for Javascript streams.

License

Notifications You must be signed in to change notification settings

wholebuzz/tree-stream

Repository files navigation

tree-stream image test

tree-stream is a small node module that pipes streams together and destroys all of them if one of them closes.

npm install tree-stream

This package is forked from mafintosh's pump and aims to be a superset of pump. When the provided pipe() topology is a linked-list, they're functionally equivalent.

What problem does it solve?

  • The original problems pump solved: When using standard source.pipe(dest) source will not be destroyed if dest emits close or an error. You are also not able to provide a callback to tell when then pipe has finished.

  • The object model (of ReadableStreamTree and WritableStreamTree) is expressive. A representation for a sequence (or DAG) of stream transforms turns out to be really useful. Sometimes you want to "pipeFrom" (a WritableStreamTree) e.g. (from @wholebuzz/fs/src/local.ts):

  import StreamTree, { ReadableStreamTree, WritableStreamTree } from 'tree-stream'

  async function openWritableFile(url: string, _options?: OpenWritableFileOptions) {
    let stream = StreamTree.writable(fs.createWriteStream(url))
    if (url.endsWith('.gz')) stream = stream.pipeFrom(zlib.createGzip())
    return stream
  }

And sometimes you want the typical "pipe" case (for a ReadableStreamTree):

  async function openReadableFile(url: string, options?: OpenReadableFileOptions) {
    let stream = StreamTree.readable(fs.createReadStream(url))
    if (url.endsWith('.gz')) stream = stream.pipe(zlib.createGunzip())
    return stream
  }

You can equivalently apply a transform then, with either:

  const tf = new Transform({ objectMode: true, transform(x, _, cb) { this.push(x); cb(); } })
  readable.pipe(tf)

or

  writable.pipeFrom(tf)

Provided that the ReadableStreamTree and WritableStreamTree are connected later:

  const returnValue = await pumpWritable(writable, 'any return value', readable)

These APIs form the basis of @wholebuzz/fs, which together with dbcp powers @wholebuzz/mapreduce.

Usage

var streamTree = require('tree-stream')
var fs = require('fs')

var source = fs.createReadStream('/dev/random')
var dest = fs.createWriteStream('/dev/null')

var stream = streamTree.readable(source)
stream = stream.pipe(dest)
stream.finish(function(err) {
  console.log('pipe finished', err)
})

setTimeout(function() {
  dest.destroy() // when dest is closed tree-stream will destroy source
}, 1000)

You can process an input stream and also hash it:

var streamTree = require('tree-stream')
var fs = require('fs')
var hasha = require('hasha')

var readable = streamTree.readable(fs.createReadStream('/tmp/foo.txt'))
readable = readable.split(2)
hasha.fromStream(readable[0].finish()).then((hash) => console.log(`Hash: ${hash}`))
readable[1].finish().on('data', function(data){ console.log(`Data: ${data}`) })

// Outputs:
// Data: unicorn
// Hash: e233b19aabc7d5e53826fb734d1222f1f0444c3a3fc67ff4af370a66e7cadd2cb24009f1bc86f0bed12ca5fcb226145ad10fc5f650f6ef0959f8aadc5a594b27

You can get pretty wild:

var streamTree = require('./tree-stream')
var fs = require('fs')
var hasha = require('hasha')

var writable = streamTree.writable(fs.createWriteStream('/tmp/foo.txt'))
var readable
[writable, readable] = writable.joinReadable(1)
hasha.fromStream(readable[0].finish()).then((hash) => console.log(`Hash: ${hash}`))

writable = writable.joinWritable([fs.createWriteStream('/tmp/bar.txt')])
var stream = writable.finish()
stream.write('unicorn', function() { stream.end() })

// /tmp/foo.txt and /tmp/bar.text now contain "unicorn"
// Outputs:
// Hash: e233b19aabc7d5e53826fb734d1222f1f0444c3a3fc67ff4af370a66e7cadd2cb24009f1bc86f0bed12ca5fcb226145ad10fc5f650f6ef0959f8aadc5a594b27

License

MIT

Related

Derived from pump, part of the mississippi stream utility collection.

About

Pump fork allowing multiple readers and writers for Javascript streams.

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published