Skip to content

Commit

Permalink
storage: use util.makeWritableStream
Browse files Browse the repository at this point in the history
  • Loading branch information
stephenplusplus committed Nov 6, 2014
1 parent 294c67e commit 8d67abd
Show file tree
Hide file tree
Showing 5 changed files with 279 additions and 260 deletions.
8 changes: 6 additions & 2 deletions lib/common/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,10 @@ module.exports.toArray = toArray;
function makeWritableStream(dup, options, onComplete) {
onComplete = onComplete || noop;

options = options || {};
options.metadata = options.metadata || {};
onComplete = onComplete || noop;

var boundary = uuid.v4();

var defaults = {
Expand Down Expand Up @@ -276,7 +280,7 @@ function makeWritableStream(dup, options, onComplete) {
return;
}

var streamType = options.streamContentType || 'application/octet-stream';
var streamType = options.metadata.contentType || 'application/octet-stream';

var stream = options.connection.requester(req);
stream.callback = noop;
Expand Down Expand Up @@ -314,7 +318,7 @@ function makeWritableStream(dup, options, onComplete) {
// processing incoming data.
dup.setWritable(stream);

// Keep part of the stream open to keep Request from closing the conneciton.
// Keep part of the stream open to keep Request from closing the connection.
// Reference: http://goo.gl/zZVSif.
dup.pipe(stream);
});
Expand Down
107 changes: 22 additions & 85 deletions lib/storage/file.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@

var crypto = require('crypto');
var duplexify = require('duplexify');
var extend = require('extend');
var uuid = require('node-uuid');
var streamEvents = require('stream-events');

/**
* @type module:common/util
Expand Down Expand Up @@ -259,25 +258,29 @@ File.prototype.createReadStream = function() {
*/
File.prototype.createWriteStream = function(metadata) {
var that = this;
var dup = duplexify();
this.getWritableStream_(metadata, function(err, writable) {
if (err) {
dup.emit('error', err);
return;
}
writable.on('complete', function(res) {
util.handleResp(null, res, res.body, function(err, data) {
if (err) {
dup.emit('error', err);
return;
}
that.metadata = data;
dup.emit('complete', data);
});
var dup = streamEvents(duplexify());

dup.once('writing', function() {
util.makeWritableStream(dup, {
connection: that.bucket.connection_,
metadata: metadata,
request: {
qs: {
name: that.name,
},
uri: util.format('{base}/{bucket}/o', {
base: STORAGE_UPLOAD_BASE_URL,
bucket: that.bucket.name
})
}
}, function(data) {
that.metadata = data;

dup.emit('complete', data);
dup.end();
});
dup.setWritable(writable);
dup.pipe(writable);
});

return dup;
};

Expand Down Expand Up @@ -411,70 +414,4 @@ File.prototype.setMetadata = function(metadata, callback) {
}.bind(this));
};

/*! Developer Documentation
*
* Private Methods
*
* These methods deal with creating and maintaining the lifecycle of a stream.
* All File objects are Duplex streams, which will allow a reader to pipe data
* to the remote endpoint. Likewise, you can pipe data from a remote endpoint to
* a writer.
*
* Duplexify is used to allow us to asynchronously set the readable and writable
* portions of this stream. We can't accept data for buffering until we have
* made an authorized connection. Once we have such a connection, we call
* `setReadable` and/or `setWritable` on the File instance (which is also a
* Duplexify instance), which then opens the pipe for more data to come in or go
* out.
*/

/**
* Get a remote stream to begin piping a readable stream to.
*
* @private
*/
File.prototype.getWritableStream_ = function(metadata, callback) {
if (!callback) {
callback = metadata;
metadata = {};
}
var that = this;
var boundary = uuid.v4();
metadata = extend({ contentType: 'text/plain' }, metadata);
this.bucket.connection_.createAuthorizedReq({
method: 'POST',
uri: util.format('{base}/{bucket}/o', {
base: STORAGE_UPLOAD_BASE_URL,
bucket: that.bucket.name
}),
qs: {
name: this.name,
uploadType: 'multipart'
},
headers: {
'Content-Type': 'multipart/related; boundary="' + boundary + '"'
}
}, function(err, req) {
if (err) {
callback(err);
return;
}
var remoteStream = that.bucket.connection_.requester(req);
remoteStream.callback = util.noop;
remoteStream.write('--' + boundary + '\n');
remoteStream.write('Content-Type: application/json\n\n');
remoteStream.write(JSON.stringify(metadata));
remoteStream.write('\n\n');
remoteStream.write('--' + boundary + '\n');
remoteStream.write('Content-Type: ' + metadata.contentType + '\n\n');
var oldEndFn = remoteStream.end;
remoteStream.end = function(data, encoding, callback) {
data = (data || '') + '\n--' + boundary + '--\n';
remoteStream.write(data, encoding, callback);
oldEndFn.apply(this);
};
callback(null, remoteStream);
});
};

module.exports = File;
2 changes: 1 addition & 1 deletion test/bigquery/table.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ function FakeFile(a, b) {
}

var makeWritableStream_Override;
var fakeUtil = extend(util, {
var fakeUtil = extend({}, util, {
makeWritableStream: function() {
var args = [].slice.call(arguments);
(makeWritableStream_Override || util.makeWritableStream).apply(null, args);
Expand Down
Loading

0 comments on commit 8d67abd

Please sign in to comment.