From 1feabcc8b8a4a5bddb3804c0b253e67a95992895 Mon Sep 17 00:00:00 2001 From: Blaine Bublitz Date: Thu, 7 Jul 2016 18:07:09 -0700 Subject: [PATCH] Fix: Use custom implementation of fs.WriteStream that supports flush (fixes #189) --- lib/dest/write-contents/write-stream.js | 69 ++++---- lib/file-operations.js | 101 ++++++++++++ package.json | 2 + test/dest.js | 8 +- test/file-operations.js | 201 ++++++++++++++++++++++++ 5 files changed, 339 insertions(+), 42 deletions(-) diff --git a/lib/dest/write-contents/write-stream.js b/lib/dest/write-contents/write-stream.js index 95c4c721..9dfc26e3 100644 --- a/lib/dest/write-contents/write-stream.js +++ b/lib/dest/write-contents/write-stream.js @@ -1,65 +1,58 @@ 'use strict'; -var fs = require('graceful-fs'); - var fo = require('../../file-operations'); var readStream = require('../../src/get-contents/read-stream'); function writeStream(file, onWritten) { var opt = { mode: file.stat.mode, + // TODO: need to test this (node calls this `flags` property) flag: file.flag, }; - var outStream = fs.createWriteStream(file.path, opt); - var fd = null; - - file.contents.once('error', complete); - file.contents.once('end', readStreamEnd); - outStream.once('error', complete); - outStream.once('finish', complete); - outStream.once('open', onOpen); + // TODO: is this the best API? + var outStream = fo.createWriteStream(file.path, opt, onFlush); - // Streams are piped with end disabled, this prevents the - // WriteStream from closing the file descriptor after all - // data is written. - file.contents.pipe(outStream, { end: false }); + file.contents.once('error', onComplete); + outStream.once('error', onComplete); + outStream.once('finish', onComplete); - // Obtain the file descriptor from the "open" event. - function onOpen(openFd) { - fd = openFd; - } + // TODO: should this use a clone? + file.contents.pipe(outStream); - function readStreamEnd() { - readStream(file, complete); - } + function onComplete(streamErr) { + // Cleanup event handlers before closing + file.contents.removeListener('error', onComplete); + outStream.removeListener('error', onComplete); + outStream.removeListener('finish', onComplete); - function end(propagatedErr) { - outStream.end(onEnd); + // Need to guarantee the fd is closed before forwarding the error + outStream.once('close', onClose); + outStream.end(); - function onEnd(endErr) { - onWritten(propagatedErr || endErr); + function onClose(closeErr) { + onWritten(streamErr || closeErr); } } // Cleanup - function complete(streamErr) { - file.contents.removeListener('error', complete); - file.contents.removeListener('end', readStreamEnd); - outStream.removeListener('error', complete); - outStream.removeListener('finish', complete); - outStream.removeListener('open', onOpen); + function onFlush(fd, callback) { + // TODO: removing this before readStream because it replaces the stream + file.contents.removeListener('error', onComplete); - if (streamErr) { - return end(streamErr); - } + // TODO: this is doing sync stuff & the callback seems unnecessary + // TODO: do we really want to replace the contents stream or should we use a clone + readStream(file, complete); - if (typeof fd !== 'number') { - return end(); - } + function complete() { + if (typeof fd !== 'number') { + return callback(); + } - fo.updateMetadata(fd, file, end); + fo.updateMetadata(fd, file, callback); + } } + } module.exports = writeStream; diff --git a/lib/file-operations.js b/lib/file-operations.js index b8fd9073..3a875b9b 100644 --- a/lib/file-operations.js +++ b/lib/file-operations.js @@ -1,10 +1,13 @@ 'use strict'; +var util = require('util'); + var fs = require('graceful-fs'); var path = require('path'); var assign = require('object-assign'); var isEqual = require('lodash.isequal'); var isValidDate = require('vali-date'); +var FlushWriteStream = require('flush-write-stream'); var constants = require('./constants'); @@ -324,6 +327,103 @@ function mkdirp(dirpath, customMode, callback) { } } +function createWriteStream(path, options, flush) { + return new WriteStream(path, options, flush); +} + +// Taken from node core and altered to receive a flush function and simplified +// To be used for cleanup (like updating times/mode/etc) +function WriteStream(path, options, flush) { + // Not exposed so we can avoid the case where someone doesn't use `new` + + if (typeof options === 'function') { + flush = options; + options = null; + } + + options = options || {}; + + FlushWriteStream.call(this, options, worker, cleanup); + + this.flush = flush; + this.path = path; + + this.mode = options.mode || constants.DEFAULT_FILE_MODE; + this.flag = options.flag || 'w'; + this.pos = APPEND_MODE_REGEXP.test(this.flag) ? null : 0;; + + // Used by node's `fs.WriteStream` + this.fd = null; + this.start = null; + + this.open(); + + // Dispose on finish. + this.once('finish', this.close); +} + +util.inherits(WriteStream, FlushWriteStream); + +WriteStream.prototype.open = function() { + var self = this; + + fs.open(this.path, this.flag, this.mode, onOpen); + + function onOpen(openErr, fd) { + if (openErr) { + self.destroy(); + self.emit('error', openErr); + return; + } + + self.fd = fd; + self.emit('open', fd); + } +}; + +// Use our `end` method since it is patched for flush +WriteStream.prototype.destroySoon = WriteStream.prototype.end; +// Use node's `fs.WriteStream` methods +WriteStream.prototype.destroy = fs.WriteStream.prototype.destroy; +WriteStream.prototype.close = fs.WriteStream.prototype.close; + +function worker(data, encoding, callback) { + var self = this; + + // This is from node core but I have no idea how to get code coverage on it + if (!Buffer.isBuffer(data)) { + return this.emit('error', new Error('Invalid data')); + } + + if (typeof this.fd !== 'number') { + return this.once('open', onOpen); + } + + fs.write(this.fd, data, 0, data.length, this.pos, onWrite); + + function onOpen() { + self._write(data, encoding, callback); + } + + function onWrite(writeErr) { + if (writeErr) { + self.destroy(); + callback(writeErr); + return; + } + + callback(); + } +} + +function cleanup(callback) { + if (typeof this.flush !== 'function') { + return callback(); + } + + this.flush(this.fd, callback); +} + module.exports = { closeFd: closeFd, isValidUnixId: isValidUnixId, @@ -334,4 +434,5 @@ module.exports = { updateMetadata: updateMetadata, writeFile: writeFile, mkdirp: mkdirp, + createWriteStream: createWriteStream, }; diff --git a/package.json b/package.json index 9429be28..66917339 100644 --- a/package.json +++ b/package.json @@ -12,6 +12,7 @@ ], "dependencies": { "duplexify": "^3.2.0", + "flush-write-stream": "^1.0.0", "glob-stream": "^5.3.2", "graceful-fs": "^4.0.0", "gulp-sourcemaps": "^1.5.2", @@ -36,6 +37,7 @@ "eslint": "^1.10.3", "eslint-config-gulp": "^2.0.0", "expect": "^1.14.0", + "from2": "^2.1.1", "github-changes": "^1.0.1", "istanbul": "^0.3.0", "istanbul-coveralls": "^1.0.1", diff --git a/test/dest.js b/test/dest.js index 3323b553..0705ca2a 100644 --- a/test/dest.js +++ b/test/dest.js @@ -18,6 +18,7 @@ var expect = require('expect'); var bufEqual = require('buffer-equal'); var through = require('through2'); var File = require('vinyl'); +var from = require('from2'); var should = require('should'); require('mocha'); @@ -1146,7 +1147,9 @@ describe('dest stream', function() { var inputPath = path.join(__dirname, './fixtures/test.coffee'); var inputBase = path.join(__dirname, './fixtures/'); - var contentStream = through.obj(); + var contentStream = from(function(size, cb) { + cb(new Error('mocked error')); + }); var expectedFile = new File({ base: inputBase, cwd: __dirname, @@ -1156,9 +1159,6 @@ describe('dest stream', function() { var stream = vfs.dest('./out-fixtures/', { cwd: __dirname }); stream.write(expectedFile); - setTimeout(function() { - contentStream.emit('error', new Error('mocked error')); - }, 100); stream.on('error', function(err) { expect(err).toExist(); done(); diff --git a/test/file-operations.js b/test/file-operations.js index c0fe35f2..10558b74 100644 --- a/test/file-operations.js +++ b/test/file-operations.js @@ -1350,3 +1350,204 @@ describe('mkdirp', function() { }); }); }); + +describe('createWriteStream', function() { + + // These tests needed to use the `close` event because + // the file descriptor was not closed on windows + + var inputPath = path.join(__dirname, './fixtures/test.coffee'); + var expectedContents = fs.readFileSync(inputPath, 'utf-8'); + var outputDir = path.join(__dirname, './out-fixtures/'); + var outputPath = path.join(outputDir, './test.coffee'); + + beforeEach(function(done) { + // For some reason, the outputDir sometimes exists on Windows + // So we use our mkdirp to create it + fo.mkdirp(outputDir, done); + }); + + afterEach(function() { + return del(outputDir); + }); + + it('accepts just a file path and writes to it', function(done) { + var inStream = fs.createReadStream(inputPath); + var outStream = fo.createWriteStream(outputPath); + + outStream.on('close', function() { + var contents = fs.readFileSync(outputPath, 'utf-8'); + expect(contents).toEqual(expectedContents); + done(); + }); + + inStream.pipe(outStream); + }); + + it('accepts flag option', function(done) { + // Write 12 stars then 12345 because the length of expected is 12 + fs.writeFileSync(outputPath, '************12345'); + + var inStream = fs.createReadStream(inputPath); + // Replaces from the beginning of the file + var outStream = fo.createWriteStream(outputPath, { flag: 'r+' }); + + outStream.on('close', function() { + var contents = fs.readFileSync(outputPath, 'utf-8'); + expect(contents).toEqual(expectedContents + '12345'); + done(); + }); + + inStream.pipe(outStream); + }); + + it('accepts append flag as option & places cursor at the end', function(done) { + fs.writeFileSync(outputPath, '12345'); + + var inStream = fs.createReadStream(inputPath); + // Appends to the end of the file + var outStream = fo.createWriteStream(outputPath, { flag: 'a' }); + + outStream.on('close', function() { + var contents = fs.readFileSync(outputPath, 'utf-8'); + expect(contents).toEqual('12345' + expectedContents); + done(); + }); + + inStream.pipe(outStream); + }); + + it('accepts mode option', function(done) { + if (isWindows) { + console.log('Changing the mode of a file is not supported by node.js in Windows.'); + this.skip(); + return; + } + + var expectedMode = parseInt('777', 8) & ~process.umask(); + var inStream = fs.createReadStream(inputPath); + var outStream = fo.createWriteStream(outputPath, { mode: expectedMode }); + + outStream.on('finish', function() { + var stat = fs.statSync(outputPath); + expect(masked(stat.mode)).toEqual(expectedMode); + done(); + }); + + inStream.pipe(outStream); + }); + + it('uses default file mode if no mode options', function(done) { + var expectedMode = constants.DEFAULT_FILE_MODE & ~process.umask(); + var inStream = fs.createReadStream(inputPath); + var outStream = fo.createWriteStream(outputPath); + + outStream.on('close', function() { + var stat = fs.statSync(outputPath); + expect(masked(stat.mode)).toEqual(expectedMode); + done(); + }); + + inStream.pipe(outStream); + }); + + it('accepts a flush function that is called before close emitted', function(done) { + var flushCalled = false; + var inStream = fs.createReadStream(inputPath); + var outStream = fo.createWriteStream(outputPath, {}, function(fd, cb) { + flushCalled = true; + cb(); + }); + + outStream.on('close', function() { + expect(flushCalled).toEqual(true); + done(); + }); + + inStream.pipe(outStream); + }); + + it('can specify flush without options argument', function(done) { + var flushCalled = false; + var inStream = fs.createReadStream(inputPath); + var outStream = fo.createWriteStream(outputPath, function(fd, cb) { + flushCalled = true; + cb(); + }); + + outStream.on('close', function() { + expect(flushCalled).toEqual(true); + done(); + }); + + inStream.pipe(outStream); + }); + + it('passes the file descriptor to flush', function(done) { + var flushCalled = false; + var inStream = fs.createReadStream(inputPath); + var outStream = fo.createWriteStream(outputPath, function(fd, cb) { + expect(fd).toBeA('number'); + flushCalled = true; + cb(); + }); + + outStream.on('close', function() { + expect(flushCalled).toEqual(true); + done(); + }); + + inStream.pipe(outStream); + }); + + it('passes a callback to flush to call when work is done', function(done) { + var flushCalled = false; + var timeoutCalled = false; + var inStream = fs.createReadStream(inputPath); + var outStream = fo.createWriteStream(outputPath, function(fd, cb) { + flushCalled = true; + setTimeout(function() { + timeoutCalled = true; + cb(); + }, 250); + }); + + outStream.on('close', function() { + expect(flushCalled).toEqual(true); + expect(timeoutCalled).toEqual(true); + done(); + }); + + inStream.pipe(outStream); + }); + + it('emits an error if open fails', function(done) { + var badOutputPath = path.join(outputDir, './non-exist/test.coffee'); + var inStream = fs.createReadStream(inputPath); + var outStream = fo.createWriteStream(badOutputPath); + + // There is no file descriptor if open fails so mark done in the error + outStream.on('error', function(err) { + expect(err).toBeAn(Error); + done(); + }); + + inStream.pipe(outStream); + }); + + it('emits an error if write fails', function(done) { + // Create the file so it can be opened with `r` + fs.writeFileSync(outputPath, expectedContents); + + var inStream = fs.createReadStream(inputPath); + var outStream = fo.createWriteStream(outputPath, { flag: 'r' }); + + outStream.on('error', function(err) { + expect(err).toBeAn(Error); + }); + + outStream.on('close', done); + + inStream.pipe(outStream); + }); +});