diff --git a/lib/symlink/index.js b/lib/symlink/index.js index b046b297..c5447528 100644 --- a/lib/symlink/index.js +++ b/lib/symlink/index.js @@ -1,10 +1,12 @@ 'use strict'; -var through2 = require('through2'); -var fs = require('graceful-fs'); var path = require('path'); + +var fs = require('graceful-fs'); +var through2 = require('through2'); var valueOrFunction = require('value-or-function'); +var sink = require('../sink'); var prepareWrite = require('../prepare-write'); var defaultValue = require('../default-value'); @@ -44,8 +46,9 @@ function symlink(outFolder, opt) { } var stream = through2.obj(opt, linkFile); - // TODO: option for either backpressure or lossy - stream.resume(); + // Sink the stream to start flowing + // Do this on nextTick, it will flow at slowest speed of piped streams + process.nextTick(sink(stream)); return stream; } diff --git a/test/symlink.js b/test/symlink.js index 315b097f..982bd71b 100644 --- a/test/symlink.js +++ b/test/symlink.js @@ -13,6 +13,7 @@ var cleanup = require('./utils/cleanup'); var statMode = require('./utils/stat-mode'); var isWindows = require('./utils/is-windows'); var applyUmask = require('./utils/apply-umask'); +var testStreams = require('./utils/test-streams'); var isDirectory = require('./utils/is-directory-mock'); var testConstants = require('./utils/test-constants'); @@ -20,6 +21,11 @@ var from = miss.from; var pipe = miss.pipe; var concat = miss.concat; +var count = testStreams.count; +var slowCount = testStreams.slowCount; + +function noop() {} + var outputRelative = testConstants.outputRelative; var inputBase = testConstants.inputBase; var outputBase = testConstants.outputBase; @@ -372,6 +378,161 @@ describe('symlink stream', function() { ]); }); + it('does not get clogged by highWaterMark', function(done) { + var expectedCount = 17; + var highwatermarkFiles = []; + for (var idx = 0; idx < expectedCount; idx++) { + var file = new File({ + base: inputBase, + path: inputPath, + contents: null, + }); + highwatermarkFiles.push(file); + } + + pipe([ + from.obj(highwatermarkFiles), + count(expectedCount), + // Must be in the Writable position to test this + // So concat-stream cannot be used + vfs.symlink(outputBase), + ], done); + }); + + it('allows backpressure when piped to another, slower stream', function(done) { + this.timeout(20000); + + var expectedCount = 24; + var highwatermarkFiles = []; + for (var idx = 0; idx < expectedCount; idx++) { + var file = new File({ + base: inputBase, + path: inputPath, + contents: null, + }); + highwatermarkFiles.push(file); + } + + pipe([ + from.obj(highwatermarkFiles), + count(expectedCount), + vfs.symlink(outputBase), + slowCount(expectedCount), + ], done); + }); + + it('respects readable listeners on symlink stream', function(done) { + var file = new File({ + base: inputBase, + path: inputDirpath, + contents: null, + }); + + var symlinkStream = vfs.symlink(outputBase); + + var readables = 0; + symlinkStream.on('readable', function() { + var data = symlinkStream.read(); + + if (data != null) { + readables++; + } + }); + + function assert(err) { + expect(readables).toEqual(1); + done(err); + } + + pipe([ + from.obj([file]), + symlinkStream, + ], assert); + }); + + it('respects data listeners on symlink stream', function(done) { + var file = new File({ + base: inputBase, + path: inputDirpath, + contents: null, + }); + + var symlinkStream = vfs.symlink(outputBase); + + var datas = 0; + symlinkStream.on('data', function() { + datas++; + }); + + function assert(err) { + expect(datas).toEqual(1); + done(err); + } + + pipe([ + from.obj([file]), + symlinkStream, + ], assert); + }); + + it('sinks the stream if all the readable event handlers are removed', function(done) { + var expectedCount = 17; + var highwatermarkFiles = []; + for (var idx = 0; idx < expectedCount; idx++) { + var file = new File({ + base: inputBase, + path: inputPath, + contents: null, + }); + highwatermarkFiles.push(file); + } + + var symlinkStream = vfs.symlink(outputBase); + + symlinkStream.on('readable', noop); + + pipe([ + from.obj(highwatermarkFiles), + count(expectedCount), + // Must be in the Writable position to test this + // So concat-stream cannot be used + symlinkStream, + ], done); + + process.nextTick(function() { + symlinkStream.removeListener('readable', noop); + }); + }); + + it('sinks the stream if all the data event handlers are removed', function(done) { + var expectedCount = 17; + var highwatermarkFiles = []; + for (var idx = 0; idx < expectedCount; idx++) { + var file = new File({ + base: inputBase, + path: inputPath, + contents: null, + }); + highwatermarkFiles.push(file); + } + + var symlinkStream = vfs.symlink(outputBase); + + symlinkStream.on('data', noop); + + pipe([ + from.obj(highwatermarkFiles), + count(expectedCount), + // Must be in the Writable position to test this + // So concat-stream cannot be used + symlinkStream, + ], done); + + process.nextTick(function() { + symlinkStream.removeListener('data', noop); + }); + }); + // TODO: need a better way to pass these options through // Or maybe not at all since we fixed highWaterMark it('passes options to through2', function(done) {