Skip to content

Commit

Permalink
Made adjustment to produce stream for new produce method
Browse files Browse the repository at this point in the history
  • Loading branch information
webmakersteve committed Oct 17, 2016
1 parent 5f73b33 commit 8228a83
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 17 deletions.
23 changes: 16 additions & 7 deletions lib/util/topicWritable.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ module.exports = TopicWritable;

var Writable = require('stream').Writable;
var util = require('util');
var ErrorCode = require('../error').codes;

util.inherits(TopicWritable, Writable);

Expand Down Expand Up @@ -126,17 +127,25 @@ TopicWritable.prototype._write = function(data, encoding, cb) {
self.topic = self.producer.Topic(self.topicName, self.topicOptions);
}

return this.producer.produce({
topic: self.topic,
message: data,
}, function streamSendMessageCallback(err) {
if (err) {
try {
this.producer.produce(self.topic, null, data, null);
setImmediate(cb);
} catch (e) {
if (ErrorCode.ERR__QUEUE_FULL === e.code) {
// Just delay this thing a bit and pass the params
// backpressure will get exerted this way.
setTimeout(function() {
self._write(data, encoding, cb);
}, 500);
} else {
if (self.autoClose) {
self.close();
}
setImmediate(function() {
cb(e);
});
}
return cb(err);
});
}
};

function writev(producer, topic, chunks, cb) {
Expand Down
1 change: 1 addition & 0 deletions src/binding.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#define SRC_BINDING_H_

#include <nan.h>
#include <string>
#include "deps/librdkafka/src-cpp/rdkafkacpp.h"
#include "src/common.h"
#include "src/errors.h"
Expand Down
8 changes: 4 additions & 4 deletions test/util/topicReadable.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ module.exports = {
stream.once('readable', function() {
var message = stream.read();
t.notEqual(message, null);
t.ok(Buffer.isBuffer(message.message));
t.equal('test', message.message.toString());
t.ok(Buffer.isBuffer(message.payload));
t.equal('test', message.payload.toString());
t.equal('testkey', message.key);
t.equal(typeof message.offset, 'number');
stream.pause();
Expand Down Expand Up @@ -92,7 +92,7 @@ module.exports = {
var message = stream.read();
numReceived++;
t.notEqual(message, null);
t.ok(Buffer.isBuffer(message.message));
t.ok(Buffer.isBuffer(message.payload));
t.equal(typeof message.offset, 'number');
if (numReceived === numMessages) {
// give it a second to get an error
Expand All @@ -106,7 +106,7 @@ module.exports = {
var writable = new Writable({
write: function(message, encoding, next) {
t.notEqual(message, null);
t.ok(Buffer.isBuffer(message.message));
t.ok(Buffer.isBuffer(message.payload));
t.equal(typeof message.offset, 'number');
this.cork();
cb();
Expand Down
12 changes: 6 additions & 6 deletions test/util/topicWritable.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ module.exports = {
t.equal(typeof cb, 'function',
'Provided callback should always be a function');
t.deepEqual({ topicName: 'topic' }, message.topic);
t.equal(message.message.toString(), 'Awesome');
t.equal(Buffer.isBuffer(message.message), true);
t.equal(message.payload.toString(), 'Awesome');
t.equal(Buffer.isBuffer(message.payload), true);
done();
};

Expand All @@ -73,8 +73,8 @@ module.exports = {
t.equal(typeof cb, 'function',
'Provided callback should always be a function');
t.deepEqual({ topicName: 'topic' }, message.topic);
t.equal(message.message.toString(), 'Awesome' + currentMessage);
t.equal(Buffer.isBuffer(message.message), true);
t.equal(message.payload.toString(), 'Awesome' + currentMessage);
t.equal(Buffer.isBuffer(message.payload), true);
if (currentMessage === 2) {
done();
} else {
Expand Down Expand Up @@ -114,8 +114,8 @@ module.exports = {
t.equal(typeof cb, 'function',
'Provided callback should always be a function');
t.deepEqual({ topicName: 'topic' }, message.topic);
t.equal(message.message.toString(), 'Awesome' + currentMessage);
t.equal(Buffer.isBuffer(message.message), true);
t.equal(message.payload.toString(), 'Awesome' + currentMessage);
t.equal(Buffer.isBuffer(message.payload), true);
if (currentMessage === 2) {
done();
} else {
Expand Down

0 comments on commit 8228a83

Please sign in to comment.