Skip to content

Commit 5ce461e

Browse files
committed
Refactor config-based dispatchers to be controlled by config
1 parent 764a500 commit 5ce461e

File tree

11 files changed

+95
-66
lines changed

11 files changed

+95
-66
lines changed

lib/kafka-consumer.js

Lines changed: 24 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,30 @@ function KafkaConsumer(conf, topicConf) {
4242

4343
var onRebalance = conf.rebalance_cb;
4444

45-
// Delete it because it messes other things up
46-
delete conf.rebalance_cb;
45+
var self = this;
46+
47+
// If rebalance is undefined we don't want any part of this
48+
if (onRebalance && typeof onRebalance === 'boolean') {
49+
conf.rebalance_cb = function(e) {
50+
// That's it
51+
if (e.code === 500 /*CODES.REBALANCE.PARTITION_ASSIGNMENT*/) {
52+
self.assign(e.assignment);
53+
} else if (e.code === 501 /*CODES.REBALANCE.PARTITION_UNASSIGNMENT*/) {
54+
self.unassign(e.assignment);
55+
}
56+
};
57+
} else if (onRebalance && typeof onRebalance === 'function') {
58+
/*
59+
* Once this is opted in to, that's it. It's going to manually rebalance
60+
* forever. There is no way to unset config values in librdkafka, just
61+
* a way to override them.
62+
*/
63+
64+
conf.rebalance_cb = function(e) {
65+
self.emit('rebalance', e);
66+
onRebalance.call(self, e);
67+
};
68+
}
4769

4870

4971
/**
@@ -63,42 +85,6 @@ function KafkaConsumer(conf, topicConf) {
6385

6486
Client.call(this, conf, Kafka.KafkaConsumer, topicConf);
6587

66-
var self = this;
67-
68-
if (onRebalance !== false) {
69-
// If rebalance is specifically false we don't want any part of this
70-
71-
if (typeof onRebalance !== 'function') {
72-
onRebalance = function(e) {
73-
// That's it
74-
self.assign(e.assignments);
75-
};
76-
}
77-
78-
}
79-
80-
/*
81-
* Once this is opted in to, that's it. It's going to manually rebalance
82-
* forever. There is no way to unset config values in librdkafka, just
83-
* a way to override them.
84-
*/
85-
86-
if (onRebalance) {
87-
/**
88-
* Rebalance event. Called when the KafkaConsumer is rebalancing.
89-
*
90-
* @event KafkaConsumer#rebalance
91-
* @type {object}
92-
* @property {number} code - whether the rebalance was an assignment or
93-
* an unassignment
94-
*/
95-
this._client.onRebalance(function(e) {
96-
self.emit('rebalance', e);
97-
onRebalance.call(self, e);
98-
});
99-
100-
}
101-
10288
this.globalConfig = conf;
10389
this.topicConfig = topicConf;
10490
}

src/callbacks.cc

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -386,7 +386,13 @@ void Delivery::dr_cb(RdKafka::Message &message) {
386386

387387
// Rebalance CB
388388

389-
/*
389+
RebalanceDispatcher::RebalanceDispatcher() {}
390+
RebalanceDispatcher::~RebalanceDispatcher() {}
391+
392+
void RebalanceDispatcher::Add(const rebalance_event_t &e) {
393+
scoped_mutex_lock lock(async_lock);
394+
events.push_back(e);
395+
}
390396

391397
void RebalanceDispatcher::Flush() {
392398
Nan::HandleScope scope;
@@ -447,16 +453,18 @@ void RebalanceDispatcher::Flush() {
447453
Dispatch(argc, argv);
448454
}
449455
}
450-
*/
451-
Rebalance::Rebalance(Nan::Callback &cb) {}
456+
457+
Rebalance::Rebalance(v8::Local<v8::Function> &cb) {
458+
dispatcher.AddCallback(cb);
459+
}
452460
Rebalance::~Rebalance() {}
453461

454462
void Rebalance::rebalance_cb(RdKafka::KafkaConsumer *consumer,
455463
RdKafka::ErrorCode err,
456464
std::vector<RdKafka::TopicPartition*> &partitions) {
457465

458-
// dispatcher.Add(rebalance_event_t(err, partitions));
459-
// dispatcher.Execute();
466+
dispatcher.Add(rebalance_event_t(err, partitions));
467+
dispatcher.Execute();
460468

461469
}
462470

src/callbacks.h

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,14 +174,25 @@ struct rebalance_event_t {
174174
}
175175
};
176176

177+
class RebalanceDispatcher : public Dispatcher {
178+
public:
179+
RebalanceDispatcher();
180+
~RebalanceDispatcher();
181+
void Add(const rebalance_event_t &);
182+
void Flush();
183+
protected:
184+
std::vector<rebalance_event_t> events;
185+
};
177186

178187
class Rebalance : public RdKafka::RebalanceCb {
179188
public:
180-
explicit Rebalance(Nan::Callback &);
189+
explicit Rebalance(v8::Local<v8::Function>&);
181190
~Rebalance();
182191

183192
void rebalance_cb(RdKafka::KafkaConsumer *, RdKafka::ErrorCode,
184193
std::vector<RdKafka::TopicPartition*> &);
194+
195+
RebalanceDispatcher dispatcher;
185196
private:
186197
v8::Persistent<v8::Function> m_cb;
187198
};

src/config.cc

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,10 @@ Conf * Conf::create(RdKafka::Conf::ConfType type, v8::Local<v8::Object> object,
6262
return NULL;
6363
}
6464
} else {
65-
Log("Value is a function");
6665
if (string_key.compare("rebalance_cb") == 0) {
67-
Nan::Callback cb(value.As<v8::Function>());
68-
NodeKafka::Callbacks::Rebalance rebalance_cb(cb);
69-
rdconf->set(string_key, &rebalance_cb, errstr);
66+
v8::Local<v8::Function> cb = value.As<v8::Function>();
67+
rdconf->m_rebalance_cb = new NodeKafka::Callbacks::Rebalance(cb);
68+
rdconf->set(string_key, rdconf->m_rebalance_cb, errstr);
7069
}
7170
}
7271
}
@@ -75,6 +74,22 @@ Conf * Conf::create(RdKafka::Conf::ConfType type, v8::Local<v8::Object> object,
7574

7675
}
7776

78-
Conf::~Conf() {}
77+
void Conf::listen() {
78+
if (m_rebalance_cb) {
79+
m_rebalance_cb->dispatcher.Activate();
80+
}
81+
}
82+
83+
void Conf::stop() {
84+
if (m_rebalance_cb) {
85+
m_rebalance_cb->dispatcher.Deactivate();
86+
}
87+
}
88+
89+
Conf::~Conf() {
90+
if (m_rebalance_cb) {
91+
delete m_rebalance_cb;
92+
}
93+
}
7994

8095
} // namespace NodeKafka

src/config.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,11 @@ class Conf : public RdKafka::Conf {
2828
static Conf* create(RdKafka::Conf::ConfType, v8::Local<v8::Object>, std::string &); // NOLINT
2929

3030
static void DumpConfig(std::list<std::string> *);
31+
32+
void listen();
33+
void stop();
3134
protected:
32-
bool m_has_rebalance_cb;
33-
bool m_has_partitioner_cb;
35+
NodeKafka::Callbacks::Rebalance * m_rebalance_cb = NULL;
3436
};
3537

3638
} // namespace NodeKafka

src/connection.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ namespace NodeKafka {
3636
* @sa NodeKafka::Client
3737
*/
3838

39-
Connection::Connection(RdKafka::Conf* gconfig, RdKafka::Conf* tconfig):
39+
Connection::Connection(Conf* gconfig, Conf* tconfig):
4040
m_event_cb(),
4141
m_gconfig(gconfig),
4242
m_tconfig(tconfig) {

src/connection.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ class Connection : public Nan::ObjectWrap {
6363
virtual void DeactivateDispatchers() = 0;
6464

6565
protected:
66-
Connection(RdKafka::Conf*, RdKafka::Conf*);
66+
Connection(Conf*, Conf*);
6767
~Connection();
6868

6969
static Nan::Persistent<v8::Function> constructor;
@@ -72,8 +72,8 @@ class Connection : public Nan::ObjectWrap {
7272
bool m_has_been_disconnected;
7373
bool m_is_closing;
7474

75-
RdKafka::Conf* m_gconfig;
76-
RdKafka::Conf* m_tconfig;
75+
Conf* m_gconfig;
76+
Conf* m_tconfig;
7777
std::string m_errstr;
7878

7979
uv_mutex_t m_connection_lock;

src/consumer.cc

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,8 @@ consumer_commit_t::consumer_commit_t() {
3737
* @sa NodeKafka::Client
3838
*/
3939

40-
Consumer::Consumer(RdKafka::Conf* gconfig, RdKafka::Conf* tconfig):
40+
Consumer::Consumer(Conf* gconfig, Conf* tconfig):
4141
Connection(gconfig, tconfig) {
42-
m_is_subscribed = false;
43-
m_is_manually_rebalancing = false;
44-
4542
std::string errstr;
4643

4744
m_gconfig->set("default_topic_conf", m_tconfig, errstr);
@@ -73,6 +70,13 @@ Baton Consumer::Connect() {
7370
}
7471

7572
void Consumer::ActivateDispatchers() {
73+
// Listen to global config
74+
m_gconfig->listen();
75+
76+
// Listen to non global config
77+
// tconfig->listen();
78+
79+
// This should be refactored to config based management
7680
m_event_cb.dispatcher.Activate();
7781
}
7882

@@ -100,6 +104,10 @@ Baton Consumer::Disconnect() {
100104
}
101105

102106
void Consumer::DeactivateDispatchers() {
107+
// Stop listening to the config dispatchers
108+
m_gconfig->stop();
109+
110+
// Also this one
103111
m_event_cb.dispatcher.Deactivate();
104112
}
105113

src/consumer.h

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,16 +79,15 @@ class Consumer : public Connection {
7979
static Nan::Persistent<v8::Function> constructor;
8080
static void New(const Nan::FunctionCallbackInfo<v8::Value>& info);
8181

82-
Consumer(RdKafka::Conf *, RdKafka::Conf *);
82+
Consumer(Conf *, Conf *);
8383
~Consumer();
8484

8585
private:
8686
static void part_list_print(const std::vector<RdKafka::TopicPartition*>&);
8787

8888
std::vector<RdKafka::TopicPartition*> m_partitions;
8989
int m_partition_cnt;
90-
bool m_is_subscribed;
91-
bool m_is_manually_rebalancing;
90+
bool m_is_subscribed = false;
9291

9392
// Node methods
9493
static NAN_METHOD(NodeConnect);

src/producer.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ namespace NodeKafka {
2828
* @sa NodeKafka::Connection
2929
*/
3030

31-
Producer::Producer(RdKafka::Conf* gconfig, RdKafka::Conf* tconfig):
31+
Producer::Producer(Conf* gconfig, Conf* tconfig):
3232
Connection(gconfig, tconfig),
3333
m_dr_cb(),
3434
m_partitioner_cb() {
@@ -109,14 +109,14 @@ void Producer::New(const Nan::FunctionCallbackInfo<v8::Value>& info) {
109109

110110
std::string errstr;
111111

112-
RdKafka::Conf* gconfig =
112+
Conf* gconfig =
113113
Conf::create(RdKafka::Conf::CONF_GLOBAL, info[0]->ToObject(), errstr);
114114

115115
if (!gconfig) {
116116
return Nan::ThrowError(errstr.c_str());
117117
}
118118

119-
RdKafka::Conf* tconfig =
119+
Conf* tconfig =
120120
Conf::create(RdKafka::Conf::CONF_TOPIC, info[1]->ToObject(), errstr);
121121

122122
if (!tconfig) {

0 commit comments

Comments
 (0)