From 670f8455febd0f6d90854aa1e47ef8fe108b0ce9 Mon Sep 17 00:00:00 2001 From: Ryan Seys Date: Tue, 2 Jun 2015 13:34:03 -0400 Subject: [PATCH] storage: close sockets on error --- lib/storage/file.js | 52 ++++++++++++++++++++++++++++---------------- test/storage/file.js | 3 +++ 2 files changed, 36 insertions(+), 19 deletions(-) diff --git a/lib/storage/file.js b/lib/storage/file.js index f166aa3a7a5..8099323750a 100644 --- a/lib/storage/file.js +++ b/lib/storage/file.js @@ -401,6 +401,8 @@ File.prototype.createReadStream = function(options) { util.is(options.start, 'number') || util.is(options.end, 'number'); var throughStream = streamEvents(through()); + var requestStream; + var validations = ['crc32c', 'md5']; var validation; @@ -430,6 +432,22 @@ File.prototype.createReadStream = function(options) { o: encodeURIComponent(this.name) }); + // End the stream, first emitting an error or complete event. + var endThroughStream = once(function(err, resp) { + if (err) { + throughStream.emit('error', err, resp); + } else { + throughStream.emit('complete', resp); + } + + throughStream.destroy(); + }); + + var endRequestStream = once(function() { + requestStream.abort(); + requestStream.destroy(); + }); + createAuthorizedReq(remoteFilePath); return throughStream; @@ -458,7 +476,7 @@ File.prototype.createReadStream = function(options) { that.bucket.storage.makeAuthorizedRequest_(reqOpts, { onAuthorized: function(err, authorizedReqOpts) { if (err) { - done(err, null); + endThroughStream(err, null); return; } @@ -467,8 +485,13 @@ File.prototype.createReadStream = function(options) { var localCrcHash; var localMd5Hash = crypto.createHash('md5'); - request(authorizedReqOpts) - .on('error', done) + requestStream = request(authorizedReqOpts); + + requestStream + .on('error', function(err) { + endRequestStream(); + endThroughStream(err); + }) .on('response', throughStream.emit.bind(throughStream, 'response')) @@ -485,13 +508,13 @@ File.prototype.createReadStream = function(options) { .on('complete', function(res) { util.handleResp(null, res, res.body, function(err, resp) { if (err) { - done(err, resp); + endThroughStream(err, resp); return; } if (rangeRequest) { // Range requests can't receive data integrity checks. - done(null, resp); + endThroughStream(null, resp); return; } @@ -531,28 +554,19 @@ File.prototype.createReadStream = function(options) { ].join(' ')); mismatchError.code = 'CONTENT_DOWNLOAD_MISMATCH'; - done(mismatchError, resp); + endThroughStream(mismatchError, resp); } else { - done(null, resp); + endThroughStream(null, resp); } }); }) - .pipe(throughStream); + .pipe(throughStream) + + .on('error', endRequestStream); } }); } - - // End the stream, first emitting an error or complete event. - function done(err) { - if (err) { - throughStream.emit('error', err); - } else { - throughStream.emit('complete'); - } - - throughStream.end(); - } }; /** diff --git a/test/storage/file.js b/test/storage/file.js index c4e82371ed0..9ca21aaad79 100644 --- a/test/storage/file.js +++ b/test/storage/file.js @@ -385,6 +385,9 @@ describe('File', function() { this.push(null); }; + this.abort = util.noop; + this.destroy = util.noop; + setImmediate(function() { that.emit('response', fakeResponse); that.emit('complete', fakeResponse);