Which of these methods to limit how fast things get piped? #558

Open · rightaway opened this issue Oct 25, 2016 · 25 comments

@rightaway commented Oct 25, 2016

In the example below, I'm downloading a gzipped file that's several hundred megabytes (millions of rows), then piping it through highland so I can use the batch feature. This way, instead of inserting 1 row into the db at a time (csvStream emits one 'data' event for one row), I can do it for 10000 rows at a time in the 'data' event handler you see below.

download.stream(url)
  .pipe(zlib.createGunzip())
  .pipe(csvStream)
  .pipe(highland())
  .batch(10000)
  .on('data', async (data) => {
    await insertIntoDB(data)
  })

But when running this I get out of memory errors and the system starts to slow down significantly. I think it's because the data is coming in too fast to the 'data' event. The csvStream's finish event happens in a couple of minutes but the program runs for up to another hour, which indicates that the whole csv file has been read into memory, rather than being piped downstream piece by piece as the data event consumes the batches.

I'm new to highland and looking through the documentation I can't tell which of the various methods would be the most appropriate in this case. http://highlandjs.org/#backpressure seems like it's most relevant to this situation but I can't tell how to use it in this code. http://highlandjs.org/#parallel looks good too.

Can I configure highland so that at any time there's only for example 3 batches (where batch is 10000) worth of rows that have been read? And it only reads another 10000 rows when one of those 3 batches is complete.

@vqvu (Collaborator) commented Oct 26, 2016

I'm not super familiar with the async functions spec, but I think the code you've posted will just execute as many insertIntoDB calls as necessary to get through your input data, all in parallel. The await inside your async function doesn't actually block further 'data' events. You'd get the same behavior if you passed insertIntoDB to on('data') directly.

You're right that backpressure is what you want, though. In Highland, you get it for free as long as you don't opt out. Using on('data') is opting out. You shouldn't ever need to use the on method, really. That's why we don't document the events on the website.

To do what you want, you need to use parallel or mergeWithLimit. They're more or less the same, except that parallel preserves order and mergeWithLimit doesn't. The way you use them is by constructing a stream of streams. What's usually done is that you use map to perform some asynchronous action and return a stream that waits for that action to complete and then emits the result. The stream here functions much like a promise: it doesn't contain any data; it's just a handle for later getting at your data. Then you call parallel or mergeWithLimit, which will handle the backpressure for you.

For example,

download.stream(url)
  .pipe(zlib.createGunzip())
  .pipe(csvStream)
  .pipe(highland())
  .batch(10000)

  // The result of the compose is equivalent to this arrow function:
  //   data => highland(insertIntoDB(data))
  // For every object, which is 10000 rows, call insertIntoDB, which returns a promise,
  // then wrap the promise in a stream using the highland constructor. You now have a
  // stream of streams.
  .map(highland.compose(highland, insertIntoDB))

  // Merge the stream elements together by consuming them, making sure that only 3 are being
  // consumed at a time. You may, of course, replace 3 with whatever parallelism factor you
  // want.
  .mergeWithLimit(3)

  // Consume the results and execute the callback when done.
  .done(() => {
    console.log('I am done.');
  });

The reason this works is the laziness and backpressure features of Highland. Nothing happens until you call done, which is a consumption operator.

  1. done will ask for data from the mergeWithLimit stage.
  2. mergeWithLimit will ask for streams from the map stage, stopping once it has gotten 3 of them. Once it has finished consuming one of the three, it will ask for more. This is backpressure.
  3. map only executes when mergeWithLimit asks for data. This is laziness. Since mergeWithLimit will only ask for 3 at a time and won't ask for more until a previous stream (i.e., task) has completed, you will only have 3 tasks in-flight at any one time.
  4. map of course exerts backpressure on the previous stages, which keeps memory usage from blowing up.
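
As a small illustration of that laziness (a sketch, not from the original example):

const highland = require('highland');

// Nothing runs here: map is lazy, so the side effect never fires on its own...
const s = highland([1, 2, 3]).map(x => {
  console.log('mapping', x);
  return x * 2;
});

// ...until a consumption operator pulls data through the pipeline.
s.each(x => console.log('got', x)); // prints "mapping 1", "got 2", "mapping 2", ...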

@rightaway (Author)

Thank you for this explanation! Very clear and useful.

Is there a way to stop piping when a certain condition is met? In the snippet below, range returns an iterator from 0 to Infinity, and map runs some operation (like downloading a page) over those values, 5 at a time in parallel.

highland(range(0, Infinity))
  .map(page => highland(download(page)))
  .parallel(5)
  .done(() => {
    console.log('I am done.')
  })

But when some condition is met, like for example the pages no longer exist (which would happen at some point since the iterator goes to Infinity), is there a way to signal to the whole stream to end gracefully? So the 5 operations currently running in parallel would finish their work, but parallel would be informed to not request any further values from map.

I think I would need to put something between parallel and done? In the documentation I see stopOnError and errors but those seem different than the kind of functionality I'd be looking for.

@vqvu (Collaborator) commented Oct 28, 2016

Yes, there's a way, but you'll have to write your own operator via consume. The way it works is that consume feeds you a tuple (err, value, push, next). You do whatever processing you need, calling push() when you want to emit new values and next() when you are done processing. It will take care of backpressure for you, and will only call your handler when more data is needed. To stop, just emit a highland.nil. Usually you do this in response to receiving value === highland.nil, but it's not necessary.

See the implementation for slice for an example of this "early return". You can perform many arbitrary transforms using consume; it's very powerful (but still pretty easy to use). In fact most provided transforms are implemented on top of it.
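
As a tiny illustration (a sketch, not from the original discussion), a filter-like transform built on consume looks like this:

function filterStream(source, predicate) {
  return source.consume((err, x, push, next) => {
    if (err) {
      push(err);     // Pass errors along.
      next();
    } else if (x === highland.nil) {
      push(null, x); // Propagate the end-of-stream marker.
    } else {
      if (predicate(x)) {
        push(null, x); // Keep matching values.
      }
      next();          // Always ask for more.
    }
  });
}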

In your case, I'm going to assume you want to keep going until you see a 404, which could be done like this (assuming the use of request, you can adapt as necessary).

highland(range(0, Infinity))
  .consume((err, page, push, next) => {
    if (err) {
      push(err); // Just pass on errors.
      next();    // Ask for more data.
    } else if (page === highland.nil) {
      // End marker. Pass that on too. There is guaranteed to be no more data,
      // so we don't call next().
      push(null, highland.nil);
    } else {
      request(pageToUrl(page))
        .on('response', res => {
          if (res.statusCode === 200) {
            // We have data, so we push a new stream.
            push(null, highland(res));
            next(); // Ask for more.
          } else {
            // No more data, so we push highland.nil, which will cause
            // the stream to stop. As always, laziness will prevent the original
            // infinite stream from being consumed anymore.
            push(null, highland.nil);
          }
        });
    }
  })
  .parallel(...)
  ...

@rightaway (Author)

That looks great! I can't seem to get it to work in parallel though. It takes the same amount of time regardless of what value I pass to parallel. The output below seems to also show that it's running them one or so at a time.

In this code I changed it slightly so that I can call push and next from within download. Not sure if that's what's causing the problem. I removed the async/await from the snippet and it didn't change anything, so that doesn't seem to be the cause.

function download(data, push, next) {
  console.log(`start ${data}`)
  return request(url + data)
    .then(async (response) => {
      if (response.body.includes('last page')) {
        push(null, highland.nil)
      } else {
        await doSomething(response)
        push(null, highland(response))
        next()
      }
      console.log(`finish ${data}`)
    })
}

highland(util.range(1, 10))
  .consume((err, page, push, next) => {
    if (err) {
      push(err)
      next()
    } else if (page === highland.nil) {
      push(null, highland.nil)
    } else {
      download(page, push, next)
    }
  })
  .parallel(10)
  .done(() => {
    console.log('done')
  })

Gives the output

start 1
start 2
finish 1
start 3
finish 2
start 4
finish 3
start 5
finish 4
start 6
finish 5
start 7
finish 6
start 8
finish 7
start 9
finish 8
finish 9

@vqvu (Collaborator) commented Oct 29, 2016

This is expected behavior. The handler that you pass to consume is executed in series. The way it works is that Highland will call your handler and wait for a call to next(). Only after you've called next() and Highland determines that you need more data will it call your handler again. Since you log both the start and end inside your handler, it's no surprise that they are interleaved. It is the streams that you return that are consumed in parallel.

You should be able to see the parallelism if you do this instead

function download(data, push, next) {
  console.log(`start ${data}`)
  return request(url + data)
    .then(async (response) => {
      if (response.body.includes('last page')) {
        push(null, highland.nil)
      } else {
        await doSomething(response)
        const stream = highland(response);

        // This will wait until the stream completes before printing "finish"
        stream.observe()
          .done(() => console.log(`finish ${data}`))
        push(null, stream)
        next()
      }
    })
}

As for why your code takes the same amount of time regardless of the parallelism factor, I'm not sure. It could be because of the await doSomething(response) that you have. download executes in series, so you want it to finish as fast as you can. You can fix this by executing doSomething "inside" the stream instead.

    .then((response) => {
      if (response.body.includes('last page')) {
        push(null, highland.nil)
      } else {
        const stream = highland(async (push, next) => {
          // This function will only be executed when the stream is consumed.
          await doSomething(response)
          next(highland(response));
        });

        // This will wait until the stream completes before printing "finish"
        stream.observe()
          .done(() => console.log(`finish ${data}`))
        push(null, stream)
        next()
      }
    })

It could also be that the majority of the time is spent in establishing the connection and that the download is relatively fast. If this is the case, then maybe you don't benefit from increased parallelism.


FYI, there's currently an issue with pushing nil asynchronously in this way. See #563.

@vqvu (Collaborator) commented Oct 29, 2016

It could also be that the majority of the time is spent in establishing the connection and that the download is relatively fast. If this is the case, then maybe you don't benefit from increased parallelism.

You could also do some sort of speculative downloading to eliminate this bottleneck as well. Not sure if it's worth it, but I'm happy to explain more if you'd like.

@vqvu (Collaborator) commented Oct 30, 2016

FYI, there's currently an issue with pushing nil asynchronously in this way. See #563.

Fix released. You'll want to upgrade to 1.10.1.

@rightaway (Author)

I must be doing something wrong, because I still get the same output as in my previous post, where it acts exactly as though I had set parallel to 2, even though I've set it higher. (It starts 1, starts 2, and then it only starts the next when it finishes one of those.)

Here's a standalone example where I've included only what's necessary to demonstrate this. got is an http download package. I've upgraded highland to 1.10.1 as you suggested.

highland(range(1, 50))
  .consume((err, data, push, next) => {
    console.log(`start ${data}`)
    got(`http://www.news.com.au/`, { timeout: 5000 })
      .then((response) => {
        const stream = highland(response)
        stream.observe()
          .done(() => console.log(`finish ${data}`))
        push(null, stream)
        next()
      })
  })
  .parallel(10)
  .done(() => {
    console.log('done')
  })

@vqvu (Collaborator) commented Oct 30, 2016

This probably happens because the download time is not much larger than the connection time. It may be that the entire response arrived in a single socket buffer or something like that.

I was able to get the parallel behavior when I set up local servers to download larger files. Repro steps:

  1. Grab a large file from somewhere (I used the intellij distribution tar).

  2. Stand up a local server so that I'm not spamming someone else's.

    // Code from the nice folks at http://stackoverflow.com/a/8427954
    var connect = require('connect');
    var serveStatic = require('serve-static');
    
    connect()
        .use(serveStatic('/home/vqvu/Desktop'))
        .listen(8080, function(){
            console.log('Server running on 8080...');
        });
  3. Do the request against the server.

    function range(x, y) {
        var i = x;
        return function (push, next) {
            push(null, i);
            i++;
            if (i < y) {
                next();
            } else {
                push(null, highland.nil);
            }
        }
    }
    
    function get(url, options) {
        return new Promise((res, rej) => {
            request(url, options)
                .on('response', res);
        });
    }
    
    highland(range(1, 50))
      .consume((err, data, push, next) => {
        console.log(`start ${data}`)
        const req = get('http://localhost:8080/ideaIU-15.0.2.tar.gz', { timeout: 5000 });
        req
          .then((response) => {
            const stream = highland(response);
            stream.observe()
              .done(() => console.log(`finish ${data}`))
            push(null, stream)
            next()
          })
      })
      .parallel(5)
      .done(() => {
        console.log('done')
      })

The result that I get is

start 1
start 2
start 3
start 4
start 5
finish 2
finish 3
start 6
finish 1
start 7
finish 5
start 8
start 9
finish 4
start 10
...

@rightaway (Author) commented Oct 31, 2016

It's quite strange: even if I use the example you just gave or the one from my previous post, with a URL to a large file (in this case http://releases.ubuntu.com/16.04.1/ubuntu-16.04.1-desktop-amd64.iso), the output is start 1 and it waits for the entire file to download before continuing. So no parallel behaviour. What happens if you use a URL to that same Ubuntu file? If it works in parallel for you, then something is really strange, because it would mean it's only happening on my system.

@vqvu (Collaborator) commented Oct 31, 2016

Yep, I get parallel behavior using the Ubuntu URL. I tested on Node 7 on an Ubuntu VM and Node 6.9 on a Windows machine.

@vqvu (Collaborator) commented Oct 31, 2016

Here's the exact file that I used: https://gist.github.com/vqvu/6174247b413db479acd43c32f9bd551c. request, bluebird, and highland all on the latest version.

@rightaway (Author) commented Oct 31, 2016

Thanks for providing that gist. I can see now that the issue is caused by the way I do the request. In your example, the Promise resolves with the response object as soon as the 'response' event fires:

function get(url, options) {
    return new Promise((res, rej) => {
        request(url, options)
            .on('response', res);
    });
}

But if I get the Promise by doing this (tried with both request-promise and got packages, which both return promises), then it blocks.

const requestPromise = require('request-promise')
function get(url, options) {
    return requestPromise(url, options)
}

It's really confusing to me because in all cases a Promise is being returned, and promises are resolved asynchronously. Any idea why my way of doing it would block in consume?

@vqvu (Collaborator) commented Oct 31, 2016

I've never used request-promise, but it looks like the default is to resolve the promise with the full body when you call then. So the blocking behavior makes sense: you're waiting for the full response before doing anything. Presumably got does something similar.

Maybe try using the resolveWithFullResponse parameter?
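
That would look something like this (a sketch, assuming request-promise's documented resolveWithFullResponse option):

const requestPromise = require('request-promise');

function get(url, options) {
  // Resolve with the full response object instead of just the body string.
  return requestPromise(Object.assign({
    uri: url,
    resolveWithFullResponse: true
  }, options));
}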

@rightaway (Author)

Interesting. I can go with the way you did it with request, since that does work.

It could also be that the majority of the time is spent in establishing the connection and that the download is relatively fast. If this is the case, then maybe you don't benefit from increased parallelism.

You could also do some sort of speculative downloading to eliminate this bottleneck as well. Not sure if it's worth it, but I'm happy to explain more if you'd like.

You mentioned this earlier and I think it might simplify the way I'm doing this. Would you mind explaining it? Does it mean downloading more pages than necessary, and once all the in-flight pages have gotten 404s, the program terminates? I feel like the final code might be cleaner in such a setup (it's not a problem for this use case to download more pages than necessary).

@vqvu (Collaborator) commented Nov 1, 2016

Yeah, it's basically download more than necessary, and you can even use request-promise. Something like this

function download() {
  let done = false;
  return (err, page, push, next) => {
    if (page === highland.nil || done) {
      push(null, highland.nil)
    } else if (err) {
      push(err)
      next()
    } else {
      console.log(`start ${page}`);

      // Just try the download. If we get a 404, we toggle the `done` flag and
      // future attempts at downloads will end the stream.
      const stream = highland(requestPromise(makeUrl(page)))
        .errors((error, push) => {
          if (is404(error)) {
            // Assume that 404 means no more data.
            done = true;
          } else {
            // Pass through all other errors.
            push(error);
          }
        });
      stream.observe()
        .done(() => console.log(`end ${page}`));
      push(null, stream);
      next();
    }
  };
}

highland(util.range(1, 10))
  .consume(download())
  .parallel(10)
  .done(() => {
    console.log('done')
  })

Worst case, you download n pages more than necessary where n is your parallelism factor. A bit simpler.

@rightaway (Author) commented Nov 4, 2016

Thanks again! That is indeed simpler.

Is it possible to put sets of streams together sequentially? sequence and parallel(1) don't seem to be the ones to use in this case.

async function doHighlandOperation(array, operationFunction) {
  const range = await getRange(array)  
  const firstOperation = highland(range)
    .consume(operationFunction())
    .parallel(10)
    .done()
}

async function run() {
  await doHighlandOperation([1, ....... 100], doSomething())
  await doHighlandOperation([2000, ....... 3000], differentThing())
}

If I run this code, both doHighlandOperations are running at the same time. How could I get the first doHighlandOperation to complete fully (none of its 10 parallel operations are still going on), and the second doHighlandOperation to begin only at that point?

@vqvu (Collaborator) commented Nov 4, 2016

The reason this doesn't work is because done may not be synchronous, depending on how the stream was constructed (most of the time it isn't), so it may return before the stream completes.

Either convert the stream into a Promise and await it

async function doHighlandOperation(array, operationFunction) {
  const range = await getRange(array)
  await new Promise((res, rej) => highland(range)
    .consume(operationFunction())
    .parallel(10)
    .stopOnError(rej)
    .done(res));
}

or use sequence, which is normally the way to execute streams sequentially when you don't have async functions

async function doHighlandOperation(array, operationFunction) {
  const range = await getRange(array)  

  // Note done isn't called, so we don't start the stream
  return highland(range)
    .consume(operationFunction())
    .parallel(10);
}

async function run() {
  const s1 = await doHighlandOperation([1, ....... 100], doSomething())
  const s2 = await doHighlandOperation([2000, ....... 3000], differentThing())
  await new Promise((res, rej) => highland([s1, s2]).sequence()
    .stopOnError(rej)
    .done(res));
}

@rightaway (Author)

@vqvu Both solutions you suggested (converting the stream to a Promise, or using sequence) work great for starting the second stream when the first stream finishes. Is there an equivalent to the sequence solution for doing things in a loop, or is using a Promise the best way?

For example, here all the getAll functions are running in parallel rather than the next iteration of the loop starting only after the first stream is fully consumed. What would you suggest is the recommended approach to running the loop sequentially?

async function loop() {
  while (true) {
    await getAll()
  }
}

function getAll() {
  highland(getPages())
    .map(page => highland(download(page)))
    .parallel(5)
    .done(() => {
      console.log('done')
    })
}

@vqvu (Collaborator) commented Jan 8, 2017

Making getAll an async function and returning a promise from it should make the await in your loop do what you want. The promise should resolve once the stream is fully consumed; same code as in my example above.
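
A sketch of that change, reusing the Promise-wrapping pattern from the earlier example (getPages and download are the helpers from your snippet):

function getAll() {
  return new Promise((resolve, reject) => {
    highland(getPages())
      .map(page => highland(download(page)))
      .parallel(5)
      .stopOnError(reject)
      .done(resolve);
  });
}

async function loop() {
  while (true) {
    // The next iteration starts only after the stream is fully consumed.
    await getAll();
  }
}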

@rightaway (Author) commented Jan 12, 2017

In the below example from earlier, could you give some clarification on what the download function needs to have in order for highland to be able to work with it as expected?

highland(range(0, 1000))
  .map(page => highland(download(page)))
  .parallel(5)
  .done(() => {
    console.log('I am done.')
  })

Are both of these options fine to use as the download function above? Or is my understanding flawed about how this works?

Option 1: The function isn't async, so it must return a Promise.

function download(page) {
  return new Promise((resolve, reject) => {
    request(page, (e, response) => {
      if (e) { reject(e) } else { resolve(response) }
    })
  })
}

Option 2: The function is async, so it doesn't return anything but it must await the Promise.

async function download(page) {
  await new Promise((resolve, reject) => {
    request(page, (e, response) => {
      if (e) { reject(e) } else { resolve(response) }
    })
  })
}

@vqvu (Collaborator) commented Jan 13, 2017

download should return a promise that resolves once the download completes. Both of your options will work. Use the one that you like better.

@rightaway (Author)

The documentation for http://highlandjs.org/#parallel says that it is "buffering the results until they can be returned to the consumer in their original order". So in the example range(1000).map(readFile).parallel(10), it reads 1000 files in groups of 10. But the problem is, say each group of 10 has a slow file: 9 of the 10 files finish quickly, but the next group of 10 won't start until that 10th file in the previous group finishes.

Is there a way to do it where as soon as 1 of the 10 files finishes, it grabs the next one immediately? So at any given time, there are always 10 files being processed in parallel if there are still files available in the source array (range(1000) in this case).

This will of course prevent the results from being returned in their original order, but it's fine given the improvement in parallelism.

@vqvu (Collaborator) commented Jan 15, 2017

If you don't care about getting results in the original order, then you can use mergeWithLimit instead.
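
For example (a sketch, assuming a promise-returning readFile and a handleContents callback):

highland(range(1000))
  .map(file => highland(readFile(file)))
  .mergeWithLimit(10)      // starts a new read as soon as any of the 10 in flight finishes
  .each(handleContents);   // results arrive as they complete, not in their original order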

@kjvalencik (Contributor)

I found myself wanting to consume an elasticsearch scroll as a stream.

It was pretty simple to write a generator function that would continue to fetch the next batch. However, the issue I had was that the next batch would not be fetched until the current batch was finished consuming. This caused the total time to be time scrolling + time consuming instead of Max(time scrolling, time consuming).

I struggled to get the batching, flattening, and back pressure right, and I wanted to share my solution since it is simple and working well.

'use strict';

const highland = require('highland');

// Resolve after `n` milliseconds for simulating an HTTP get
function delay(n) {
	return new Promise(resolve => setTimeout(resolve, n));
}

// This isn't really important. It's just simulating what elasticsearch does
// internally for scrolling.
function FakeCursor({
	batchSize = 5,
	numBatches = 10,
	delay : delayMS = 1000
} = {}) {
	const docs = (new Array(batchSize)).fill(0).map((_, i) => i);

	return function getNext(n) {
		console.log(`requested: ${n}`);

		return delay(delayMS).then(() => {
			console.log(`sent: ${n}`);

			if (n >= numBatches) {
				return {
					docs : []
				};
			}

			return {
				next : n + 1,
				docs : docs.map(i => `${n} - ${i}`)
			};
		});
	};
}

// The specifics of this aren't important. This method should maintain
// internal state and return a generator method for highland. It should
// emit batched collections as single events and not an event for each item
// in order to simplify parallelism.
function FakeCursorGenerator(opts) {
	const getNext = FakeCursor(opts);
	let n = 0;

	return (push, done) => getNext(n).then(({ next, docs }) => {
		n = next;

		if (docs.length) {
			push(null, docs);
			done();
		} else {
			push(null, highland.nil);
		}
	});
}

// If your consumption is mostly synchronous, this is critical. If you do
// not push the current set of data to the end of the event loop, your
// next fetch in the generator will be blocked until after consumption.
function AsyncStream() {
	return data => highland(push => setImmediate(() => {
		push(null, data);
		push(null, highland.nil);
	}));
}

// Simulate some synchronous processing that takes a non-trivial amount of time
function BusyWait(n) {
	return doc => {
		const end = Date.now() + n;

		while (Date.now() < end);

		return doc;
	};
}

// Set up the stream from a generator. This sends batches of items resulting
// from each fetch.
highland(FakeCursorGenerator())

	// Push processing to the end of the event loop
	.map(AsyncStream())

	// Maintain back-pressure so that we are fetching concurrently with
	// processing. Unless you have very erratic times, it is likely
	// unnecessary to use any value other than `2`.
	.parallel(2)

	// Flatten the events out
	.sequence()

	// The rest of your processing continues as normal
	.map(BusyWait(100))
	.each(console.log);
