Skip to content

Commit

Permalink
simplify publishing
Browse files Browse the repository at this point in the history
  • Loading branch information
stephenplusplus committed Feb 19, 2015
1 parent d2080f2 commit 6898811
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 68 deletions.
4 changes: 2 additions & 2 deletions lib/common/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

/*jshint strict:false, noarg:false */
/*jshint strict:false, noarg:false, eqnull:true */

/**
* @private
Expand Down Expand Up @@ -92,7 +92,7 @@ module.exports.extendGlobalConfig = extendGlobalConfig;
* // [ 'Hi' ]
*/
function arrayize(input) {
if (!input) {
if (input == null) {
return [];
}

Expand Down
16 changes: 4 additions & 12 deletions lib/pubsub/subscription.js
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ Subscription.prototype.startPulling_ = function() {
Subscription.prototype.ack = function(ids, callback) {
if (!ids || ids.length === 0) {
throw new Error(
'At least one ID must be specified before it can be acknowledged');
'At least one ID must be specified before it can be acknowledged.');
}
ids = util.arrayize(ids);
var body = {
Expand Down Expand Up @@ -326,13 +326,7 @@ Subscription.prototype.delete = function(callback) {
* maxResults: 1
* };
*
* subscription.pull(opts, function(err, message) {
* // message = {
* // ackId: '', // ID used to acknowledge its receival.
* // id: '', // Unique message ID.
* // data: '' // Contents of the message.
* // }
* });
* subscription.pull(opts, function(err, messages) {});
*/
Subscription.prototype.pull = function(options, callback) {
var that = this;
Expand Down Expand Up @@ -363,18 +357,16 @@ Subscription.prototype.pull = function(options, callback) {
var messages = response.pullResponses || [response];
messages = messages.map(Subscription.formatMessage_);

var messageResponse = response.pullResponses ? messages : messages[0];

if (that.autoAck) {
var ackIds = messages.map(function(message) {
return message.ackId;
});

that.ack(ackIds, function(err) {
callback(err, messageResponse);
callback(err, messages);
});
} else {
callback(null, messageResponse);
callback(null, messages);
}
});
};
Expand Down
90 changes: 45 additions & 45 deletions regression/pubsub.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ describe('pubsub', function() {

it('should publish a message', function(done) {
pubsub.topic(topicNames[0])
.publish('message from me', done);
.publish({ data: 'message from me' }, done);
});

it('should be deleted', function(done) {
Expand Down Expand Up @@ -176,15 +176,15 @@ describe('pubsub', function() {
it('should be able to pull and ack', function(done) {
var subscription = topic.subscription(subscriptions[0].name);

topic.publish('hello', function(err) {
topic.publish({ data: 'hello' }, function(err) {
assert.ifError(err);

subscription.pull({
returnImmediately: true,
maxCount: 1
}, function(err, msg) {
maxResults: 1
}, function(err, msgs) {
assert.ifError(err);
subscription.ack(msg.ackId, done);
subscription.ack(msgs[0].ackId, done);
});
});
});
Expand All @@ -193,26 +193,26 @@ describe('pubsub', function() {
var subscription = topic.subscription(subscriptions[0].name);

topic.publish([
{ data: new Buffer('hello').toString('base64') },
{ data: new Buffer('hello').toString('base64') },
{ data: new Buffer('hello').toString('base64') },
{ data: new Buffer('hello').toString('base64') },
{ data: new Buffer('hello').toString('base64') },
{ data: new Buffer('hello').toString('base64') },
{ data: new Buffer('hello').toString('base64') },
{ data: new Buffer('hello').toString('base64') },
{ data: new Buffer('hello').toString('base64') },
{ data: new Buffer('hello').toString('base64') }
{ data: 'hello' },
{ data: 'hello' },
{ data: 'hello' },
{ data: 'hello' },
{ data: 'hello' },
{ data: 'hello' },
{ data: 'hello' },
{ data: 'hello' },
{ data: 'hello' },
{ data: 'hello' }
], function(err) {
assert.ifError(err);

subscription.pull({
returnImmediately: true,
maxCount: 1
}, function(err, msg) {
maxResults: 1
}, function(err, msgs) {
assert.ifError(err);
assert.equal(msg.data, 'hello');
subscription.ack(msg.ackId, done);
assert.equal(msgs[0].data, 'hello');
subscription.ack(msgs[0].ackId, done);
});
});
});
Expand All @@ -221,52 +221,52 @@ describe('pubsub', function() {
var subscription = topic.subscription(subscriptions[0].name);

topic.publish([
{ data: new Buffer('hello').toString('base64') },
{ data: new Buffer('hello').toString('base64') },
{ data: new Buffer('hello').toString('base64') },
{ data: new Buffer('hello').toString('base64') },
{ data: new Buffer('hello').toString('base64') },
{ data: new Buffer('hello').toString('base64') },
{ data: new Buffer('hello').toString('base64') },
{ data: new Buffer('hello').toString('base64') },
{ data: new Buffer('hello').toString('base64') },
{ data: new Buffer('hello').toString('base64') }
{ data: 'hello' },
{ data: 'hello' },
{ data: 'hello' },
{ data: 'hello' },
{ data: 'hello' },
{ data: 'hello' },
{ data: 'hello' },
{ data: 'hello' },
{ data: 'hello' },
{ data: 'hello' }
], function(err) {
assert.ifError(err);

subscription.pull({
returnImmediately: true,
maxCount: 1
}, function(err, msg) {
maxResults: 1
}, function(err, msgs) {
assert.ifError(err);
assert.equal(msg.data, 'hello');
subscription.ack(msg.ackId, done);
assert.equal(msgs[0].data, 'hello');
subscription.ack(msgs[0].ackId, done);
});
});
});

it('should receive the chosen amount of results', function(done) {
var subscription = topic.subscription(subscriptions[0].name);
var opts = { returnImmediately: true, maxCount: 3 };
var opts = { returnImmediately: true, maxResults: 3 };

topic.publish([
{ data: new Buffer('hello').toString('base64') },
{ data: new Buffer('hello').toString('base64') },
{ data: new Buffer('hello').toString('base64') },
{ data: new Buffer('hello').toString('base64') },
{ data: new Buffer('hello').toString('base64') },
{ data: new Buffer('hello').toString('base64') },
{ data: new Buffer('hello').toString('base64') },
{ data: new Buffer('hello').toString('base64') },
{ data: new Buffer('hello').toString('base64') },
{ data: new Buffer('hello').toString('base64') }
{ data: 'hello' },
{ data: 'hello' },
{ data: 'hello' },
{ data: 'hello' },
{ data: 'hello' },
{ data: 'hello' },
{ data: 'hello' },
{ data: 'hello' },
{ data: 'hello' },
{ data: 'hello' }
], function(err) {
assert.ifError(err);

subscription.pull(opts, function(err, messages) {
assert.ifError(err);

assert.equal(messages.length, opts.maxCount);
assert.equal(messages.length, opts.maxResults);

var ackIds = messages.map(function(message) {
return message.ackId;
Expand Down
19 changes: 15 additions & 4 deletions test/common/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,21 @@ describe('common/util', function() {
});

describe('arrayize', function() {
it('should arrayize if the input is not an array', function(done) {
var o = util.arrayize('text');
assert.deepEqual(o, ['text']);
done();
it('should arrayize if the input is not an array', function() {
assert.deepEqual(util.arrayize('text'), ['text']);
});

it('should return the same array if given an array', function() {
var arr = [1, 2, 3];
assert.deepEqual(util.arrayize(arr), arr);
});

it('should return an empty array in correct circumstance', function() {
assert.deepEqual(util.arrayize(undefined), []);
assert.deepEqual(util.arrayize(null), []);

assert.deepEqual(util.arrayize(false), [false]);
assert.deepEqual(util.arrayize(0), [0]);
});
});

Expand Down
11 changes: 6 additions & 5 deletions test/pubsub/subscription.js
Original file line number Diff line number Diff line change
Expand Up @@ -243,10 +243,10 @@ describe('Subscription', function() {
callback(null, apiResponse);
};

subscription.pull(function(err, message) {
subscription.pull(function(err, msgs) {
assert.ifError(err);

assert.deepEqual(message, Subscription.formatMessage_(apiResponse));
assert.deepEqual(msgs, [Subscription.formatMessage_(apiResponse)]);

done();
});
Expand Down Expand Up @@ -310,7 +310,8 @@ describe('Subscription', function() {

assert.deepEqual(
messages,
apiResponse.pullResponses.map(Subscription.formatMessage_));
apiResponse.pullResponses.map(Subscription.formatMessage_)
);

done();
});
Expand Down Expand Up @@ -341,8 +342,8 @@ describe('Subscription', function() {
});

it('should execute callback with message', function(done) {
subscription.pull({}, function(err, msg) {
assert.deepEqual(msg, expectedMessage);
subscription.pull({}, function(err, msgs) {
assert.deepEqual(msgs, [expectedMessage]);
done();
});
});
Expand Down

0 comments on commit 6898811

Please sign in to comment.