Skip to content

Commit

Permalink
Merge pull request #126 from gulpjs/read-next-file
Browse files Browse the repository at this point in the history
read from the stream once we finish processing a file, this avoids highWaterMark being hit - add test
  • Loading branch information
phated committed Dec 16, 2015
2 parents 0255086 + 1b8f715 commit 96a93df
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 3 deletions.
10 changes: 10 additions & 0 deletions lib/dest/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
var through2 = require('through2');
var sourcemaps = require('gulp-sourcemaps');
var duplexify = require('duplexify');
var sink = require('../sink');
var prepareWrite = require('../prepareWrite');
var writeContents = require('./writeContents');

Expand All @@ -21,7 +22,12 @@ function dest(outFolder, opt) {
}

var saveStream = through2.obj(saveFile);

if (!opt.sourcemaps) {
// Sink the save stream to start flowing
// Do this on nextTick, it will flow at slowest speed of piped streams
process.nextTick(sink(saveStream));

return saveStream;
}

Expand All @@ -34,6 +40,10 @@ function dest(outFolder, opt) {
var outputStream = duplexify.obj(mapStream, saveStream);
mapStream.pipe(saveStream);

// Sink the output stream to start flowing
// Do this on nextTick, it will flow at slowest speed of piped streams
process.nextTick(sink(outputStream));

return outputStream;
}

Expand Down
16 changes: 16 additions & 0 deletions lib/sink.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
'use strict';

var Writable = require('readable-stream/writable');

function sink(stream) {
var sinkStream = new Writable({
objectMode: true,
write: function(file, enc, cb) { cb(); }
});

return function() {
stream.pipe(sinkStream);
};
}

module.exports = sink;
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"merge-stream": "^1.0.0",
"mkdirp": "^0.5.0",
"object-assign": "^4.0.0",
"readable-stream": "^2.0.4",
"strip-bom": "^2.0.0",
"strip-bom-stream": "^1.0.0",
"through2": "^2.0.0",
Expand All @@ -27,6 +28,7 @@
},
"devDependencies": {
"buffer-equal": "^0.0.1",
"del": "^2.2.0",
"istanbul": "^0.3.0",
"istanbul-coveralls": "^1.0.1",
"jshint": "^2.4.1",
Expand Down
76 changes: 73 additions & 3 deletions test/dest.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ var vfs = require('../');

var path = require('path');
var fs = require('graceful-fs');
var rimraf = require('rimraf');
var del = require('del');
var Writeable = require('readable-stream/writable');

var bufEqual = require('buffer-equal');
var through = require('through2');
Expand All @@ -17,11 +18,12 @@ var File = require('vinyl');
var should = require('should');
require('mocha');

var wipeOut = function(cb) {
rimraf(path.join(__dirname, './out-fixtures/'), cb);
var wipeOut = function() {
spies.setError('false');
statSpy.reset();
chmodSpy.reset();
del.sync(path.join(__dirname, './fixtures/highwatermark'));
del.sync(path.join(__dirname, './out-fixtures/'));
};

var dataWrap = function(fn) {
Expand Down Expand Up @@ -1256,4 +1258,72 @@ describe('dest stream', function() {
stream.write(file);
stream.end();
});

it('does not get clogged by highWaterMark', function(done) {
fs.mkdirSync(path.join(__dirname, './fixtures/highwatermark'));
for (var idx = 0; idx < 17; idx++) {
fs.writeFileSync(path.join(__dirname, './fixtures/highwatermark/', 'file' + idx + '.txt'));
}

var srcPath = path.join(__dirname, './fixtures/highwatermark/*.txt');
var srcStream = vfs.src(srcPath);
var destStream = vfs.dest('./out-fixtures/', {cwd: __dirname});

var fileCount = 0;
var countFiles = through.obj(function(file, enc, cb) {
fileCount++;

cb(null, file);
});

destStream.once('finish', function() {
fileCount.should.equal(17);
done();
});

srcStream.pipe(countFiles).pipe(destStream);
});

it('allows backpressure when piped to another, slower stream', function(done) {
this.timeout(8000);

fs.mkdirSync(path.join(__dirname, './fixtures/highwatermark'));
for (var idx = 0; idx < 24; idx++) {
fs.writeFileSync(path.join(__dirname, './fixtures/highwatermark/', 'file' + idx + '.txt'));
}

var srcPath = path.join(__dirname, './fixtures/highwatermark/*.txt');
var srcStream = vfs.src(srcPath);
var destStream = vfs.dest('./out-fixtures/', {cwd: __dirname});

var fileCount = 0;
var countFiles = through.obj(function(file, enc, cb) {
fileCount++;

cb(null, file);
});

var slowFileCount = 0;
var slowCountFiles = new Writeable({
objectMode: true,
write: function(file, enc, cb){
slowFileCount++;

setTimeout(function() {
cb(null, file);
}, 250);
}
});

slowCountFiles.once('finish', function() {
fileCount.should.equal(24);
slowFileCount.should.equal(24);
done();
});

srcStream
.pipe(countFiles)
.pipe(destStream)
.pipe(slowCountFiles);
});
});

0 comments on commit 96a93df

Please sign in to comment.