Skip to content

Commit

Permalink
storage: support resumable uploads
Browse files Browse the repository at this point in the history
  • Loading branch information
stephenplusplus committed Nov 11, 2014
1 parent 1cc2031 commit e5a72fe
Show file tree
Hide file tree
Showing 3 changed files with 256 additions and 82 deletions.
135 changes: 69 additions & 66 deletions lib/common/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
Expand Down Expand Up @@ -52,26 +52,26 @@ var USER_AGENT = 'gcloud-node/' + PKG.version;
*
* @example
* // globalConfig = {
* // credentials: {...}
* // credentials: {...}
* // }
* Datastore.prototype.dataset = function(options) {
* // options = {
* // keyFilename: 'keyfile.json'
* // }
* return extendGlobalConfig(this.config, options);
* // returns:
* // {
* // keyFilename: 'keyfile.json'
* // }
* // options = {
* // keyFilename: 'keyfile.json'
* // }
* return extendGlobalConfig(this.config, options);
* // returns:
* // {
* // keyFilename: 'keyfile.json'
* // }
* };
*/
function extendGlobalConfig(globalConfig, overrides) {
var options = extend({}, globalConfig);
var hasGlobalConnection = options.credentials || options.keyFilename;
var isOverridingConnection = overrides.credentials || overrides.keyFilename;
if (hasGlobalConnection && isOverridingConnection) {
delete options.credentials;
delete options.keyFilename;
delete options.credentials;
delete options.keyFilename;
}
return extend(true, {}, options, overrides);
}
Expand All @@ -93,7 +93,7 @@ module.exports.extendGlobalConfig = extendGlobalConfig;
*/
function arrayize(input) {
if (!Array.isArray(input)) {
return [input];
return [input];
}
return input;
}
Expand All @@ -109,14 +109,14 @@ module.exports.arrayize = arrayize;
*
* @example
* format('This is a {language} ({abbr}) codebase.', {
* language: 'JavaScript',
* abbr: 'JS'
* language: 'JavaScript',
* abbr: 'JS'
* });
* // 'This is a JavaScript (JS) codebase.'
*/
function format(template, args) {
return template.replace(/{([^}]*)}/g, function(match, key) {
return args[key] || match;
return args[key] || match;
});
}

Expand All @@ -127,7 +127,7 @@ module.exports.format = format;
*
* @example
* function doSomething(callback) {
* callback = callback || noop;
* callback = callback || noop;
* }
*/
function noop() {}
Expand All @@ -147,6 +147,7 @@ function ApiError(errorBody) {
this.errors = errorBody.errors;
this.code = errorBody.code;
this.message = errorBody.message;
this.response = errorBody.response;
}

util.inherits(ApiError, Error);
Expand All @@ -162,27 +163,28 @@ util.inherits(ApiError, Error);
function handleResp(err, resp, body, callback) {
callback = callback || noop;
if (err) {
callback(err);
return;
callback(err);
return;
}
if (typeof body === 'string') {
try {
body = JSON.parse(body);
} catch(err) {}
try {
body = JSON.parse(body);
} catch(err) {}
}
if (body && body.error) {
// Error from JSON api.
callback(new ApiError(body.error));
return;
// Error from JSON api.
callback(new ApiError(body.error));
return;
}
if (resp && (resp.statusCode < 200 || resp.statusCode > 299)) {
// Unknown error. Format according to ApiError standard.
callback(new ApiError({
errors: [],
code: resp.statusCode,
message: body || 'Error during request.'
}));
return;
// Unknown error. Format according to ApiError standard.
callback(new ApiError({
errors: [],
code: resp.statusCode,
message: body || 'Error during request.',
response: resp
}));
return;
}
callback(null, body, resp);
}
Expand All @@ -196,7 +198,7 @@ module.exports.handleResp = handleResp;
*/
function getType(value) {
if (value instanceof Buffer) {
return 'buffer';
return 'buffer';
}
return Object.prototype.toString.call(value).match(/\s(\w+)\]/)[1];
}
Expand Down Expand Up @@ -226,7 +228,7 @@ module.exports.is = is;
*
* @example
* function aFunction() {
* return toArray(arguments);
* return toArray(arguments);
* }
*
* aFunction(1, 2, 3);
Expand All @@ -245,17 +247,17 @@ module.exports.toArray = toArray;
* @param {Duplexify} dup - Duplexify stream.
* @param {object} options - Configuration object.
* @param {module:common/connection} options.connection - A connection instance,
* used to get a token with and send the request through.
* used to get a token with and send the request through.
* @param {object} options.metadata - Metadata to send at the head of the
* request.
* request.
* @param {object} options.request - Request object, in the format of a standard
* Node.js http.request() object.
* Node.js http.request() object.
* @param {string=} options.request.method - Default: "POST".
* @param {string=} options.request.qs.uploadType - Default: "multipart".
* @param {string=} options.streamContentType - Default:
* "application/octet-stream".
* "application/octet-stream".
* @param {function} onComplete - Callback, executed after the writable Request
* stream has completed.
* stream has completed.
*/
function makeWritableStream(dup, options, onComplete) {
onComplete = onComplete || noop;
Expand All @@ -269,7 +271,7 @@ function makeWritableStream(dup, options, onComplete) {
var defaults = {
method: 'POST',
qs: {
uploadType: 'multipart'
uploadType: 'multipart'
}
};

Expand Down Expand Up @@ -339,42 +341,43 @@ function makeWritableStream(dup, options, onComplete) {
});
}


module.exports.makeWritableStream = makeWritableStream;

function makeAuthorizedRequest(config) {
var authorize = gsa(config);

function makeRequest(reqOpts, callback) {
var tokenRefreshAttempts = 0;
reqOpts.headers = reqOpts.headers || {};

if (reqOpts.headers['User-Agent']) {
reqOpts.headers['User-Agent'] += '; ' + USER_AGENT;
} else {
reqOpts.headers['User-Agent'] = USER_AGENT;
}

function onAuthorizedRequest(err, authorizedReqOpts) {
if (err) {
if (err.code === 401 &&
++tokenRefreshAttempts <= MAX_TOKEN_REFRESH_ATTEMPTS) {
authorize(reqOpts, onAuthorizedRequest);
} else {
(callback.onAuthorized || callback)(err);
}
return;
}

if (callback.onAuthorized) {
callback.onAuthorized(null, authorizedReqOpts);
var tokenRefreshAttempts = 0;
reqOpts.headers = reqOpts.headers || {};

if (reqOpts.headers['User-Agent']) {
reqOpts.headers['User-Agent'] += '; ' + USER_AGENT;
} else {
reqOpts.headers['User-Agent'] = USER_AGENT;
}

function onAuthorizedRequest(err, authorizedReqOpts) {
if (err) {
if (err.code === 401 &&
++tokenRefreshAttempts <= MAX_TOKEN_REFRESH_ATTEMPTS) {
authorize(reqOpts, onAuthorizedRequest);
} else {
request(authorizedReqOpts, function(err, res, body) {
handleResp(err, res, body, callback);
});
(callback.onAuthorized || callback)(err);
}
return;
}

if (callback.onAuthorized) {
callback.onAuthorized(null, authorizedReqOpts);
} else {
request(authorizedReqOpts, function(err, res, body) {
handleResp(err, res, body, callback);
});
}
}

authorize(reqOpts, onAuthorizedRequest);
authorize(reqOpts, onAuthorizedRequest);
}

makeRequest.getCredentials = authorize.getCredentials;
Expand Down
Loading

0 comments on commit e5a72fe

Please sign in to comment.