
pipe to writable with parameters #664

Open
tstelzer opened this issue Nov 17, 2018 · 2 comments


tstelzer commented Nov 17, 2018

Hey there, first: I absolutely love your work, great library! Makes working with streams so much fun and has great inter-op with ramda!

This is more a question than an issue because I couldn't find a better place to ask it:

Is there an idiomatic way to dynamically create write streams and pipe to them, using some derived file name?

Currently I'm doing something like:

const writeFile = ({filename, content}) =>
  fs.createWriteStream(filename).write(content)

stream
  .map(writeFile)
  .done(/* consumes the stream */)

However, I'd prefer to simply consume the write stream via pipe like:

stream.pipe(writeFile)

What would be a better way to do this?

/edit:
And from the Node docs I can tell my version is not actually safe for writing to the same file in sequence…

@vqvu
Copy link
Collaborator

vqvu commented Nov 18, 2018

Hi. Thanks for the kind words!

I don't think there's a more idiomatic way of doing things. Your use case is not something we support natively. You're also right that your version isn't safe for writing to the same file in sequence. There are a few ways to do this safely.

If you know there's only a small amount of data that you need to write and a small number of files that you need to write to, you can reuse the write streams instead of creating a new one every time. Something like this:

const fs = require('fs');

function writeFile() {
  const cache = new Map();
  const writeHandler = ({filename, content}) => {
    // Lazily create one write stream per file and reuse it.
    if (!cache.has(filename)) {
      cache.set(filename, fs.createWriteStream(filename));
    }
    cache.get(filename).write(content);
  };

  return (stream) =>
      stream.map(writeHandler)
          .done(() => cache.forEach((writeStream) => writeStream.end()));
}

stream.through(writeFile());

The above doesn't wait for writes to complete, so it can end up buffering your entire stream in memory if your stream produces data faster than your storage can write it.

If you don't have enough memory to use the previous approach, then you can wait until each write is done before starting a new one.

function writeFile({filename, content}) {
  return _((push) => {
    fs.createWriteStream(filename, {flags: 'a'}) // append mode so you don't overwrite the file
        .end(content, () => push(null, _.nil)); // ends the outer stream when the write is done
  });
}

stream.flatMap(writeFile)
    .done(/* consumes the stream */);

The above assumes that you're OK with always appending to the files. If not, you need to pick between append mode and write mode depending on whether you've seen the file before or not. It also doesn't allow you to do the writes in parallel if you're writing to multiple files.

You could also combine the two approaches if you want to get better performance and bounded memory usage. I won't code it up here, but basically you could write your own Writable implementation that under the hood multiplexes to different file write streams. That implementation can track how much data has been written to the different output streams and apply backpressure (i.e., return false from write()) when it needs to pause the input stream a bit. It'll also need to emit drain when it can handle more data.

Then you can use highlandStream.pipe(yourCustomWritable). Highland will respect node writable backpressure.

@tstelzer
Author

Thank you for the write up, much appreciated!
As I need to write an arbitrary amount of data to an arbitrary number of files, I guess I'll have to implement my own Writable. My project is about learning streams anyway, so I don't mind.
