Skip to content

Commit f709184

Browse files
Refactor rebalance callback to more pure implementation (#31)
Rebalance callback can now be set like any other configuration value. If it is omitted, we rely on the internal librdkafka rebalance callback. If you set it to true, it will use a node implementation (and expose the event to you). If you provide a function, you will do the assignment yourself, and it will emit the event.
1 parent 908a2fa commit f709184

18 files changed

+337
-259
lines changed

e2e/both.spec.js

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -75,18 +75,14 @@ var testCase = new TestCase('Interoperability tests', function() {
7575

7676
crypto.randomBytes(4096, function(ex, buffer) {
7777

78-
consumer.on('rebalance', function(e) {
79-
if (e.code === 500) {
80-
setTimeout(function() {
81-
producer.produce({
82-
message: buffer,
83-
topic: topic
84-
}, function(err) {
85-
t.ifError(err);
86-
});
87-
}, 1000);
88-
}
89-
});
78+
var pT = setInterval(function() {
79+
producer.produce({
80+
message: buffer,
81+
topic: topic
82+
}, function(err) {
83+
t.ifError(err);
84+
});
85+
}, 2000);
9086

9187
var tt = setInterval(function() {
9288
if (!producer.isConnected()) {
@@ -116,6 +112,7 @@ var testCase = new TestCase('Interoperability tests', function() {
116112
}
117113

118114
clearInterval(tt);
115+
clearInterval(pT);
119116

120117
if (err) {
121118
return cb(err);

lib/kafka-consumer.js

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,34 @@ function KafkaConsumer(conf, topicConf) {
4040
return new KafkaConsumer(conf, topicConf);
4141
}
4242

43+
var onRebalance = conf.rebalance_cb;
44+
45+
var self = this;
46+
47+
// If rebalance is undefined we don't want any part of this
48+
if (onRebalance && typeof onRebalance === 'boolean') {
49+
conf.rebalance_cb = function(e) {
50+
// That's it
51+
if (e.code === 500 /*CODES.REBALANCE.PARTITION_ASSIGNMENT*/) {
52+
self.assign(e.assignment);
53+
} else if (e.code === 501 /*CODES.REBALANCE.PARTITION_UNASSIGNMENT*/) {
54+
self.unassign(e.assignment);
55+
}
56+
};
57+
} else if (onRebalance && typeof onRebalance === 'function') {
58+
/*
59+
* Once this is opted in to, that's it. It's going to manually rebalance
60+
* forever. There is no way to unset config values in librdkafka, just
61+
* a way to override them.
62+
*/
63+
64+
conf.rebalance_cb = function(e) {
65+
self.emit('rebalance', e);
66+
onRebalance.call(self, e);
67+
};
68+
}
69+
70+
4371
/**
4472
* KafkaConsumer message.
4573
*
@@ -57,20 +85,6 @@ function KafkaConsumer(conf, topicConf) {
5785

5886
Client.call(this, conf, Kafka.KafkaConsumer, topicConf);
5987

60-
var self = this;
61-
62-
/**
63-
* Rebalance event. Called when the KafkaConsumer is rebalancing.
64-
*
65-
* @event KafkaConsumer#rebalance
66-
* @type {object}
67-
* @property {number} code - whether the rebalance was an assignment or
68-
* an unassignment
69-
*/
70-
this._client.onRebalance(function(e) {
71-
self.emit('rebalance', e);
72-
});
73-
7488
this.globalConfig = conf;
7589
this.topicConfig = topicConf;
7690
}

src/callbacks.cc

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -425,31 +425,44 @@ void RebalanceDispatcher::Flush() {
425425
break;
426426
}
427427

428+
std::vector<rebalance_topic_partition_t> parts = _events[i].partitions;
429+
430+
v8::Local<v8::Array> tp_array = Nan::New<v8::Array>();
431+
432+
for (size_t i = 0; i < parts.size(); i++) {
433+
v8::Local<v8::Object> tp_obj = Nan::New<v8::Object>();
434+
rebalance_topic_partition_t tp = parts[i];
435+
436+
Nan::Set(tp_obj, Nan::New("topic").ToLocalChecked(),
437+
Nan::New<v8::String>(tp.topic.c_str()).ToLocalChecked());
438+
Nan::Set(tp_obj, Nan::New("partition").ToLocalChecked(),
439+
Nan::New<v8::Number>(tp.partition));
440+
441+
if (tp.offset >= 0) {
442+
Nan::Set(tp_obj, Nan::New("offset").ToLocalChecked(),
443+
Nan::New<v8::Number>(tp.offset));
444+
}
445+
446+
tp_array->Set(i, tp_obj);
447+
}
448+
// Now convert the TopicPartition list to a JS array
449+
Nan::Set(jsobj, Nan::New("assignment").ToLocalChecked(), tp_array);
450+
428451
argv[0] = jsobj;
429452

430453
Dispatch(argc, argv);
431454
}
432455
}
433456

434-
Rebalance::~Rebalance() {}
435-
Rebalance::Rebalance(NodeKafka::Consumer* that) :
436-
that_(that) {
437-
eof_cnt = 0;
457+
Rebalance::Rebalance(v8::Local<v8::Function> &cb) {
458+
dispatcher.AddCallback(cb);
438459
}
460+
Rebalance::~Rebalance() {}
439461

440462
void Rebalance::rebalance_cb(RdKafka::KafkaConsumer *consumer,
441-
RdKafka::ErrorCode err,
442-
std::vector<RdKafka::TopicPartition*> &partitions) {
443-
if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
444-
that_->Assign(partitions);
445-
} else {
446-
that_->Unassign();
447-
}
448-
463+
RdKafka::ErrorCode err, std::vector<RdKafka::TopicPartition*> &partitions) {
449464
dispatcher.Add(rebalance_event_t(err, partitions));
450465
dispatcher.Execute();
451-
452-
eof_cnt = 0;
453466
}
454467

455468
// Partitioner callback

src/callbacks.h

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -140,14 +140,38 @@ class Delivery : public RdKafka::DeliveryReportCb {
140140

141141
// Rebalance dispatcher
142142

143+
struct rebalance_topic_partition_t {
144+
std::string topic;
145+
int partition;
146+
int64_t offset;
147+
148+
rebalance_topic_partition_t(std::string p_topic, int p_partition, int64_t p_offset): // NOLINT
149+
topic(p_topic),
150+
partition(p_partition),
151+
offset(p_offset) {}
152+
};
153+
143154
struct rebalance_event_t {
144155
RdKafka::ErrorCode err;
145-
std::vector<RdKafka::TopicPartition*> partitions;
146-
147-
rebalance_event_t(RdKafka::ErrorCode _err,
148-
std::vector<RdKafka::TopicPartition*> _partitions):
149-
err(_err),
150-
partitions(_partitions) {}
156+
std::vector<rebalance_topic_partition_t> partitions;
157+
158+
rebalance_event_t(RdKafka::ErrorCode p_err,
159+
std::vector<RdKafka::TopicPartition*> p_partitions):
160+
err(p_err) {
161+
// Iterate over the topic partitions because we won't have them later
162+
for (size_t topic_partition_i = 0;
163+
topic_partition_i < p_partitions.size(); topic_partition_i++) {
164+
RdKafka::TopicPartition* topic_partition =
165+
p_partitions[topic_partition_i];
166+
167+
rebalance_topic_partition_t tp(
168+
topic_partition->topic(),
169+
topic_partition->partition(),
170+
topic_partition->offset());
171+
172+
partitions.push_back(tp);
173+
}
174+
}
151175
};
152176

153177
class RebalanceDispatcher : public Dispatcher {
@@ -162,16 +186,15 @@ class RebalanceDispatcher : public Dispatcher {
162186

163187
class Rebalance : public RdKafka::RebalanceCb {
164188
public:
165-
explicit Rebalance(NodeKafka::Consumer* that);
189+
explicit Rebalance(v8::Local<v8::Function>&);
166190
~Rebalance();
167-
// NAN_DISALLOW_ASSIGN_COPY_MOVE?
168-
NodeKafka::Consumer* const that_;
169191

170192
void rebalance_cb(RdKafka::KafkaConsumer *, RdKafka::ErrorCode,
171193
std::vector<RdKafka::TopicPartition*> &);
194+
172195
RebalanceDispatcher dispatcher;
173196
private:
174-
int eof_cnt;
197+
v8::Persistent<v8::Function> m_cb;
175198
};
176199

177200
class Partitioner : public RdKafka::PartitionerCb {

src/common.cc

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,4 +122,154 @@ std::vector<std::string> v8ArrayToStringVector(v8::Local<v8::Array> parameter) {
122122
return newItem;
123123
}
124124

125+
namespace TopicPartition {
126+
127+
/**
128+
* @brief RdKafka::TopicPartition vector to a v8 Array
129+
*
130+
* @see v8ArrayToTopicPartitionVector
131+
*/
132+
v8::Local<v8::Array> ToV8Array(
133+
std::vector<RdKafka::TopicPartition*> topic_partition_list) {
134+
v8::Local<v8::Array> array = Nan::New<v8::Array>();
135+
for (size_t topic_partition_i = 0;
136+
topic_partition_i < topic_partition_list.size(); topic_partition_i++) {
137+
RdKafka::TopicPartition* topic_partition =
138+
topic_partition_list[topic_partition_i];
139+
140+
// We have the list now let's get the properties from it
141+
v8::Local<v8::Object> obj = Nan::New<v8::Object>();
142+
143+
Nan::Set(obj, Nan::New("offset").ToLocalChecked(),
144+
Nan::New<v8::Number>(topic_partition->offset()));
145+
Nan::Set(obj, Nan::New("partition").ToLocalChecked(),
146+
Nan::New<v8::Number>(topic_partition->partition()));
147+
Nan::Set(obj, Nan::New("topic").ToLocalChecked(),
148+
Nan::New<v8::String>(topic_partition->topic().c_str()).ToLocalChecked());
149+
150+
array->Set(topic_partition_i, obj);
151+
}
152+
153+
return array;
154+
}
155+
156+
} // namespace TopicPartition
157+
158+
namespace Metadata {
159+
160+
/**
161+
* @brief RdKafka::Metadata to v8::Object
162+
*
163+
*/
164+
v8::Local<v8::Object> ToV8Object(RdKafka::Metadata* metadata) {
165+
v8::Local<v8::Object> obj = Nan::New<v8::Object>();
166+
167+
v8::Local<v8::Array> broker_data = Nan::New<v8::Array>();
168+
v8::Local<v8::Array> topic_data = Nan::New<v8::Array>();
169+
170+
const BrokerMetadataList* brokers = metadata->brokers(); // NOLINT
171+
172+
unsigned int broker_i = 0;
173+
174+
for (BrokerMetadataList::const_iterator it = brokers->begin();
175+
it != brokers->end(); ++it, broker_i++) {
176+
// Start iterating over brokers and set the object up
177+
178+
const RdKafka::BrokerMetadata* x = *it;
179+
180+
v8::Local<v8::Object> current_broker = Nan::New<v8::Object>();
181+
182+
Nan::Set(current_broker, Nan::New("id").ToLocalChecked(),
183+
Nan::New<v8::Number>(x->id()));
184+
Nan::Set(current_broker, Nan::New("host").ToLocalChecked(),
185+
Nan::New<v8::String>(x->host().c_str()).ToLocalChecked());
186+
Nan::Set(current_broker, Nan::New("port").ToLocalChecked(),
187+
Nan::New<v8::Number>(x->port()));
188+
189+
broker_data->Set(broker_i, current_broker);
190+
}
191+
192+
unsigned int topic_i = 0;
193+
194+
const TopicMetadataList* topics = metadata->topics();
195+
196+
for (TopicMetadataList::const_iterator it = topics->begin();
197+
it != topics->end(); ++it, topic_i++) {
198+
// Start iterating over topics
199+
200+
const RdKafka::TopicMetadata* x = *it;
201+
202+
v8::Local<v8::Object> current_topic = Nan::New<v8::Object>();
203+
204+
Nan::Set(current_topic, Nan::New("name").ToLocalChecked(),
205+
Nan::New<v8::String>(x->topic().c_str()).ToLocalChecked());
206+
207+
v8::Local<v8::Array> current_topic_partitions = Nan::New<v8::Array>();
208+
209+
const PartitionMetadataList* current_partition_data = x->partitions();
210+
211+
unsigned int partition_i = 0;
212+
PartitionMetadataList::const_iterator itt;
213+
214+
for (itt = current_partition_data->begin();
215+
itt != current_partition_data->end(); ++itt, partition_i++) {
216+
// partition iterate
217+
const RdKafka::PartitionMetadata* xx = *itt;
218+
219+
v8::Local<v8::Object> current_partition = Nan::New<v8::Object>();
220+
221+
Nan::Set(current_partition, Nan::New("id").ToLocalChecked(),
222+
Nan::New<v8::Number>(xx->id()));
223+
Nan::Set(current_partition, Nan::New("leader").ToLocalChecked(),
224+
Nan::New<v8::Number>(xx->leader()));
225+
226+
const std::vector<int32_t> * replicas = xx->replicas();
227+
const std::vector<int32_t> * isrs = xx->isrs();
228+
229+
std::vector<int32_t>::const_iterator r_it;
230+
std::vector<int32_t>::const_iterator i_it;
231+
232+
unsigned int r_i = 0;
233+
unsigned int i_i = 0;
234+
235+
v8::Local<v8::Array> current_replicas = Nan::New<v8::Array>();
236+
237+
for (r_it = replicas->begin(); r_it != replicas->end(); ++r_it, r_i++) {
238+
current_replicas->Set(r_i, Nan::New<v8::Int32>(*r_it));
239+
}
240+
241+
v8::Local<v8::Array> current_isrs = Nan::New<v8::Array>();
242+
243+
for (i_it = isrs->begin(); i_it != isrs->end(); ++i_it, i_i++) {
244+
current_isrs->Set(r_i, Nan::New<v8::Int32>(*i_it));
245+
}
246+
247+
Nan::Set(current_partition, Nan::New("replicas").ToLocalChecked(),
248+
current_replicas);
249+
Nan::Set(current_partition, Nan::New("isrs").ToLocalChecked(),
250+
current_isrs);
251+
252+
current_topic_partitions->Set(partition_i, current_partition);
253+
} // iterate over partitions
254+
255+
Nan::Set(current_topic, Nan::New("partitions").ToLocalChecked(),
256+
current_topic_partitions);
257+
258+
topic_data->Set(topic_i, current_topic);
259+
} // End iterating over topics
260+
261+
Nan::Set(obj, Nan::New("orig_broker_id").ToLocalChecked(),
262+
Nan::New<v8::Number>(metadata->orig_broker_id()));
263+
264+
Nan::Set(obj, Nan::New("orig_broker_name").ToLocalChecked(),
265+
Nan::New<v8::String>(metadata->orig_broker_name()).ToLocalChecked());
266+
267+
Nan::Set(obj, Nan::New("topics").ToLocalChecked(), topic_data);
268+
Nan::Set(obj, Nan::New("brokers").ToLocalChecked(), broker_data);
269+
270+
return obj;
271+
}
272+
273+
} // namespace Metadata
274+
125275
} // namespace NodeKafka

src/common.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@
1818

1919
#include "deps/librdkafka/src-cpp/rdkafkacpp.h"
2020

21+
typedef std::vector<const RdKafka::BrokerMetadata*> BrokerMetadataList;
22+
typedef std::vector<const RdKafka::PartitionMetadata*> PartitionMetadataList;
23+
typedef std::vector<const RdKafka::TopicMetadata *> TopicMetadataList;
24+
2125
namespace NodeKafka {
2226

2327
void Log(std::string);
@@ -45,6 +49,18 @@ class scoped_mutex_lock {
4549
uv_mutex_t &async_lock;
4650
};
4751

52+
namespace TopicPartition {
53+
54+
v8::Local<v8::Array> ToV8Array(std::vector<RdKafka::TopicPartition*>);
55+
56+
}
57+
58+
namespace Metadata {
59+
60+
v8::Local<v8::Object> ToV8Object(RdKafka::Metadata*);
61+
62+
} // namespace Metadata
63+
4864
} // namespace NodeKafka
4965

5066
#endif // SRC_COMMON_H_

0 commit comments

Comments
 (0)