
Use event-stream #110

Open · rufuspollock opened this issue Jan 10, 2014 · 9 comments

Comments
@rufuspollock (Member)

Switch to using @dominictarr's event-stream (https://github.com/dominictarr/event-stream) much more heavily.

@dominictarr

I wrote event-stream before @substack convinced everyone that utility libraries are worse than using finer-grained modules.

I suggest using through and map-stream, and other things that come up in npm search stream, instead.
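Roughly, using them looks like this (a minimal sketch assuming their published APIs; the transforms themselves are just illustrative):

var map = require('map-stream')
var through = require('through')

// map-stream: async transform, one callback per item
var upcase = map(function (chunk, callback) {
  callback(null, chunk.toString().toUpperCase())
})

// through: sync transform with explicit queue/end control
var addNewline = through(function write(data) {
  this.queue(data + '\n')
}, function end() {
  this.queue(null)
})

process.stdin.pipe(upcase).pipe(addNewline).pipe(process.stdout)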

@rufuspollock (Member, Author)

@dominictarr good advice.

BTW, how does one indicate in map-stream that one has read enough of the input? E.g. imagine I wanted to implement a map that does stuff with the first 50 rows and then does not want any more input. Does one throw an exception? (I could ask this more generally about through - what event should I trigger to say "hey, I don't want the readable stream part any more"?)

@dominictarr

ah, you call destroy() on the source stream.
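E.g. for your "first 50 rows" case, something like this (a rough sketch assuming a classic-streams source such as fs.createReadStream, with chunks standing in for rows; big.csv is a placeholder path):

var fs = require('fs')
var through = require('through')

var source = fs.createReadStream('big.csv') // placeholder input
var seen = 0

var head = through(function write(chunk) {
  if (seen >= 50) return   // ignore anything that arrives after we have enough
  this.queue(chunk)
  if (++seen >= 50) {
    source.destroy()       // tell the source to stop reading and close the file
    this.queue(null)       // and end this stream's own output
  }
})

source.pipe(head).pipe(process.stdout)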

@rufuspollock (Member, Author)

@dominictarr that's what I thought, but I couldn't spot destroy in v0.10 streams (have I missed it?). If v0.10 does not have destroy, what should you do instead?

@dominictarr

Hmm, I think 0.10 streams have unpipe?

@rufuspollock (Member, Author)

@dominictarr sorry for the lag here - this has been super-useful and we are most definitely going to use those libraries (btw: huge props for your work here!).

Just wanted to confirm: is unpipe the equivalent of destroy?

The basic issue for us here is: we want to allow operations on streaming data. Imagine I want to do the following:

  • start streaming a 1 GB csv file into datapipes (request module)
  • parse that CSV (csv streaming module)
  • filter rows based on whether "London" occurs in column 3
  • apply head: i.e. only output the first 10 results
  • output results as JSON (or CSV or ...)

Now it is quite likely we may only need to read the first few MBs (or less) of this big file to get the 10 items that head needs. So the head operation (the last item in the chain), once it has got 10 items, needs to tell the previous item in the pipeline to stop sending data, and that in turn needs to tell the CSV parser, and that in turn tells the request to terminate.

How can we do that? Previously one called destroy, but is it now unpipe?
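For concreteness, a rough sketch of the pipeline I have in mind (module choices - request, csv-parser, through - are just illustrative assumptions, col3 is a placeholder column name, and step 4 is exactly where the question sits):

var request = require('request')
var csv = require('csv-parser') // emits one object per CSV row
var through = require('through')

var count = 0

request('http://example.com/big.csv')          // 1. stream the file
  .pipe(csv())                                  // 2. parse the CSV into row objects
  .pipe(through(function (row) {                // 3. keep rows where column 3 mentions London
    if (/London/.test(row.col3 || '')) this.queue(row)
  }))
  .pipe(through(function (row) {                // 4. head: pass on only the first 10 rows...
    if (count < 10) this.queue(row)
    if (++count === 10) this.queue(null)        // ...then end our output - but upstream keeps
  }))                                           //    reading, which is exactly the problem above
  .pipe(through(function (row) {                // 5. serialise each row as a JSON line
    this.queue(JSON.stringify(row) + '\n')
  }))
  .pipe(process.stdout)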

@rufuspollock (Member, Author)

Note: we probably also need to use "Object Mode" on our streams: http://nodejs.org/api/stream.html#stream_object_mode
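E.g. a minimal objectMode transform with streams2 (the pass-through logic is just illustrative):

var Transform = require('stream').Transform

// a streams2 transform that passes JavaScript objects (e.g. parsed CSV rows) through unchanged
var objectPass = new Transform({ objectMode: true })
objectPass._transform = function (row, encoding, callback) {
  this.push(row)
  callback()
}
// usage: someObjectSource.pipe(objectPass).pipe(someObjectSink)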

@dominictarr

Oh, sorry - unpipe is not the same as destroy.

Node streams do not support this use case very well, unfortunately. Basically, what you need is for the error or end case to be propagated up the stream. If the consuming stream can no longer accept input - either because it only wanted the beginning, or because there has been an error - then you need to tell the source to quit.

With node streams you need to hook this up manually, by doing something like:

// when the downstream consumer is done (or errors), tell the upstream source to stop
dest.on('end', function () {
  source.destroy() // destroy() for readable sources; end() if the source is writable
})

Although I wish there didn't need to be so many types of streams, when I need this sort of functionality I use https://github.com/dominictarr/pull-stream.
It's a much simpler stream pattern that supports backpressure and propagates errors/aborts, and it works great for object streams. When I need to interface pull streams with node streams I use https://github.com/dominictarr/stream-to-pull-stream or https://github.com/dominictarr/pull-stream-to-stream.
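E.g. your head use case as a minimal pull-stream sketch (assuming pull-stream's values/map/take/collect helpers, with numbers standing in for parsed rows):

var pull = require('pull-stream')

pull(
  pull.values([1, 2, 3, 4, 5, 6]),              // source: plain values standing in for rows
  pull.map(function (n) { return n * 10 }),     // transform each item
  pull.take(3),                                 // head: stop after 3 items, aborting the source
  pull.collect(function (err, results) {        // sink: gather what came through
    if (err) throw err
    console.log(results) // [10, 20, 30]
  })
)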

Also, you can do some other neat stuff, like defining a transform stream out of other transform streams:

//combine 3 transformations
var combinedTransformation = pull(decode, map, encode)
//then pipe through that transformation.
pull(source, combinedTransformation, sink)

You can't really do this in a simple way with node streams... there are https://github.com/dominictarr/stream-combiner/ and https://github.com/naomik/bun,
but those modules are a little more complex.
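For comparison, a rough stream-combiner sketch (assuming its exported function takes node streams and returns one combined duplex; the transforms are placeholders mirroring decode/map/encode above):

var Combine = require('stream-combiner')
var through = require('through')

var decode = through(function (buf) { this.queue(buf.toString()) })
var upcase = through(function (s) { this.queue(s.toUpperCase()) })
var encode = through(function (s) { this.queue(s + '\n') })

var combinedTransformation = Combine(decode, upcase, encode)
process.stdin.pipe(combinedTransformation).pipe(process.stdout)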

Hopefully the future platform that comes along and makes node.js look like ruby will have streams that work well as both data streams and object streams.

@dominictarr

Most core streams2 streams will still have destroy; otherwise they wouldn't be backwards compatible.
Here's fs.ReadStream#destroy: https://github.com/joyent/node/blob/master/lib/fs.js#L1542
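E.g. a quick 0.10-era check (big.csv is a placeholder path):

var fs = require('fs')

var rs = fs.createReadStream('big.csv')
rs.once('readable', function () {
  rs.destroy() // stops reading and closes the underlying file descriptor
})
rs.on('close', function () {
  console.log('stream destroyed')
})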
