diff --git a/lib/storage/file.js b/lib/storage/file.js index 78004c31916..18695cbb7ef 100644 --- a/lib/storage/file.js +++ b/lib/storage/file.js @@ -211,6 +211,19 @@ File.prototype.copy = function(destination, callback) { * piped to a writable stream or listened to for 'data' events to read a file's * contents. * + * In the unlikely event there is a mismatch between what you downloaded and the + * version in your Bucket, your error handler will receive an error with code + * "CONTENT_DOWNLOAD_MISMATCH". If you receive this error, the best recourse is + * to try downloading the file again. + * + * @param {object=} options - Configuration object. + * @param {string|boolean} options.validation - Possible values: `"md5"`, + * `"crc32c"`, or `false`. By default, data integrity is validated with an + * MD5 checksum for maximum reliability, falling back to CRC32c when an MD5 + * hash wasn't returned from the API. CRC32c will provide better performance + * with less reliability. You may also choose to skip validation completely, + * however this is **not recommended**. + * * @example * //- * //

Downloading a File

@@ -226,35 +239,133 @@ File.prototype.copy = function(destination, callback) { * .pipe(fs.createWriteStream('/Users/stephen/Photos/image.png')) * .on('error', function(err) {}); */ -File.prototype.createReadStream = function() { - var storage = this.bucket.storage; - var dup = duplexify(); - function createAuthorizedReq(uri) { - var reqOpts = { uri: uri }; - storage.makeAuthorizedRequest_(reqOpts, { - onAuthorized: function(err, authorizedReqOpts) { - if (err) { - dup.emit('error', err); - dup.end(); - return; - } - dup.setReadable(request(authorizedReqOpts)); - } - }); +File.prototype.createReadStream = function(options) { + options = options || {}; + + var that = this; + var throughStream = through(); + + var validations = ['crc32c', 'md5']; + var validation; + + if (util.is(options.validation, 'string')) { + options.validation = options.validation.toLowerCase(); + + if (validations.indexOf(options.validation) > -1) { + validation = options.validation; + } else { + validation = 'all'; + } + } + + if (util.is(options.validation, 'undefined')) { + validation = 'all'; } + + var crc32c = validation === 'crc32c' || validation === 'all'; + var md5 = validation === 'md5' || validation === 'all'; + if (this.metadata.mediaLink) { createAuthorizedReq(this.metadata.mediaLink); } else { this.getMetadata(function(err, metadata) { if (err) { - dup.emit('error', err); - dup.end(); + throughStream.emit('error', err); + throughStream.end(); return; } + createAuthorizedReq(metadata.mediaLink); }); } - return dup; + + return throughStream; + + // Authenticate the request, then pipe the remote API request to the stream + // returned to the user. + function createAuthorizedReq(uri) { + var reqOpts = { + uri: uri + }; + + that.bucket.storage.makeAuthorizedRequest_(reqOpts, { + onAuthorized: function(err, authorizedReqOpts) { + if (err) { + throughStream.emit('error', err); + throughStream.end(); + return; + } + + // For data integrity, hash the contents of the stream as we receive it + // from the server. + var localCrc32cHash; + var localMd5Hash = crypto.createHash('md5'); + + request(authorizedReqOpts) + .on('error', function(err) { + throughStream.emit('error', err); + throughStream.end(); + }) + + .on('data', function(chunk) { + if (crc32c) { + localCrc32cHash = crc.calculate(chunk, localCrc32cHash); + } + + if (md5) { + localMd5Hash.update(chunk); + } + }) + + .on('complete', function(res) { + var failed = false; + var crcFail = true; + var md5Fail = true; + + var hashes = {}; + res.headers['x-goog-hash'].split(',').forEach(function(hash) { + var hashType = hash.split('=')[0]; + hashes[hashType] = hash.substr(hash.indexOf('=') + 1); + }); + + var remoteMd5 = hashes.md5; + var remoteCrc = hashes.crc32c && hashes.crc32c.substr(4); + + if (crc32c) { + crcFail = + new Buffer([localCrc32cHash]).toString('base64') !== remoteCrc; + failed = crcFail; + } + + if (md5) { + md5Fail = localMd5Hash.digest('base64') !== remoteMd5; + failed = md5Fail; + } + + if (validation === 'all') { + failed = remoteMd5 ? md5Fail : crcFail; + } + + if (failed) { + var error = new Error([ + 'The downloaded data did not match the data from the server.', + 'To be sure the content is the same, you should download the', + 'file again.' + ].join(' ')); + error.code = 'CONTENT_DOWNLOAD_MISMATCH'; + + throughStream.emit('error', error); + } else { + throughStream.emit('complete'); + } + + throughStream.end(); + }) + + .pipe(throughStream); + } + }); + } }; /** @@ -688,7 +799,7 @@ File.prototype.startResumableUpload_ = function(stream, metadata) { method: 'PUT', uri: resumableUri }, { - onAuthorized: function (err, reqOpts) { + onAuthorized: function(err, reqOpts) { if (err) { handleError(err); return; diff --git a/regression/storage.js b/regression/storage.js index c132ba5c3e1..43be0e8626c 100644 --- a/regression/storage.js +++ b/regression/storage.js @@ -322,12 +322,17 @@ describe('storage', function() { writeStream.on('error', done); writeStream.on('complete', function() { + var data = new Buffer(''); + file.createReadStream() + .on('error', done) .on('data', function(chunk) { - assert.equal(String(chunk), contents); + data = Buffer.concat([data, chunk]); }) - .on('error', done) - .on('end', done); + .on('complete', function() { + assert.equal(data.toString(), contents); + done(); + }); }); }); diff --git a/test/storage/file.js b/test/storage/file.js index e81c34d7d08..f6711964b7e 100644 --- a/test/storage/file.js +++ b/test/storage/file.js @@ -263,13 +263,15 @@ describe('File', function() { }); it('should create an authorized request', function(done) { - request_Override = function(opts) { + file.bucket.storage.makeAuthorizedRequest_ = function(opts) { assert.equal(opts.uri, metadata.mediaLink); done(); }; + file.getMetadata = function(callback) { callback(null, metadata); }; + file.createReadStream(); }); @@ -292,33 +294,129 @@ describe('File', function() { it('should get readable stream from request', function(done) { var fakeRequest = { a: 'b', c: 'd' }; - file.getMetadata = function(callback) { - callback(null, metadata); - }; + + // Faking a stream implementation so we can simulate an actual Request + // request. The only thing we want to know is if the data passed to + // request was correct. request_Override = function(req) { + if (!(this instanceof request_Override)) { + return new request_Override(req); + } + + stream.Readable.call(this); + this._read = util.noop; + assert.deepEqual(req, fakeRequest); done(); }; - file.bucket.storage.makeAuthorizedRequest_ = function(opts, callback) { - (callback.onAuthorized || callback)(null, fakeRequest); - }; - file.createReadStream(); - }); + nodeutil.inherits(request_Override, stream.Readable); - it('should set readable stream', function() { - var dup = duplexify(); file.getMetadata = function(callback) { callback(null, metadata); }; - request_Override = function() { - return dup; - }; + file.bucket.storage.makeAuthorizedRequest_ = function(opts, callback) { - (callback.onAuthorized || callback)(); + (callback.onAuthorized || callback)(null, fakeRequest); }; + file.createReadStream(); - assert.deepEqual(readableStream, dup); - readableStream = null; + }); + + describe('validation', function() { + var data = 'test'; + + var crc32cBase64 = new Buffer([crc.calculate(data)]).toString('base64'); + + var md5HashBase64 = crypto.createHash('md5'); + md5HashBase64.update(data); + md5HashBase64 = md5HashBase64.digest('base64'); + + var fakeResponse = { + crc32c: { + headers: { 'x-goog-hash': 'crc32c=####' + crc32cBase64 } + }, + md5: { + headers: { 'x-goog-hash': 'md5=' + md5HashBase64 } + } + }; + + function getFakeRequest(data, fakeResponse) { + function FakeRequest(req) { + if (!(this instanceof FakeRequest)) { + return new FakeRequest(req); + } + + var that = this; + + stream.Readable.call(this); + this._read = function() { + this.push(data); + this.push(null); + }; + + setImmediate(function() { + that.emit('complete', fakeResponse); + }); + } + nodeutil.inherits(FakeRequest, stream.Readable); + return FakeRequest; + } + + beforeEach(function() { + file.metadata.mediaLink = 'http://uri'; + + file.bucket.storage.makeAuthorizedRequest_ = function(opts, callback) { + (callback.onAuthorized || callback)(null, {}); + }; + }); + + it('should validate with crc32c', function(done) { + request_Override = getFakeRequest(data, fakeResponse.crc32c); + + file.createReadStream({ validation: 'crc32c' }) + .on('error', done) + .on('complete', done); + }); + + it('should emit an error if crc32c validation fails', function(done) { + request_Override = getFakeRequest('bad-data', fakeResponse.crc32c); + + file.createReadStream({ validation: 'crc32c' }) + .on('error', function(err) { + assert.equal(err.code, 'CONTENT_DOWNLOAD_MISMATCH'); + done(); + }); + }); + + it('should validate with md5', function(done) { + request_Override = getFakeRequest(data, fakeResponse.md5); + + file.createReadStream({ validation: 'md5' }) + .on('error', done) + .on('complete', done); + }); + + it('should emit an error if md5 validation fails', function(done) { + request_Override = getFakeRequest('bad-data', fakeResponse.crc32c); + + file.createReadStream({ validation: 'md5' }) + .on('error', function(err) { + assert.equal(err.code, 'CONTENT_DOWNLOAD_MISMATCH'); + done(); + }); + }); + + it('should default to md5 validation', function(done) { + request_Override = getFakeRequest(data, { + headers: { 'x-goog-hash': 'md5=fakefakefake' } + }); + + file.createReadStream() + .on('error', function(err) { + assert.equal(err.code, 'CONTENT_DOWNLOAD_MISMATCH'); + done(); + }); + }); }); });