From e5a72feb73a0622807be59664e8fe04e6b5b8ed4 Mon Sep 17 00:00:00 2001 From: Stephen Sawchuk Date: Tue, 11 Nov 2014 16:59:55 -0500 Subject: [PATCH] storage: support resumable uploads --- lib/common/util.js | 135 ++++++++++++++--------------- lib/storage/file.js | 202 ++++++++++++++++++++++++++++++++++++++++---- package.json | 1 + 3 files changed, 256 insertions(+), 82 deletions(-) diff --git a/lib/common/util.js b/lib/common/util.js index 819f12612589..686680cec6e8 100644 --- a/lib/common/util.js +++ b/lib/common/util.js @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -52,17 +52,17 @@ var USER_AGENT = 'gcloud-node/' + PKG.version; * * @example * // globalConfig = { - * // credentials: {...} + * // credentials: {...} * // } * Datastore.prototype.dataset = function(options) { - * // options = { - * // keyFilename: 'keyfile.json' - * // } - * return extendGlobalConfig(this.config, options); - * // returns: - * // { - * // keyFilename: 'keyfile.json' - * // } + * // options = { + * // keyFilename: 'keyfile.json' + * // } + * return extendGlobalConfig(this.config, options); + * // returns: + * // { + * // keyFilename: 'keyfile.json' + * // } * }; */ function extendGlobalConfig(globalConfig, overrides) { @@ -70,8 +70,8 @@ function extendGlobalConfig(globalConfig, overrides) { var hasGlobalConnection = options.credentials || options.keyFilename; var isOverridingConnection = overrides.credentials || overrides.keyFilename; if (hasGlobalConnection && isOverridingConnection) { - delete options.credentials; - delete options.keyFilename; + delete options.credentials; + delete options.keyFilename; } return extend(true, {}, options, overrides); } @@ -93,7 +93,7 @@ module.exports.extendGlobalConfig = extendGlobalConfig; */ function arrayize(input) { if (!Array.isArray(input)) { - return [input]; + return [input]; } return input; } @@ -109,14 +109,14 @@ module.exports.arrayize = arrayize; * * @example * format('This is a {language} ({abbr}) codebase.', { - * language: 'JavaScript', - * abbr: 'JS' + * language: 'JavaScript', + * abbr: 'JS' * }); * // 'This is a JavaScript (JS) codebase.' */ function format(template, args) { return template.replace(/{([^}]*)}/g, function(match, key) { - return args[key] || match; + return args[key] || match; }); } @@ -127,7 +127,7 @@ module.exports.format = format; * * @example * function doSomething(callback) { - * callback = callback || noop; + * callback = callback || noop; * } */ function noop() {} @@ -147,6 +147,7 @@ function ApiError(errorBody) { this.errors = errorBody.errors; this.code = errorBody.code; this.message = errorBody.message; + this.response = errorBody.response; } util.inherits(ApiError, Error); @@ -162,27 +163,28 @@ util.inherits(ApiError, Error); function handleResp(err, resp, body, callback) { callback = callback || noop; if (err) { - callback(err); - return; + callback(err); + return; } if (typeof body === 'string') { - try { - body = JSON.parse(body); - } catch(err) {} + try { + body = JSON.parse(body); + } catch(err) {} } if (body && body.error) { - // Error from JSON api. - callback(new ApiError(body.error)); - return; + // Error from JSON api. + callback(new ApiError(body.error)); + return; } if (resp && (resp.statusCode < 200 || resp.statusCode > 299)) { - // Unknown error. Format according to ApiError standard. - callback(new ApiError({ - errors: [], - code: resp.statusCode, - message: body || 'Error during request.' - })); - return; + // Unknown error. Format according to ApiError standard. + callback(new ApiError({ + errors: [], + code: resp.statusCode, + message: body || 'Error during request.', + response: resp + })); + return; } callback(null, body, resp); } @@ -196,7 +198,7 @@ module.exports.handleResp = handleResp; */ function getType(value) { if (value instanceof Buffer) { - return 'buffer'; + return 'buffer'; } return Object.prototype.toString.call(value).match(/\s(\w+)\]/)[1]; } @@ -226,7 +228,7 @@ module.exports.is = is; * * @example * function aFunction() { - * return toArray(arguments); + * return toArray(arguments); * } * * aFunction(1, 2, 3); @@ -245,17 +247,17 @@ module.exports.toArray = toArray; * @param {Duplexify} dup - Duplexify stream. * @param {object} options - Configuration object. * @param {module:common/connection} options.connection - A connection instance, - * used to get a token with and send the request through. + * used to get a token with and send the request through. * @param {object} options.metadata - Metadata to send at the head of the - * request. + * request. * @param {object} options.request - Request object, in the format of a standard - * Node.js http.request() object. + * Node.js http.request() object. * @param {string=} options.request.method - Default: "POST". * @param {string=} options.request.qs.uploadType - Default: "multipart". * @param {string=} options.streamContentType - Default: - * "application/octet-stream". + * "application/octet-stream". * @param {function} onComplete - Callback, executed after the writable Request - * stream has completed. + * stream has completed. */ function makeWritableStream(dup, options, onComplete) { onComplete = onComplete || noop; @@ -269,7 +271,7 @@ function makeWritableStream(dup, options, onComplete) { var defaults = { method: 'POST', qs: { - uploadType: 'multipart' + uploadType: 'multipart' } }; @@ -339,42 +341,43 @@ function makeWritableStream(dup, options, onComplete) { }); } + module.exports.makeWritableStream = makeWritableStream; function makeAuthorizedRequest(config) { var authorize = gsa(config); function makeRequest(reqOpts, callback) { - var tokenRefreshAttempts = 0; - reqOpts.headers = reqOpts.headers || {}; - - if (reqOpts.headers['User-Agent']) { - reqOpts.headers['User-Agent'] += '; ' + USER_AGENT; - } else { - reqOpts.headers['User-Agent'] = USER_AGENT; - } - - function onAuthorizedRequest(err, authorizedReqOpts) { - if (err) { - if (err.code === 401 && - ++tokenRefreshAttempts <= MAX_TOKEN_REFRESH_ATTEMPTS) { - authorize(reqOpts, onAuthorizedRequest); - } else { - (callback.onAuthorized || callback)(err); - } - return; - } - - if (callback.onAuthorized) { - callback.onAuthorized(null, authorizedReqOpts); + var tokenRefreshAttempts = 0; + reqOpts.headers = reqOpts.headers || {}; + + if (reqOpts.headers['User-Agent']) { + reqOpts.headers['User-Agent'] += '; ' + USER_AGENT; + } else { + reqOpts.headers['User-Agent'] = USER_AGENT; + } + + function onAuthorizedRequest(err, authorizedReqOpts) { + if (err) { + if (err.code === 401 && + ++tokenRefreshAttempts <= MAX_TOKEN_REFRESH_ATTEMPTS) { + authorize(reqOpts, onAuthorizedRequest); } else { - request(authorizedReqOpts, function(err, res, body) { - handleResp(err, res, body, callback); - }); + (callback.onAuthorized || callback)(err); } + return; + } + + if (callback.onAuthorized) { + callback.onAuthorized(null, authorizedReqOpts); + } else { + request(authorizedReqOpts, function(err, res, body) { + handleResp(err, res, body, callback); + }); } + } - authorize(reqOpts, onAuthorizedRequest); + authorize(reqOpts, onAuthorizedRequest); } makeRequest.getCredentials = authorize.getCredentials; diff --git a/lib/storage/file.js b/lib/storage/file.js index 8d727ce9a037..2342851f9a49 100644 --- a/lib/storage/file.js +++ b/lib/storage/file.js @@ -20,10 +20,13 @@ 'use strict'; +var bufferEqual = require('buffer-equal'); +var ConfigStore = require('configstore'); var crypto = require('crypto'); var duplexify = require('duplexify'); var request = require('request'); var streamEvents = require('stream-events'); +var through = require('through2'); /** * @type module:common/util @@ -264,28 +267,195 @@ File.prototype.createReadStream = function() { */ File.prototype.createWriteStream = function(metadata) { var that = this; + + var bufferStream = through(); + var configStore = new ConfigStore('gcloud-node'); var dup = streamEvents(duplexify()); + var makeAuthorizedRequest = that.bucket.storage.makeAuthorizedRequest_; + var request = require('request'); + var resumableUri; + var retries = 0; + + var RETRY_LIMIT = 3; + + metadata = metadata || {}; dup.once('writing', function() { - util.makeWritableStream(dup, { - makeAuthorizedRequest: that.bucket.storage.makeAuthorizedRequest_, - metadata: metadata, - request: { - qs: { - name: that.name - }, - uri: util.format('{base}/{bucket}/o', { - base: STORAGE_UPLOAD_BASE_URL, - bucket: that.bucket.name - }) + var config = configStore.get(that.name); + + if (config) { + resumeUpload(config.uri); + } else { + startUpload(); + } + }); + + function startUpload() { + var headers = {}; + + if (metadata.contentType) { + headers['X-Upload-Content-Type'] = metadata.contentType; + } + + makeAuthorizedRequest({ + method: 'POST', + uri: util.format('{base}/{bucket}/o', { + base: STORAGE_UPLOAD_BASE_URL, + bucket: that.bucket.name + }), + qs: { + name: that.name, + uploadType: 'resumable' + }, + headers: headers, + json: metadata + }, function(err, res, body) { + if (err) { + dup.emit('error', err); + dup.end(); + return; } - }, function(data) { - that.metadata = data; - dup.emit('complete', data); - dup.end(); + resumableUri = body.headers.location; + configStore.set(that.name, { + uri: resumableUri + }); + resumeUpload(resumableUri, -1); }); - }); + } + + function resumeUpload(uri, lastByteWritten) { + if (util.is(lastByteWritten, 'number')) { + prepareUpload(lastByteWritten); + } else { + getLastByteWritten(uri, prepareUpload); + } + + function prepareUpload(lastByteWritten) { + makeAuthorizedRequest({ + method: 'PUT', + uri: uri + }, { + onAuthorized: function (err, reqOpts) { + if (err) { + if (err.code === 404) { + startUpload(); + return; + } + + if (err.code > 499 && err.code < 600 && retries <= RETRY_LIMIT) { + retries++; + prepareUpload(lastByteWritten); + return; + } + + dup.emit('error', err); + dup.end(); + return; + } + + sendFile(reqOpts, lastByteWritten); + } + }); + } + } + + function sendFile(reqOpts, lastByteWritten) { + var startByte = lastByteWritten + 1; + reqOpts.headers['Content-Range'] = 'bytes ' + startByte + '-*/*'; + + var bytesWritten = 0; + var limitStream = through(function(chunk, enc, next) { + // Determine if this is the same content uploaded previously. + if (bytesWritten === 0) { + var cachedFirstChunk = configStore.get(that.name).firstChunk; + var firstChunk = chunk.slice(0, 16); + + if (!cachedFirstChunk) { + configStore.set(that.name, { + uri: reqOpts.uri, + firstChunk: firstChunk + }); + } else { + cachedFirstChunk = new Buffer(cachedFirstChunk); + firstChunk = new Buffer(firstChunk); + + if (!bufferEqual(cachedFirstChunk, firstChunk)) { + // Different content. Start a new upload. + bufferStream.unshift(chunk); + bufferStream.unpipe(this); + configStore.del(that.name); + startUpload(); + return; + } + } + } + + var length = chunk.length; + + if (util.is(chunk, 'string')) { + length = Buffer.byteLength(chunk.length, enc); + } + + if (bytesWritten < lastByteWritten) { + chunk = chunk.slice(bytesWritten - length); + } + + bytesWritten += length; + + if (bytesWritten >= lastByteWritten) { + this.push(chunk); + } + + next(); + }); + + bufferStream.pipe(limitStream).pipe(getStream(reqOpts)); + dup.setWritable(bufferStream); + + function getStream(reqOpts) { + var stream = request(reqOpts); + stream.callback = util.noop; + + stream.on('complete', function(res) { + util.handleResp(null, res, res.body, function(err, data) { + if (err) { + dup.emit('error', err); + dup.end(); + return; + } + + that.metadata = data; + dup.emit('complete', that.metadata); + + configStore.del(that.name); + }); + }); + + return stream; + } + } + + // If an upload to this file has previously started, this will return the last + // byte written to it. + function getLastByteWritten(uri, callback) { + makeAuthorizedRequest({ + method: 'PUT', + uri: uri, + headers: { + 'Content-Length': 0, + 'Content-Range': 'bytes */*' + } + }, function(err) { + if (err && err.code === 308) { + // headers.range format: ##-## (e.g. 0-4915200) + callback(parseInt(err.response.headers.range.split('-')[1])); + return; + } + + callback(-1); + }); + } return dup; }; diff --git a/package.json b/package.json index bde62b4199ab..40d67596055c 100644 --- a/package.json +++ b/package.json @@ -44,6 +44,7 @@ "google storage" ], "dependencies": { + "buffer-equal": "0.0.1", "duplexify": "^3.1.2", "extend": "^1.3.0", "google-service-account": "^1.0.0",