Skip to content

Commit

Permalink
Fix: Use the sink stream utility to sink symlink stream (closes #195)
Browse files Browse the repository at this point in the history
  • Loading branch information
phated committed Nov 28, 2017
1 parent 1bdfca2 commit 72403ea
Show file tree
Hide file tree
Showing 2 changed files with 168 additions and 4 deletions.
11 changes: 7 additions & 4 deletions lib/symlink/index.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
'use strict';

var through2 = require('through2');
var fs = require('graceful-fs');
var path = require('path');

var fs = require('graceful-fs');
var through2 = require('through2');
var valueOrFunction = require('value-or-function');

var sink = require('../sink');
var prepareWrite = require('../prepare-write');
var defaultValue = require('../default-value');

Expand Down Expand Up @@ -44,8 +46,9 @@ function symlink(outFolder, opt) {
}

var stream = through2.obj(opt, linkFile);
// TODO: option for either backpressure or lossy
stream.resume();
// Sink the stream to start flowing
// Do this on nextTick, it will flow at slowest speed of piped streams
process.nextTick(sink(stream));
return stream;
}

Expand Down
161 changes: 161 additions & 0 deletions test/symlink.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,19 @@ var cleanup = require('./utils/cleanup');
var statMode = require('./utils/stat-mode');
var isWindows = require('./utils/is-windows');
var applyUmask = require('./utils/apply-umask');
var testStreams = require('./utils/test-streams');
var isDirectory = require('./utils/is-directory-mock');
var testConstants = require('./utils/test-constants');

var from = miss.from;
var pipe = miss.pipe;
var concat = miss.concat;

var count = testStreams.count;
var slowCount = testStreams.slowCount;

function noop() {}

var outputRelative = testConstants.outputRelative;
var inputBase = testConstants.inputBase;
var outputBase = testConstants.outputBase;
Expand Down Expand Up @@ -372,6 +378,161 @@ describe('symlink stream', function() {
]);
});

it('does not get clogged by highWaterMark', function(done) {
var expectedCount = 17;
var highwatermarkFiles = [];
for (var idx = 0; idx < expectedCount; idx++) {
var file = new File({
base: inputBase,
path: inputPath,
contents: null,
});
highwatermarkFiles.push(file);
}

pipe([
from.obj(highwatermarkFiles),
count(expectedCount),
// Must be in the Writable position to test this
// So concat-stream cannot be used
vfs.symlink(outputBase),
], done);
});

it('allows backpressure when piped to another, slower stream', function(done) {
this.timeout(20000);

var expectedCount = 24;
var highwatermarkFiles = [];
for (var idx = 0; idx < expectedCount; idx++) {
var file = new File({
base: inputBase,
path: inputPath,
contents: null,
});
highwatermarkFiles.push(file);
}

pipe([
from.obj(highwatermarkFiles),
count(expectedCount),
vfs.symlink(outputBase),
slowCount(expectedCount),
], done);
});

it('respects readable listeners on symlink stream', function(done) {
var file = new File({
base: inputBase,
path: inputDirpath,
contents: null,
});

var symlinkStream = vfs.symlink(outputBase);

var readables = 0;
symlinkStream.on('readable', function() {
var data = symlinkStream.read();

if (data != null) {
readables++;
}
});

function assert(err) {
expect(readables).toEqual(1);
done(err);
}

pipe([
from.obj([file]),
symlinkStream,
], assert);
});

it('respects data listeners on symlink stream', function(done) {
var file = new File({
base: inputBase,
path: inputDirpath,
contents: null,
});

var symlinkStream = vfs.symlink(outputBase);

var datas = 0;
symlinkStream.on('data', function() {
datas++;
});

function assert(err) {
expect(datas).toEqual(1);
done(err);
}

pipe([
from.obj([file]),
symlinkStream,
], assert);
});

it('sinks the stream if all the readable event handlers are removed', function(done) {
var expectedCount = 17;
var highwatermarkFiles = [];
for (var idx = 0; idx < expectedCount; idx++) {
var file = new File({
base: inputBase,
path: inputPath,
contents: null,
});
highwatermarkFiles.push(file);
}

var symlinkStream = vfs.symlink(outputBase);

symlinkStream.on('readable', noop);

pipe([
from.obj(highwatermarkFiles),
count(expectedCount),
// Must be in the Writable position to test this
// So concat-stream cannot be used
symlinkStream,
], done);

process.nextTick(function() {
symlinkStream.removeListener('readable', noop);
});
});

it('sinks the stream if all the data event handlers are removed', function(done) {
var expectedCount = 17;
var highwatermarkFiles = [];
for (var idx = 0; idx < expectedCount; idx++) {
var file = new File({
base: inputBase,
path: inputPath,
contents: null,
});
highwatermarkFiles.push(file);
}

var symlinkStream = vfs.symlink(outputBase);

symlinkStream.on('data', noop);

pipe([
from.obj(highwatermarkFiles),
count(expectedCount),
// Must be in the Writable position to test this
// So concat-stream cannot be used
symlinkStream,
], done);

process.nextTick(function() {
symlinkStream.removeListener('data', noop);
});
});

// TODO: need a better way to pass these options through
// Or maybe not at all since we fixed highWaterMark
it('passes options to through2', function(done) {
Expand Down

0 comments on commit 72403ea

Please sign in to comment.