From 05494e525661d369ba15b43afe16b0794803a50e Mon Sep 17 00:00:00 2001 From: Stephen Parente Date: Mon, 17 Oct 2016 16:22:53 -0700 Subject: [PATCH] Made adjustment to produce stream for new produce method --- lib/util/topicWritable.js | 21 ++++++++++++++------- test/util/topicReadable.spec.js | 8 ++++---- test/util/topicWritable.spec.js | 12 ++++++------ 3 files changed, 24 insertions(+), 17 deletions(-) diff --git a/lib/util/topicWritable.js b/lib/util/topicWritable.js index 20e52ec1..b80c1536 100644 --- a/lib/util/topicWritable.js +++ b/lib/util/topicWritable.js @@ -13,6 +13,7 @@ module.exports = TopicWritable; var Writable = require('stream').Writable; var util = require('util'); +var ErrorCode = require('../error').codes; util.inherits(TopicWritable, Writable); @@ -126,17 +127,23 @@ TopicWritable.prototype._write = function(data, encoding, cb) { self.topic = self.producer.Topic(self.topicName, self.topicOptions); } - return this.producer.produce({ - topic: self.topic, - message: data, - }, function streamSendMessageCallback(err) { - if (err) { + try { + this.producer.produce(self.topic, null, data, null); + cb(); + } catch (e) { + if (ErrorCode.ERR__QUEUE_FULL === e.code) { + // Just delay this thing a bit and pass the params + // backpressure will get exerted this way. + setTimeout(function() { + self._write(data, encoding, cb); + }, 500); + } else { if (self.autoClose) { self.close(); } + return cb(e); } - return cb(err); - }); + } }; function writev(producer, topic, chunks, cb) { diff --git a/test/util/topicReadable.spec.js b/test/util/topicReadable.spec.js index 977c7905..4bc72374 100644 --- a/test/util/topicReadable.spec.js +++ b/test/util/topicReadable.spec.js @@ -58,8 +58,8 @@ module.exports = { stream.once('readable', function() { var message = stream.read(); t.notEqual(message, null); - t.ok(Buffer.isBuffer(message.message)); - t.equal('test', message.message.toString()); + t.ok(Buffer.isBuffer(message.payload)); + t.equal('test', message.payload.toString()); t.equal('testkey', message.key); t.equal(typeof message.offset, 'number'); stream.pause(); @@ -92,7 +92,7 @@ module.exports = { var message = stream.read(); numReceived++; t.notEqual(message, null); - t.ok(Buffer.isBuffer(message.message)); + t.ok(Buffer.isBuffer(message.payload)); t.equal(typeof message.offset, 'number'); if (numReceived === numMessages) { // give it a second to get an error @@ -106,7 +106,7 @@ module.exports = { var writable = new Writable({ write: function(message, encoding, next) { t.notEqual(message, null); - t.ok(Buffer.isBuffer(message.message)); + t.ok(Buffer.isBuffer(message.payload)); t.equal(typeof message.offset, 'number'); this.cork(); cb(); diff --git a/test/util/topicWritable.spec.js b/test/util/topicWritable.spec.js index 83ff2f0d..b214483d 100644 --- a/test/util/topicWritable.spec.js +++ b/test/util/topicWritable.spec.js @@ -50,8 +50,8 @@ module.exports = { t.equal(typeof cb, 'function', 'Provided callback should always be a function'); t.deepEqual({ topicName: 'topic' }, message.topic); - t.equal(message.message.toString(), 'Awesome'); - t.equal(Buffer.isBuffer(message.message), true); + t.equal(message.payload.toString(), 'Awesome'); + t.equal(Buffer.isBuffer(message.payload), true); done(); }; @@ -73,8 +73,8 @@ module.exports = { t.equal(typeof cb, 'function', 'Provided callback should always be a function'); t.deepEqual({ topicName: 'topic' }, message.topic); - t.equal(message.message.toString(), 'Awesome' + currentMessage); - t.equal(Buffer.isBuffer(message.message), true); + t.equal(message.payload.toString(), 'Awesome' + currentMessage); + t.equal(Buffer.isBuffer(message.payload), true); if (currentMessage === 2) { done(); } else { @@ -114,8 +114,8 @@ module.exports = { t.equal(typeof cb, 'function', 'Provided callback should always be a function'); t.deepEqual({ topicName: 'topic' }, message.topic); - t.equal(message.message.toString(), 'Awesome' + currentMessage); - t.equal(Buffer.isBuffer(message.message), true); + t.equal(message.payload.toString(), 'Awesome' + currentMessage); + t.equal(Buffer.isBuffer(message.payload), true); if (currentMessage === 2) { done(); } else {