diff --git a/lib/storage/file.js b/lib/storage/file.js index d136da44a06..06b1b564020 100644 --- a/lib/storage/file.js +++ b/lib/storage/file.js @@ -405,6 +405,8 @@ File.prototype.createReadStream = function(options) { util.is(options.start, 'number') || util.is(options.end, 'number'); var throughStream = streamEvents(through()); + var requestStream; + var validations = ['crc32c', 'md5']; var validation; @@ -434,6 +436,22 @@ File.prototype.createReadStream = function(options) { o: encodeURIComponent(this.name) }); + // End the stream, first emitting an error or complete event. + var endThroughStream = once(function(err, resp) { + if (err) { + throughStream.emit('error', err, resp); + } else { + throughStream.emit('complete', resp); + } + + throughStream.destroy(); + }); + + var endRequestStream = once(function() { + requestStream.abort(); + requestStream.destroy(); + }); + createAuthorizedReq(remoteFilePath); return throughStream; @@ -462,7 +480,7 @@ File.prototype.createReadStream = function(options) { that.bucket.storage.makeAuthorizedRequest_(reqOpts, { onAuthorized: function(err, authorizedReqOpts) { if (err) { - done(err, null); + endThroughStream(err); return; } @@ -471,8 +489,13 @@ File.prototype.createReadStream = function(options) { var localCrcHash; var localMd5Hash = crypto.createHash('md5'); - request(authorizedReqOpts) - .on('error', done) + requestStream = request(authorizedReqOpts); + + requestStream + .on('error', function(err) { + endRequestStream(); + endThroughStream(err); + }) .on('response', throughStream.emit.bind(throughStream, 'response')) @@ -489,13 +512,13 @@ File.prototype.createReadStream = function(options) { .on('complete', function(res) { util.handleResp(null, res, res.body, function(err, resp) { if (err) { - done(err, resp); + endThroughStream(err, resp); return; } if (rangeRequest) { // Range requests can't receive data integrity checks. - done(null, resp); + endThroughStream(null, resp); return; } @@ -535,28 +558,19 @@ File.prototype.createReadStream = function(options) { ].join(' ')); mismatchError.code = 'CONTENT_DOWNLOAD_MISMATCH'; - done(mismatchError, resp); + endThroughStream(mismatchError, resp); } else { - done(null, resp); + endThroughStream(null, resp); } }); }) - .pipe(throughStream); + .pipe(throughStream) + + .on('error', endRequestStream); } }); } - - // End the stream, first emitting an error or complete event. - function done(err) { - if (err) { - throughStream.emit('error', err); - } else { - throughStream.emit('complete'); - } - - throughStream.end(); - } }; /** diff --git a/test/storage/file.js b/test/storage/file.js index a868905c050..df5deb8d311 100644 --- a/test/storage/file.js +++ b/test/storage/file.js @@ -51,9 +51,14 @@ function FakeDuplexify() { nodeutil.inherits(FakeDuplexify, stream.Duplex); var makeWritableStream_Override; +var handleResp_Override; var fakeUtil = extend({}, util, { + handleResp: function() { + (handleResp_Override || util.handleResp).apply(null, arguments); + }, + makeWritableStream: function() { - var args = util.toArray(arguments); + var args = arguments; (makeWritableStream_Override || util.makeWritableStream).apply(null, args); } }); @@ -125,6 +130,7 @@ describe('File', function() { directoryFile = new File(bucket, 'directory/file.jpg'); directoryFile.makeReq_ = util.noop; + handleResp_Override = null; makeWritableStream_Override = null; request_Override = null; }); @@ -370,95 +376,262 @@ describe('File', function() { }); describe('createReadStream', function() { + function getFakeRequest(data) { + var aborted = false; + var destroyed = false; + var requestOptions; - function getFakeRequest(data, fakeResponse) { - function FakeRequest(req) { + function FakeRequest(_requestOptions) { if (!(this instanceof FakeRequest)) { - return new FakeRequest(req); + return new FakeRequest(_requestOptions); } - var that = this; + requestOptions = _requestOptions; stream.Readable.call(this); this._read = function() { - this.push(data); + if (data) { + this.push(data); + } this.push(null); }; - setImmediate(function() { - that.emit('response', fakeResponse); - that.emit('complete', fakeResponse); - }); + this.abort = function() { + aborted = true; + }; + + this.destroy = function() { + destroyed = true; + }; } nodeutil.inherits(FakeRequest, stream.Readable); + + FakeRequest.getRequestOptions = function() { return requestOptions; }; + FakeRequest.wasRequestAborted = function() { return aborted; }; + FakeRequest.wasRequestDestroyed = function() { return destroyed; }; + return FakeRequest; } - it('should create an authorized request', function(done) { - var expectedPath = util.format('https://storage.googleapis.com/{b}/{o}', { - b: file.bucket.name, - o: encodeURIComponent(file.name) - }); + function getFakeSuccessfulRequest(data, fakeResponse) { + var FakeRequest = getFakeRequest(data); + + function FakeSuccessfulRequest(req) { + if (!(this instanceof FakeSuccessfulRequest)) { + return new FakeSuccessfulRequest(req); + } + + FakeRequest.apply(this, arguments); + + var self = this; + + setImmediate(function() { + self.emit('response', fakeResponse); + self.emit('complete', fakeResponse); + }); + } + nodeutil.inherits(FakeSuccessfulRequest, FakeRequest); + extend(FakeSuccessfulRequest, FakeRequest); + + return FakeSuccessfulRequest; + } - file.bucket.storage.makeAuthorizedRequest_ = function(opts) { - assert.equal(opts.uri, expectedPath); + function getFakeFailedRequest(error) { + var FakeRequest = getFakeRequest(); + + function FakeFailedRequest() { + if (!(this instanceof FakeFailedRequest)) { + return new FakeFailedRequest(); + } + + FakeRequest.apply(this, arguments); + + var self = this; + + setImmediate(function() { + self.emit('error', error); + }); + } + nodeutil.inherits(FakeFailedRequest, FakeRequest); + extend(FakeFailedRequest, FakeRequest); + + return FakeFailedRequest; + } + + it('should send query.generation if File has one', function(done) { + var versionedFile = new File(bucket, 'file.txt', { generation: 1 }); + + versionedFile.bucket.storage.makeAuthorizedRequest_ = function(reqOpts) { + assert.equal(reqOpts.qs.generation, 1); done(); }; - file.createReadStream(); + versionedFile.createReadStream(); }); - it('should emit an error from authorizing', function(done) { - var error = new Error('Error.'); - file.bucket.storage.makeAuthorizedRequest_ = function(opts, callback) { - setImmediate(function() { - (callback.onAuthorized || callback)(error); + it('should end request stream on error', function(done) { + request_Override = getFakeSuccessfulRequest('body', { body: null }); + + var readStream = file.createReadStream(); + + readStream.once('error', function() { + assert(request_Override.wasRequestAborted()); + assert(request_Override.wasRequestDestroyed()); + done(); + }); + + readStream.emit('error'); + }); + + describe('authorizing', function() { + it('should create an authorized request', function(done) { + var expectedPath = util.format('https://{host}/{b}/{o}', { + host: 'storage.googleapis.com', + b: file.bucket.name, + o: encodeURIComponent(file.name) }); - }; - file.createReadStream() - .once('error', function(err) { - assert.equal(err, error); + + file.bucket.storage.makeAuthorizedRequest_ = function(opts) { + assert.equal(opts.uri, expectedPath); done(); + }; + + file.createReadStream(); + }); + + describe('errors', function() { + var ERROR = new Error('Error.'); + + beforeEach(function() { + file.bucket.storage.makeAuthorizedRequest_ = function(opt, callback) { + setImmediate(function() { + (callback.onAuthorized || callback)(ERROR); + }); + }; }); + + it('should emit an error from authorizing', function(done) { + file.createReadStream() + .once('error', function(err) { + assert.equal(err, ERROR); + done(); + }); + }); + + it('should destroy the through stream', function(done) { + var readStream = file.createReadStream(); + readStream.once('error', util.noop); + readStream.destroy = done; + }); + }); }); - it('should emit response event from request', function(done) { - var response = { - headers: { 'x-goog-hash': 'md5=fakefakefake' } - }; - request_Override = getFakeRequest('body', response); + describe('request', function() { + it('should get readable stream from request', function(done) { + var fakeRequest = { a: 'b', c: 'd' }; - file.createReadStream({ validation: false }) - .on('response', function(res) { - assert.deepEqual(response, res); + request_Override = getFakeRequest(); + + file.bucket.storage.makeAuthorizedRequest_ = function(opts, callback) { + (callback.onAuthorized || callback)(null, fakeRequest); + assert.deepEqual(request_Override.getRequestOptions(), fakeRequest); done(); + }; + + file.createReadStream(); + }); + + it('should emit response event from request', function(done) { + var response = { + headers: { 'x-goog-hash': 'md5=fakefakefake' } + }; + request_Override = getFakeSuccessfulRequest('body', response); + + file.createReadStream({ validation: false }) + .on('response', function(res) { + assert.deepEqual(response, res); + done(); + }); + }); + + describe('errors', function() { + var ERROR = new Error('Error.'); + + beforeEach(function() { + request_Override = getFakeFailedRequest(ERROR); }); + + it('should end the request stream', function(done) { + var readStream = file.createReadStream(); + + readStream.once('error', function() { + assert(request_Override.wasRequestAborted()); + assert(request_Override.wasRequestDestroyed()); + done(); + }); + }); + + it('should emit the error', function(done) { + file.createReadStream() + .once('error', function(err) { + assert.deepEqual(err, ERROR); + done(); + }); + }); + + it('should destroy the through stream', function(done) { + var readStream = file.createReadStream(); + readStream.destroy = done; + }); + }); }); - it('should get readable stream from request', function(done) { - var fakeRequest = { a: 'b', c: 'd' }; + describe('response', function() { + it('should use util.handleResp', function(done) { + var response = { + body: { + a: 'b', + c: 'd', + } + }; - // Faking a stream implementation so we can simulate an actual Request - // request. The only thing we want to know is if the data passed to - // request was correct. - request_Override = function(req) { - if (!(this instanceof request_Override)) { - return new request_Override(req); - } + request_Override = getFakeSuccessfulRequest('body', response); - stream.Readable.call(this); - this._read = util.noop; + handleResp_Override = function(err, resp, body) { + assert.strictEqual(err, null); + assert.deepEqual(resp, response); + assert.deepEqual(body, response.body); + done(); + }; - assert.deepEqual(req, fakeRequest); - done(); - }; - nodeutil.inherits(request_Override, stream.Readable); + file.createReadStream(); + }); - file.bucket.storage.makeAuthorizedRequest_ = function(opts, callback) { - (callback.onAuthorized || callback)(null, fakeRequest); - }; + describe('errors', function() { + var ERROR = new Error('Error.'); + + beforeEach(function() { + request_Override = getFakeSuccessfulRequest('body', { body: null }); + + handleResp_Override = function(err, resp, body, callback) { + callback(ERROR); + }; + }); + + it('should emit the error', function(done) { + var readStream = file.createReadStream(); + + readStream.once('error', function(err) { + assert.deepEqual(err, ERROR); + done(); + }); + }); - file.createReadStream(); + it('should destroy the through stream', function(done) { + var readStream = file.createReadStream(); + readStream.destroy = done; + }); + }); }); describe('validation', function() { @@ -488,7 +661,7 @@ describe('File', function() { }); it('should validate with crc32c', function(done) { - request_Override = getFakeRequest(data, fakeResponse.crc32c); + request_Override = getFakeSuccessfulRequest(data, fakeResponse.crc32c); file.createReadStream({ validation: 'crc32c' }) .on('error', done) @@ -498,7 +671,8 @@ describe('File', function() { }); it('should emit an error if crc32c validation fails', function(done) { - request_Override = getFakeRequest('bad-data', fakeResponse.crc32c); + request_Override = getFakeSuccessfulRequest( + 'bad-data', fakeResponse.crc32c); file.createReadStream({ validation: 'crc32c' }) .on('error', function(err) { @@ -508,7 +682,7 @@ describe('File', function() { }); it('should validate with md5', function(done) { - request_Override = getFakeRequest(data, fakeResponse.md5); + request_Override = getFakeSuccessfulRequest(data, fakeResponse.md5); file.createReadStream({ validation: 'md5' }) .on('error', done) @@ -518,7 +692,8 @@ describe('File', function() { }); it('should emit an error if md5 validation fails', function(done) { - request_Override = getFakeRequest('bad-data', fakeResponse.crc32c); + request_Override = getFakeSuccessfulRequest( + 'bad-data', fakeResponse.crc32c); file.createReadStream({ validation: 'md5' }) .on('error', function(err) { @@ -528,7 +703,7 @@ describe('File', function() { }); it('should default to md5 validation', function(done) { - request_Override = getFakeRequest(data, { + request_Override = getFakeSuccessfulRequest(data, { headers: { 'x-goog-hash': 'md5=fakefakefake' } }); @@ -538,77 +713,93 @@ describe('File', function() { done(); }); }); - }); - it('should send query.generation if File has one', function(done) { - var versionedFile = new File(bucket, 'file.txt', { generation: 1 }); + describe('destroying the through stream', function() { + it('should destroy after failed validation', function(done) { + request_Override = getFakeSuccessfulRequest( + 'bad-data', fakeResponse.crc32c); - versionedFile.bucket.storage.makeAuthorizedRequest_ = function(reqOpts) { - assert.equal(reqOpts.qs.generation, 1); - done(); - }; + var readStream = file.createReadStream({ validation: 'md5' }); + readStream.destroy = done; + }); - versionedFile.createReadStream(); + it('should destroy after successful validation', function(done) { + request_Override = getFakeSuccessfulRequest( + data, fakeResponse.crc32c); + + var readStream = file.createReadStream({ validation: 'crc32c' }); + readStream.destroy = done; + }); + }); }); - it('should accept a start range', function(done) { - var startOffset = 100; + describe('range requests', function() { + it('should accept a start range', function(done) { + var startOffset = 100; - request_Override = function(opts) { - setImmediate(function () { - assert.equal(opts.headers.Range, 'bytes=' + startOffset + '-'); - done(); - }); - return duplexify(); - }; + request_Override = function(opts) { + setImmediate(function () { + assert.equal(opts.headers.Range, 'bytes=' + startOffset + '-'); + done(); + }); + return duplexify(); + }; - file.createReadStream({ start: startOffset }); - }); + file.createReadStream({ start: startOffset }); + }); - it('should accept an end range and set start to 0', function(done) { - var endOffset = 100; + it('should accept an end range and set start to 0', function(done) { + var endOffset = 100; - request_Override = function(opts) { - setImmediate(function () { - assert.equal(opts.headers.Range, 'bytes=0-' + endOffset); - done(); - }); - return duplexify(); - }; + request_Override = function(opts) { + setImmediate(function () { + assert.equal(opts.headers.Range, 'bytes=0-' + endOffset); + done(); + }); + return duplexify(); + }; - file.createReadStream({ end: endOffset }); - }); + file.createReadStream({ end: endOffset }); + }); - it('should accept both a start and end range', function(done) { - var startOffset = 100; - var endOffset = 101; + it('should accept both a start and end range', function(done) { + var startOffset = 100; + var endOffset = 101; - request_Override = function(opts) { - setImmediate(function () { - var expectedRange = 'bytes=' + startOffset + '-' + endOffset; - assert.equal(opts.headers.Range, expectedRange); - done(); - }); - return duplexify(); - }; + request_Override = function(opts) { + setImmediate(function () { + var expectedRange = 'bytes=' + startOffset + '-' + endOffset; + assert.equal(opts.headers.Range, expectedRange); + done(); + }); + return duplexify(); + }; - file.createReadStream({ start: startOffset, end: endOffset }); - }); + file.createReadStream({ start: startOffset, end: endOffset }); + }); - it('should accept range start and end as 0', function(done) { - var startOffset = 0; - var endOffset = 0; + it('should accept range start and end as 0', function(done) { + var startOffset = 0; + var endOffset = 0; - request_Override = function(opts) { - setImmediate(function () { - var expectedRange = 'bytes=0-0'; - assert.equal(opts.headers.Range, expectedRange); - done(); - }); - return duplexify(); - }; + request_Override = function(opts) { + setImmediate(function () { + var expectedRange = 'bytes=0-0'; + assert.equal(opts.headers.Range, expectedRange); + done(); + }); + return duplexify(); + }; - file.createReadStream({ start: startOffset, end: endOffset }); + file.createReadStream({ start: startOffset, end: endOffset }); + }); + + it('should destroy the through stream', function(done) { + request_Override = getFakeSuccessfulRequest('body', { body: null }); + + var readStream = file.createReadStream({ start: 100 }); + readStream.destroy = done; + }); }); });