Skip to content

Commit

Permalink
Rename to , added producer key, and allow null payloads
Browse files Browse the repository at this point in the history
  • Loading branch information
webmakersteve committed Oct 18, 2016
1 parent 3d7f1bb commit 3a34a67
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 29 deletions.
2 changes: 1 addition & 1 deletion e2e/both.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ describe('Consumer/Producer', function() {
t.ifError(err);
t.equal(Array.isArray(consumer.assignments()), true, 'Assignments should be an array');
t.equal(consumer.assignments().length > 0, true, 'Should have at least one assignment');
t.equal(buffer.toString(), message.payload.toString(),
t.equal(buffer.toString(), message.value.toString(),
'message is not equal to buffer');
done();
});
Expand Down
18 changes: 17 additions & 1 deletion e2e/producer.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,26 @@ describe('Producer', function() {
});
});

it('should produce a message with a null payload', function(done) {
var tt = setInterval(function() {
producer.poll();
}, 200).unref();

producer.on('delivery-report', function(report) {
clearInterval(tt);
t.ok(report !== undefined);
t.ok(typeof report.topic_name === 'string');
t.ok(typeof report.partition === 'number');
t.ok(typeof report.offset === 'number');
done();
});

producer.produce('test', null, null, null);
});

it('should get 100% deliverability', function(done) {
this.timeout(3000);


var total = 0;
var max = 10000;
var errors = 0;
Expand Down
4 changes: 1 addition & 3 deletions src/common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,6 @@ std::string GetParameter<std::string>(v8::Local<v8::Object> object,

return parameterString;

} else {
Log("Value is undefined or null");
}
}
}
Expand Down Expand Up @@ -332,7 +330,7 @@ v8::Local<v8::Object> ToV8Object(RdKafka::Message *message) {
Nan::MaybeLocal<v8::Object> buff = Nan::NewBuffer(
static_cast<char*>(payload), static_cast<int>(message->len()));

Nan::Set(pack, Nan::New<v8::String>("payload").ToLocalChecked(),
Nan::Set(pack, Nan::New<v8::String>("value").ToLocalChecked(),
buff.ToLocalChecked());
Nan::Set(pack, Nan::New<v8::String>("size").ToLocalChecked(),
Nan::New<v8::Number>(message->len()));
Expand Down
56 changes: 38 additions & 18 deletions src/producer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -268,9 +268,9 @@ NAN_METHOD(Producer::NodeProduce) {
Nan::HandleScope scope;

// Need to extract the message data here.
if (info.Length() < 4 || !info[0]->IsObject() || !info[2]->IsObject()) {
if (info.Length() < 3 || !info[0]->IsObject()) {
// Just throw an exception
return Nan::ThrowError("Need to specify message data and topic");
return Nan::ThrowError("Need to specify a topic, partition, and message");
}

// First parameter is a topic
Expand All @@ -289,32 +289,52 @@ NAN_METHOD(Producer::NodeProduce) {
partition = RdKafka::Topic::PARTITION_UA;
}

if (!node::Buffer::HasInstance(info[2])) {
return Nan::ThrowError("Need to specify message data and topic");
}
size_t message_buffer_length;
void* message_buffer_data;

v8::Local<v8::Object> message_buffer_object = info[2]->ToObject();
if (info[2]->IsNull()) {
// This is okay for whatever reason
message_buffer_length = 0;
message_buffer_data = NULL;
} else if (!node::Buffer::HasInstance(info[2])) {
return Nan::ThrowError("Message must be a buffer or null");
} else {
v8::Local<v8::Object> message_buffer_object = info[2]->ToObject();

// v8 handles the garbage collection here so we need to make a copy of
// the buffer or assign the buffer to a persistent handle.
// v8 handles the garbage collection here so we need to make a copy of
// the buffer or assign the buffer to a persistent handle.

// I'm not sure which would be the more performant option. I assume
// the persistent handle would be but for now we'll try this one
// which should be more memory-efficient and allow v8 to dispose of the
// buffer sooner
// I'm not sure which would be the more performant option. I assume
// the persistent handle would be but for now we'll try this one
// which should be more memory-efficient and allow v8 to dispose of the
// buffer sooner

size_t message_buffer_length = node::Buffer::Length(message_buffer_object);
void* message_buffer_data = node::Buffer::Data(message_buffer_object);
message_buffer_length = node::Buffer::Length(message_buffer_object);
message_buffer_data = node::Buffer::Data(message_buffer_object);
}

Producer* producer = ObjectWrap::Unwrap<Producer>(info.This());
// Last we have to get the key
std::string * key;

// Testing crap
if (info[3]->IsNull() || info[3]->IsUndefined()) {
key = NULL;
} else {
v8::Local<v8::String> val = info[3]->ToString();
// Get string pointer for this thing
Nan::Utf8String keyUTF8(val);
std::string keyString(*keyUTF8);

// This will just go out of scope and we don't send it anywhere,
// since it is copied there is no need to delete it
key = &keyString;
}

Producer* producer = ObjectWrap::Unwrap<Producer>(info.This());

Baton b = producer->Produce(message_buffer_data, message_buffer_length,
topic->toRDKafkaTopic(), partition, NULL);
topic->toRDKafkaTopic(), partition, key);

// Let the JS library throw if we need to so the error can be more rich

int error_code = static_cast<int>(b.err());

info.GetReturnValue().Set(Nan::New<v8::Number>(error_code));
Expand Down
12 changes: 6 additions & 6 deletions test/util/topicReadable.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ module.exports = {
'Provided callback should always be a function');
setImmediate(function() {
cb(null, {
payload: new Buffer('test'),
value: new Buffer('test'),
key: 'testkey',
offset: 1
});
Expand Down Expand Up @@ -58,8 +58,8 @@ module.exports = {
stream.once('readable', function() {
var message = stream.read();
t.notEqual(message, null);
t.ok(Buffer.isBuffer(message.payload));
t.equal('test', message.payload.toString());
t.ok(Buffer.isBuffer(message.value));
t.equal('test', message.value.toString());
t.equal('testkey', message.key);
t.equal(typeof message.offset, 'number');
stream.pause();
Expand All @@ -77,7 +77,7 @@ module.exports = {
numSent++;
setImmediate(function() {
cb(null, {
payload: new Buffer('test'),
value: new Buffer('test'),
offset: 1
});
});
Expand All @@ -92,7 +92,7 @@ module.exports = {
var message = stream.read();
numReceived++;
t.notEqual(message, null);
t.ok(Buffer.isBuffer(message.payload));
t.ok(Buffer.isBuffer(message.value));
t.equal(typeof message.offset, 'number');
if (numReceived === numMessages) {
// give it a second to get an error
Expand All @@ -106,7 +106,7 @@ module.exports = {
var writable = new Writable({
write: function(message, encoding, next) {
t.notEqual(message, null);
t.ok(Buffer.isBuffer(message.payload));
t.ok(Buffer.isBuffer(message.value));
t.equal(typeof message.offset, 'number');
this.cork();
cb();
Expand Down

0 comments on commit 3a34a67

Please sign in to comment.