Skip to content

Commit

Permalink
Merge pull request #636 from stephenplusplus/spp--storage-close-socke…
Browse files Browse the repository at this point in the history
…ts-on-error

storage: close sockets on error
  • Loading branch information
stephenplusplus committed Jun 5, 2015
2 parents 143ecf2 + aad528a commit 8acdc75
Show file tree
Hide file tree
Showing 2 changed files with 338 additions and 133 deletions.
52 changes: 33 additions & 19 deletions lib/storage/file.js
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,8 @@ File.prototype.createReadStream = function(options) {
util.is(options.start, 'number') || util.is(options.end, 'number');
var throughStream = streamEvents(through());

var requestStream;

var validations = ['crc32c', 'md5'];
var validation;

Expand Down Expand Up @@ -434,6 +436,22 @@ File.prototype.createReadStream = function(options) {
o: encodeURIComponent(this.name)
});

// End the stream, first emitting an error or complete event.
var endThroughStream = once(function(err, resp) {
if (err) {
throughStream.emit('error', err, resp);
} else {
throughStream.emit('complete', resp);
}

throughStream.destroy();
});

var endRequestStream = once(function() {
requestStream.abort();
requestStream.destroy();
});

createAuthorizedReq(remoteFilePath);

return throughStream;
Expand Down Expand Up @@ -462,7 +480,7 @@ File.prototype.createReadStream = function(options) {
that.bucket.storage.makeAuthorizedRequest_(reqOpts, {
onAuthorized: function(err, authorizedReqOpts) {
if (err) {
done(err, null);
endThroughStream(err);
return;
}

Expand All @@ -471,8 +489,13 @@ File.prototype.createReadStream = function(options) {
var localCrcHash;
var localMd5Hash = crypto.createHash('md5');

request(authorizedReqOpts)
.on('error', done)
requestStream = request(authorizedReqOpts);

requestStream
.on('error', function(err) {
endRequestStream();
endThroughStream(err);
})

.on('response', throughStream.emit.bind(throughStream, 'response'))

Expand All @@ -489,13 +512,13 @@ File.prototype.createReadStream = function(options) {
.on('complete', function(res) {
util.handleResp(null, res, res.body, function(err, resp) {
if (err) {
done(err, resp);
endThroughStream(err, resp);
return;
}

if (rangeRequest) {
// Range requests can't receive data integrity checks.
done(null, resp);
endThroughStream(null, resp);
return;
}

Expand Down Expand Up @@ -535,28 +558,19 @@ File.prototype.createReadStream = function(options) {
].join(' '));
mismatchError.code = 'CONTENT_DOWNLOAD_MISMATCH';

done(mismatchError, resp);
endThroughStream(mismatchError, resp);
} else {
done(null, resp);
endThroughStream(null, resp);
}
});
})

.pipe(throughStream);
.pipe(throughStream)

.on('error', endRequestStream);
}
});
}

// End the stream, first emitting an error or complete event.
function done(err) {
if (err) {
throughStream.emit('error', err);
} else {
throughStream.emit('complete');
}

throughStream.end();
}
};

/**
Expand Down
Loading

0 comments on commit 8acdc75

Please sign in to comment.