Skip to content

Commit

Permalink
storage: close sockets on error
Browse files Browse the repository at this point in the history
  • Loading branch information
ryanseys authored and stephenplusplus committed Jun 2, 2015
1 parent b157b41 commit 670f845
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 19 deletions.
52 changes: 33 additions & 19 deletions lib/storage/file.js
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,8 @@ File.prototype.createReadStream = function(options) {
util.is(options.start, 'number') || util.is(options.end, 'number');
var throughStream = streamEvents(through());

var requestStream;

var validations = ['crc32c', 'md5'];
var validation;

Expand Down Expand Up @@ -430,6 +432,22 @@ File.prototype.createReadStream = function(options) {
o: encodeURIComponent(this.name)
});

// End the stream, first emitting an error or complete event.
var endThroughStream = once(function(err, resp) {
if (err) {
throughStream.emit('error', err, resp);
} else {
throughStream.emit('complete', resp);
}

throughStream.destroy();
});

var endRequestStream = once(function() {
requestStream.abort();
requestStream.destroy();
});

createAuthorizedReq(remoteFilePath);

return throughStream;
Expand Down Expand Up @@ -458,7 +476,7 @@ File.prototype.createReadStream = function(options) {
that.bucket.storage.makeAuthorizedRequest_(reqOpts, {
onAuthorized: function(err, authorizedReqOpts) {
if (err) {
done(err, null);
endThroughStream(err, null);
return;
}

Expand All @@ -467,8 +485,13 @@ File.prototype.createReadStream = function(options) {
var localCrcHash;
var localMd5Hash = crypto.createHash('md5');

request(authorizedReqOpts)
.on('error', done)
requestStream = request(authorizedReqOpts);

requestStream
.on('error', function(err) {
endRequestStream();
endThroughStream(err);
})

.on('response', throughStream.emit.bind(throughStream, 'response'))

Expand All @@ -485,13 +508,13 @@ File.prototype.createReadStream = function(options) {
.on('complete', function(res) {
util.handleResp(null, res, res.body, function(err, resp) {
if (err) {
done(err, resp);
endThroughStream(err, resp);
return;
}

if (rangeRequest) {
// Range requests can't receive data integrity checks.
done(null, resp);
endThroughStream(null, resp);
return;
}

Expand Down Expand Up @@ -531,28 +554,19 @@ File.prototype.createReadStream = function(options) {
].join(' '));
mismatchError.code = 'CONTENT_DOWNLOAD_MISMATCH';

done(mismatchError, resp);
endThroughStream(mismatchError, resp);
} else {
done(null, resp);
endThroughStream(null, resp);
}
});
})

.pipe(throughStream);
.pipe(throughStream)

.on('error', endRequestStream);
}
});
}

// End the stream, first emitting an error or complete event.
function done(err) {
if (err) {
throughStream.emit('error', err);
} else {
throughStream.emit('complete');
}

throughStream.end();
}
};

/**
Expand Down
3 changes: 3 additions & 0 deletions test/storage/file.js
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,9 @@ describe('File', function() {
this.push(null);
};

this.abort = util.noop;
this.destroy = util.noop;

setImmediate(function() {
that.emit('response', fakeResponse);
that.emit('complete', fakeResponse);
Expand Down

0 comments on commit 670f845

Please sign in to comment.