Seth Yuan edited this page Nov 21, 2013 · 14 revisions

Basic usage

Wrap the asynchronous function you want to use with flow.wrap, then call it inside a flow block as shown below.

var flow = require("asyncflow");
var fs = require("fs");

var exists = flow.wrap(fs.exists);

flow(function() {
  if (exists("myfile.txt").wait()) {
    // continue here...
  }
});

Within the flow block, everything runs normally except calls to wrapped functions. A wrapped function starts executing asynchronously and returns a handle; calling wait on that handle "actually" waits for the asynchronous function to finish and then returns its result. Note that the Node.js event loop is not blocked during the wait, so other code in the event loop may run in the meantime.

You can also wrap an entire module at once. Bear in mind that this facility is only a convenience: not every function in a module can or should be used with asyncflow.

var flow = require("asyncflow");
var fs = require("fs");
var fsw = flow.wrap(fs);

flow(function() {
  if (fsw.exists("myfile.txt").wait()) {
    var content = fsw.readFile("myfile.txt").wait();
    // continue here...
  }
});

Parallel calls

var flow = require("asyncflow");
var fs = require("fs");

var readdir = flow.wrap(fs.readdir);

flow(function() {
  var dirA = readdir("dir_a");
  var dirB = readdir("dir_b");
  var files = dirA.wait().concat(dirB.wait());
  // continue here...
});

Here the two readdir calls start running asynchronously as soon as they are called, and each is waited on only when wait is called on its handle. In short, parallelism*. :)

* To be technically correct, this should be called concurrency rather than parallelism: Node.js runs JavaScript on a single thread, so two pieces of code cannot run at the same time.
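
The start-now, collect-later behaviour described above can be pictured without asyncflow. The sketch below is a plain-JavaScript analogy and not asyncflow's implementation: start and whenDone are hypothetical names, and because no fibers are involved, the result is delivered to a callback instead of being returned from a blocking wait.

```javascript
// Hypothetical helper: call a Node-style async function right away and
// return a handle that remembers the outcome for later collection.
function start(fn /*, args... */) {
  var args = Array.prototype.slice.call(arguments, 1);
  var handle = { done: false, err: null, value: undefined, waiters: [] };

  // Register interest in the result; fires immediately if already finished.
  handle.whenDone = function (cb) {
    if (handle.done) cb(handle.err, handle.value);
    else handle.waiters.push(cb);
  };

  // The async work begins here, at call time, not when the result is collected.
  fn.apply(null, args.concat([function (err, value) {
    handle.done = true;
    handle.err = err || null;
    handle.value = value;
    handle.waiters.splice(0).forEach(function (cb) {
      cb(handle.err, handle.value);
    });
  }]));

  return handle;
}

// Usage: both reads are in flight before either result is collected.
var fs = require("fs");
var dirA = start(fs.readdir, ".");
var dirB = start(fs.readdir, ".");
dirA.whenDone(function (errA, a) {
  dirB.whenDone(function (errB, b) {
    if (errA || errB) return console.error(errA || errB);
    console.log("entries:", a.concat(b).length);
  });
});
```

Both readdir calls are issued back to back, so the second does not wait for the first to finish; only the collection step is ordered.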

Parallel forEach and map

You can also use flow.forEach and flow.map. They are intended to be used outside of a flow block.

var flow = require("asyncflow");
var fs = require("fs");

var stat = flow.wrap(function(file, i, a, cb) {
  fs.stat(file, cb);
});

var files = ["file_a.txt", "file_b.txt", "file_c.txt"];
flow.map(files, stat, function(err, stats) {
  // use stats here...
});

If you want to use forEach and map inside of a flow block, you can wrap them first with flow.wrap just as with other async functions, or you can use the already wrapped flow.wrapped.forEach and flow.wrapped.map.

var flow = require("asyncflow");
var fs = require("fs");

var stat = flow.wrap(function(file, i, a, cb) {
  fs.stat(file, cb);
});

flow(function() {
  var files = ["file_a.txt", "file_b.txt", "file_c.txt"];
  var stats = flow.wrapped.map(files, stat).wait();
  // use stats here...
});

Notice that you are forced to spell out the full argument list in function(file, i, a, cb). This is because functions like Array.prototype.forEach and Array.prototype.map pass 3 arguments to their callback, and a callback must be added as the last argument to make the function asynchronous. Writing these arguments out is cumbersome, so flow.wrap has an overloaded version that lets you control how many arguments are passed to the function being wrapped. For example, the code above can be simplified to:

var flow = require("asyncflow");
var fs = require("fs");

flow(function() {
  var files = ["file_a.txt", "file_b.txt", "file_c.txt"];
  var stats = flow.wrapped.map(files, flow.wrap(fs.stat, 1)).wait();
  // use stats here...
});

If you need both the value and the index passed in, specify 2 rather than 1 when calling flow.wrap. Notice that this also eliminates the wrapper function around fs.stat, which existed only to accommodate the number of arguments.
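
One way to picture the numeric argument: it trims the arguments supplied by map or forEach down to the first n and then appends the callback. The sketch below shows that idea in plain JavaScript. takeArgs and double are hypothetical names, and asyncflow may well implement this differently.

```javascript
// Hypothetical helper: keep only the first `n` leading arguments and pass
// the trailing callback through unchanged.
function takeArgs(fn, n) {
  return function () {
    var args = Array.prototype.slice.call(arguments);
    var cb = args.pop(); // the last argument is the callback
    return fn.apply(this, args.slice(0, n).concat([cb]));
  };
}

// A Node-style function that expects exactly (value, callback).
function double(x, cb) {
  cb(null, x * 2);
}

// A map-style caller passes (value, index, array, callback).
takeArgs(double, 1)(21, 0, [21], function (err, result) {
  console.log(result); // prints 42
});
```

With n set to 2, the wrapped function would receive the value and the index, followed by the callback.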

Concurrency limit

Did you know that both parallel calls and parallel forEach/maps have concurrency limit support built-in?

Why limit concurrency, you ask? Because certain system resources, such as open files and sockets, run out if concurrency is unbounded. Consider the following code:

var flow = require("asyncflow");
var fs = require("fs");

var readFile = flow.wrap(fs.readFile);
var fileLength = flow.wrap(function(file, i, a, cb) {
  flow(function() {
    cb(null, readFile(file).wait().length);
  });
});

flow(function() {
  var allDiskFiles = getAllDiskFiles();
  var totalSize = flow.wrapped.map(allDiskFiles, fileLength).wait().
    reduce(function(x, y) { return x + y });
  // use totalSize here...
});

The problem with the above code is that too many files are open at the same time, because all files are read at once (even though asynchronously). At some point this exhausts the open file handles available to the process, and from then on attempts to open more files fail with an error (typically EMFILE on POSIX systems).

By applying a concurrency limit, we can mitigate this resource exhaustion problem, like this:

var flow = require("asyncflow");
var fs = require("fs");

var readFile = flow.wrap(fs.readFile);
var fileLength = flow.wrap(function(file, i, a, cb) {
  flow(function() {
    cb(null, readFile(file).wait().length);
  });
});

flow(function() {
  var allDiskFiles = getAllDiskFiles();
  var totalSize = flow.wrapped.map(allDiskFiles, 100, fileLength).wait().
    reduce(function(x, y) { return x + y });
  // use totalSize here...
});

Notice that all we needed to do was pass a number to flow.wrapped.map as the upper limit on concurrent calls.

ALERT: Don't write code like the above in the real world; this implementation is very resource inefficient.

Flow blocks can have an upper limit too, like this:

var flow = require("asyncflow");
var fs = require("fs");

var readFile = flow.wrap(fs.readFile);

flow(2, function() {
  var file1 = readFile("file_a.txt");
  var file2 = readFile("file_b.txt");
  var file3 = readFile("file_c.txt");
  // file1, file2 and file3 are handles; call wait() on each to get the contents...
});

So within this flow block, at most 2 readFile calls are running at any given time.
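
The idea behind a concurrency limit can be sketched in plain JavaScript with callbacks. This is an illustration of the technique only, not asyncflow's implementation; mapLimit is a hypothetical helper whose worker receives (item, index, array, callback), mirroring the argument order used in this section.

```javascript
// Hypothetical helper: map `items` through an async `worker`, keeping at
// most `limit` calls in flight, and report results in input order.
function mapLimit(items, limit, worker, done) {
  var results = new Array(items.length);
  var next = 0;          // index of the next item to start
  var running = 0;       // calls currently in flight
  var finished = false;  // set once `done` has been called

  function launch() {
    // Start new work only while there is spare capacity.
    while (!finished && running < limit && next < items.length) {
      startItem(next++);
    }
    if (!finished && running === 0 && next >= items.length) {
      finished = true;
      done(null, results);
    }
  }

  function startItem(i) {
    running++;
    worker(items[i], i, items, function (err, value) {
      running--;
      if (finished) return;
      if (err) {
        finished = true;
        return done(err);
      }
      results[i] = value;
      launch(); // a slot was freed, so try to start the next item
    });
  }

  launch();
}

// Usage: record the highest number of workers in flight at once.
var inFlight = 0;
var peak = 0;
mapLimit([1, 2, 3, 4, 5, 6], 2, function (n, i, all, cb) {
  inFlight++;
  peak = Math.max(peak, inFlight);
  setImmediate(function () {
    inFlight--;
    cb(null, n * n);
  });
}, function (err, squares) {
  console.log(squares, "peak in flight:", peak);
});
```

A new item is started only when an earlier one calls back, which is what keeps the number of open resources bounded.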

Exception handling

Exceptions within a flow block can be caught as usual.

var flow = require("asyncflow");
var fs = require("fs");

var readFile = flow.wrap(fs.readFile);

flow(function() {
  try {
    var content = readFile("non-existent").wait();
  } catch (e) {
    console.error(e);
  }
});

An exception not caught within a flow block is thrown back to the calling context, but because a flow block is an asynchronous call, these exceptions are actually uncaught exceptions, hence dangerous. You may try to use Node.js' built-in domain module to contain them.

For example, the following code won't work:

var flow = require("asyncflow");
var fs = require("fs");

var readFile = flow.wrap(fs.readFile);

// ATTENTION, THIS WILL NOT WORK.
try {
  flow(function() {
    var content = readFile("NON-EXISTENT").wait();
  });
} catch (e) {
  console.error(e);
}
// ATTENTION, ABOVE CODE WILL NOT WORK.

The rule of thumb is that exceptions should not be thrown in any async call. This applies to Node.js in general as well as to asyncflow.
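
A minimal sketch of that rule in plain JavaScript, assuming the usual error-first callback convention: failures are caught where they occur and handed to the callback rather than thrown. deliver and parseConfig are hypothetical names used only for illustration. Inside a flow block, the body could equally contain wrapped calls followed by wait.

```javascript
// Hypothetical helper: run `body` and report its outcome through an
// error-first callback instead of letting an exception escape.
function deliver(body, callback) {
  var value;
  try {
    value = body();
  } catch (e) {
    return callback(e); // the failure goes to the callback, nothing is thrown
  }
  // Called outside the try so an exception thrown by the callback itself
  // is not mistaken for a failure of `body`.
  callback(null, value);
}

// Example: parsing that reports bad input instead of throwing.
function parseConfig(text, callback) {
  deliver(function () {
    return JSON.parse(text);
  }, callback);
}

parseConfig("{ not json", function (err, config) {
  if (err) return console.error("parse failed:", err.name);
  console.log(config);
});
```

The caller handles every failure in one place, the err argument, and never needs a try/catch around the call.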

Extending prototypes

So far, we have avoided extending the prototypes of built-in objects, even though doing so gives a more object-oriented feel. If you think that extending built-in prototypes is acceptable for your project, you can take advantage of the flow.extension members.

var flow = require("asyncflow");
var fs = require("fs");

Function.prototype.wrap = flow.extension.wrap;
Array.prototype.pmap = flow.extension.collection("map");

var readFile = fs.readFile.wrap();

flow(function() {
  var allDiskFiles = getAllDiskFiles();
  var totalSize = allDiskFiles.pmap(function(file, i, a, cb) {
    flow(function() {
      cb(null, readFile(file).wait().length);
    });
  }.wrap()).wait().reduce(function(x, y) { return x + y });
  // use totalSize here...
});

Refer to the API for more extension facilities.

Write your own async function

Remember that a flow block is an asynchronous call, but everything within it is synchronous (by using the wrapped versions of functions). So you can use a flow block to implement an asynchronous function in a synchronous style.

var flow = require("asyncflow");
var fs = require("fs");

var writeFile = flow.wrap(fs.writeFile);

function writeRecord(record, file, callback) {
  flow(function() {
    var err = null;
    try {
      writeFile(file, record).wait();
    } catch (e) {
      err = e;
    }
    // Call back outside the try so an error thrown by the callback is not reported twice.
    callback(err);
  });
}

var someRecord = getSomeRecord();

writeRecord(someRecord, "some_file", function(err) {
  if (!err) console.log("Write OK");
});

The writeRecord async function we just wrote can itself be wrapped for use in another flow block.

var writeRecord$ = flow.wrap(writeRecord);

flow(function() {
  var someRecord = getSomeRecord();
  try {
    writeRecord$(someRecord, "some_file").wait();
  } catch (e) {
    console.error(e);
  }
});