diff --git a/e2e/both.spec.js b/e2e/both.spec.js index bff6e3d5..f8b4da70 100644 --- a/e2e/both.spec.js +++ b/e2e/both.spec.js @@ -75,18 +75,14 @@ var testCase = new TestCase('Interoperability tests', function() { crypto.randomBytes(4096, function(ex, buffer) { - consumer.on('rebalance', function(e) { - if (e.code === 500) { - setTimeout(function() { - producer.produce({ - message: buffer, - topic: topic - }, function(err) { - t.ifError(err); - }); - }, 1000); - } - }); + var pT = setInterval(function() { + producer.produce({ + message: buffer, + topic: topic + }, function(err) { + t.ifError(err); + }); + }, 2000); var tt = setInterval(function() { if (!producer.isConnected()) { @@ -116,6 +112,7 @@ var testCase = new TestCase('Interoperability tests', function() { } clearInterval(tt); + clearInterval(pT); if (err) { return cb(err); diff --git a/lib/kafka-consumer.js b/lib/kafka-consumer.js index 04d03424..0714df5f 100644 --- a/lib/kafka-consumer.js +++ b/lib/kafka-consumer.js @@ -40,6 +40,34 @@ function KafkaConsumer(conf, topicConf) { return new KafkaConsumer(conf, topicConf); } + var onRebalance = conf.rebalance_cb; + + var self = this; + + // If rebalance is undefined we don't want any part of this + if (onRebalance && typeof onRebalance === 'boolean') { + conf.rebalance_cb = function(e) { + // That's it + if (e.code === 500 /*CODES.REBALANCE.PARTITION_ASSIGNMENT*/) { + self.assign(e.assignment); + } else if (e.code === 501 /*CODES.REBALANCE.PARTITION_UNASSIGNMENT*/) { + self.unassign(e.assignment); + } + }; + } else if (onRebalance && typeof onRebalance === 'function') { + /* + * Once this is opted in to, that's it. It's going to manually rebalance + * forever. There is no way to unset config values in librdkafka, just + * a way to override them. + */ + + conf.rebalance_cb = function(e) { + self.emit('rebalance', e); + onRebalance.call(self, e); + }; + } + + /** * KafkaConsumer message. * @@ -57,20 +85,6 @@ function KafkaConsumer(conf, topicConf) { Client.call(this, conf, Kafka.KafkaConsumer, topicConf); - var self = this; - - /** - * Rebalance event. Called when the KafkaConsumer is rebalancing. - * - * @event KafkaConsumer#rebalance - * @type {object} - * @property {number} code - whether the rebalance was an assignment or - * an unassignment - */ - this._client.onRebalance(function(e) { - self.emit('rebalance', e); - }); - this.globalConfig = conf; this.topicConfig = topicConf; } diff --git a/src/callbacks.cc b/src/callbacks.cc index 255e0a37..0ee7c231 100644 --- a/src/callbacks.cc +++ b/src/callbacks.cc @@ -425,31 +425,44 @@ void RebalanceDispatcher::Flush() { break; } + std::vector parts = _events[i].partitions; + + v8::Local tp_array = Nan::New(); + + for (size_t i = 0; i < parts.size(); i++) { + v8::Local tp_obj = Nan::New(); + rebalance_topic_partition_t tp = parts[i]; + + Nan::Set(tp_obj, Nan::New("topic").ToLocalChecked(), + Nan::New(tp.topic.c_str()).ToLocalChecked()); + Nan::Set(tp_obj, Nan::New("partition").ToLocalChecked(), + Nan::New(tp.partition)); + + if (tp.offset >= 0) { + Nan::Set(tp_obj, Nan::New("offset").ToLocalChecked(), + Nan::New(tp.offset)); + } + + tp_array->Set(i, tp_obj); + } + // Now convert the TopicPartition list to a JS array + Nan::Set(jsobj, Nan::New("assignment").ToLocalChecked(), tp_array); + argv[0] = jsobj; Dispatch(argc, argv); } } -Rebalance::~Rebalance() {} -Rebalance::Rebalance(NodeKafka::Consumer* that) : - that_(that) { - eof_cnt = 0; +Rebalance::Rebalance(v8::Local &cb) { + dispatcher.AddCallback(cb); } +Rebalance::~Rebalance() {} void Rebalance::rebalance_cb(RdKafka::KafkaConsumer *consumer, - RdKafka::ErrorCode err, - std::vector &partitions) { - if (err == RdKafka::ERR__ASSIGN_PARTITIONS) { - that_->Assign(partitions); - } else { - that_->Unassign(); - } - + RdKafka::ErrorCode err, std::vector &partitions) { dispatcher.Add(rebalance_event_t(err, partitions)); dispatcher.Execute(); - - eof_cnt = 0; } // Partitioner callback diff --git a/src/callbacks.h b/src/callbacks.h index 2cb46b82..8f8f8feb 100644 --- a/src/callbacks.h +++ b/src/callbacks.h @@ -140,14 +140,38 @@ class Delivery : public RdKafka::DeliveryReportCb { // Rebalance dispatcher +struct rebalance_topic_partition_t { + std::string topic; + int partition; + int64_t offset; + + rebalance_topic_partition_t(std::string p_topic, int p_partition, int64_t p_offset): // NOLINT + topic(p_topic), + partition(p_partition), + offset(p_offset) {} +}; + struct rebalance_event_t { RdKafka::ErrorCode err; - std::vector partitions; - - rebalance_event_t(RdKafka::ErrorCode _err, - std::vector _partitions): - err(_err), - partitions(_partitions) {} + std::vector partitions; + + rebalance_event_t(RdKafka::ErrorCode p_err, + std::vector p_partitions): + err(p_err) { + // Iterate over the topic partitions because we won't have them later + for (size_t topic_partition_i = 0; + topic_partition_i < p_partitions.size(); topic_partition_i++) { + RdKafka::TopicPartition* topic_partition = + p_partitions[topic_partition_i]; + + rebalance_topic_partition_t tp( + topic_partition->topic(), + topic_partition->partition(), + topic_partition->offset()); + + partitions.push_back(tp); + } + } }; class RebalanceDispatcher : public Dispatcher { @@ -162,16 +186,15 @@ class RebalanceDispatcher : public Dispatcher { class Rebalance : public RdKafka::RebalanceCb { public: - explicit Rebalance(NodeKafka::Consumer* that); + explicit Rebalance(v8::Local&); ~Rebalance(); - // NAN_DISALLOW_ASSIGN_COPY_MOVE? - NodeKafka::Consumer* const that_; void rebalance_cb(RdKafka::KafkaConsumer *, RdKafka::ErrorCode, std::vector &); + RebalanceDispatcher dispatcher; private: - int eof_cnt; + v8::Persistent m_cb; }; class Partitioner : public RdKafka::PartitionerCb { diff --git a/src/common.cc b/src/common.cc index e8737743..0e735ec4 100644 --- a/src/common.cc +++ b/src/common.cc @@ -122,4 +122,154 @@ std::vector v8ArrayToStringVector(v8::Local parameter) { return newItem; } +namespace TopicPartition { + +/** + * @brief RdKafka::TopicPartition vector to a v8 Array + * + * @see v8ArrayToTopicPartitionVector + */ +v8::Local ToV8Array( + std::vector topic_partition_list) { + v8::Local array = Nan::New(); + for (size_t topic_partition_i = 0; + topic_partition_i < topic_partition_list.size(); topic_partition_i++) { + RdKafka::TopicPartition* topic_partition = + topic_partition_list[topic_partition_i]; + + // We have the list now let's get the properties from it + v8::Local obj = Nan::New(); + + Nan::Set(obj, Nan::New("offset").ToLocalChecked(), + Nan::New(topic_partition->offset())); + Nan::Set(obj, Nan::New("partition").ToLocalChecked(), + Nan::New(topic_partition->partition())); + Nan::Set(obj, Nan::New("topic").ToLocalChecked(), + Nan::New(topic_partition->topic().c_str()).ToLocalChecked()); + + array->Set(topic_partition_i, obj); + } + + return array; +} + +} // namespace TopicPartition + +namespace Metadata { + +/** + * @brief RdKafka::Metadata to v8::Object + * + */ +v8::Local ToV8Object(RdKafka::Metadata* metadata) { + v8::Local obj = Nan::New(); + + v8::Local broker_data = Nan::New(); + v8::Local topic_data = Nan::New(); + + const BrokerMetadataList* brokers = metadata->brokers(); // NOLINT + + unsigned int broker_i = 0; + + for (BrokerMetadataList::const_iterator it = brokers->begin(); + it != brokers->end(); ++it, broker_i++) { + // Start iterating over brokers and set the object up + + const RdKafka::BrokerMetadata* x = *it; + + v8::Local current_broker = Nan::New(); + + Nan::Set(current_broker, Nan::New("id").ToLocalChecked(), + Nan::New(x->id())); + Nan::Set(current_broker, Nan::New("host").ToLocalChecked(), + Nan::New(x->host().c_str()).ToLocalChecked()); + Nan::Set(current_broker, Nan::New("port").ToLocalChecked(), + Nan::New(x->port())); + + broker_data->Set(broker_i, current_broker); + } + + unsigned int topic_i = 0; + + const TopicMetadataList* topics = metadata->topics(); + + for (TopicMetadataList::const_iterator it = topics->begin(); + it != topics->end(); ++it, topic_i++) { + // Start iterating over topics + + const RdKafka::TopicMetadata* x = *it; + + v8::Local current_topic = Nan::New(); + + Nan::Set(current_topic, Nan::New("name").ToLocalChecked(), + Nan::New(x->topic().c_str()).ToLocalChecked()); + + v8::Local current_topic_partitions = Nan::New(); + + const PartitionMetadataList* current_partition_data = x->partitions(); + + unsigned int partition_i = 0; + PartitionMetadataList::const_iterator itt; + + for (itt = current_partition_data->begin(); + itt != current_partition_data->end(); ++itt, partition_i++) { + // partition iterate + const RdKafka::PartitionMetadata* xx = *itt; + + v8::Local current_partition = Nan::New(); + + Nan::Set(current_partition, Nan::New("id").ToLocalChecked(), + Nan::New(xx->id())); + Nan::Set(current_partition, Nan::New("leader").ToLocalChecked(), + Nan::New(xx->leader())); + + const std::vector * replicas = xx->replicas(); + const std::vector * isrs = xx->isrs(); + + std::vector::const_iterator r_it; + std::vector::const_iterator i_it; + + unsigned int r_i = 0; + unsigned int i_i = 0; + + v8::Local current_replicas = Nan::New(); + + for (r_it = replicas->begin(); r_it != replicas->end(); ++r_it, r_i++) { + current_replicas->Set(r_i, Nan::New(*r_it)); + } + + v8::Local current_isrs = Nan::New(); + + for (i_it = isrs->begin(); i_it != isrs->end(); ++i_it, i_i++) { + current_isrs->Set(r_i, Nan::New(*i_it)); + } + + Nan::Set(current_partition, Nan::New("replicas").ToLocalChecked(), + current_replicas); + Nan::Set(current_partition, Nan::New("isrs").ToLocalChecked(), + current_isrs); + + current_topic_partitions->Set(partition_i, current_partition); + } // iterate over partitions + + Nan::Set(current_topic, Nan::New("partitions").ToLocalChecked(), + current_topic_partitions); + + topic_data->Set(topic_i, current_topic); + } // End iterating over topics + + Nan::Set(obj, Nan::New("orig_broker_id").ToLocalChecked(), + Nan::New(metadata->orig_broker_id())); + + Nan::Set(obj, Nan::New("orig_broker_name").ToLocalChecked(), + Nan::New(metadata->orig_broker_name()).ToLocalChecked()); + + Nan::Set(obj, Nan::New("topics").ToLocalChecked(), topic_data); + Nan::Set(obj, Nan::New("brokers").ToLocalChecked(), broker_data); + + return obj; +} + +} // namespace Metadata + } // namespace NodeKafka diff --git a/src/common.h b/src/common.h index 6f776d69..4a47c6d2 100644 --- a/src/common.h +++ b/src/common.h @@ -18,6 +18,10 @@ #include "deps/librdkafka/src-cpp/rdkafkacpp.h" +typedef std::vector BrokerMetadataList; +typedef std::vector PartitionMetadataList; +typedef std::vector TopicMetadataList; + namespace NodeKafka { void Log(std::string); @@ -45,6 +49,18 @@ class scoped_mutex_lock { uv_mutex_t &async_lock; }; +namespace TopicPartition { + +v8::Local ToV8Array(std::vector); + +} + +namespace Metadata { + +v8::Local ToV8Object(RdKafka::Metadata*); + +} // namespace Metadata + } // namespace NodeKafka #endif // SRC_COMMON_H_ diff --git a/src/config.cc b/src/config.cc index b9f41b6f..93aef01d 100644 --- a/src/config.cc +++ b/src/config.cc @@ -13,7 +13,6 @@ #include "src/config.h" -using RdKafka::Conf; using Nan::MaybeLocal; using Nan::Maybe; using v8::Local; @@ -24,9 +23,7 @@ using std::endl; namespace NodeKafka { -namespace Config { - -void DumpConfig(std::list *dump) { +void Conf::DumpConfig(std::list *dump) { for (std::list::iterator it = dump->begin(); it != dump->end(); ) { std::cout << *it << " = "; @@ -37,22 +34,8 @@ void DumpConfig(std::list *dump) { std::cout << std::endl; } -template -void LoadParameter(v8::Local object, std::string field, const T &to) { // NOLINT - to = GetParameter(object, field, to); -} - -std::string GetValue(RdKafka::Conf* rdconf, const std::string name) { - std::string value; - if (rdconf->get(name, value) == RdKafka::Conf::CONF_UNKNOWN) { - return std::string(); - } - - return value; -} - -RdKafka::Conf* Create(RdKafka::Conf::ConfType type, v8::Local object, std::string &errstr) { // NOLINT - RdKafka::Conf* rdconf = RdKafka::Conf::create(type); +Conf * Conf::create(RdKafka::Conf::ConfType type, v8::Local object, std::string &errstr) { // NOLINT + Conf* rdconf = static_cast(RdKafka::Conf::create(type)); v8::Local property_names = object->GetOwnPropertyNames(); @@ -78,12 +61,34 @@ RdKafka::Conf* Create(RdKafka::Conf::ConfType type, v8::Local object delete rdconf; return NULL; } + } else { + if (string_key.compare("rebalance_cb") == 0) { + v8::Local cb = value.As(); + rdconf->m_rebalance_cb = new NodeKafka::Callbacks::Rebalance(cb); + rdconf->set(string_key, rdconf->m_rebalance_cb, errstr); + } } } return rdconf; } -} // namespace Config +void Conf::listen() { + if (m_rebalance_cb) { + m_rebalance_cb->dispatcher.Activate(); + } +} + +void Conf::stop() { + if (m_rebalance_cb) { + m_rebalance_cb->dispatcher.Deactivate(); + } +} + +Conf::~Conf() { + if (m_rebalance_cb) { + delete m_rebalance_cb; + } +} } // namespace NodeKafka diff --git a/src/config.h b/src/config.h index 6e1928e2..527bc0ef 100644 --- a/src/config.h +++ b/src/config.h @@ -14,19 +14,26 @@ #include #include #include +#include #include "deps/librdkafka/src-cpp/rdkafkacpp.h" #include "src/common.h" +#include "src/callbacks.h" namespace NodeKafka { -namespace Config { -void DumpConfig(std::list *); -template void LoadParameter(v8::Local, std::string, T &); // NOLINT -std::string GetValue(RdKafka::Conf*, const std::string); -RdKafka::Conf* Create(RdKafka::Conf::ConfType, v8::Local, std::string &); // NOLINT +class Conf : public RdKafka::Conf { + public: + ~Conf(); -} // namespace Config + static Conf* create(RdKafka::Conf::ConfType, v8::Local, std::string &); // NOLINT + static void DumpConfig(std::list *); + + void listen(); + void stop(); + protected: + NodeKafka::Callbacks::Rebalance * m_rebalance_cb = NULL; +}; } // namespace NodeKafka diff --git a/src/connection.cc b/src/connection.cc index 3f4aa692..5024e654 100644 --- a/src/connection.cc +++ b/src/connection.cc @@ -36,7 +36,7 @@ namespace NodeKafka { * @sa NodeKafka::Client */ -Connection::Connection(RdKafka::Conf* gconfig, RdKafka::Conf* tconfig): +Connection::Connection(Conf* gconfig, Conf* tconfig): m_event_cb(), m_gconfig(gconfig), m_tconfig(tconfig) { diff --git a/src/connection.h b/src/connection.h index e3ec9dff..5661ad03 100644 --- a/src/connection.h +++ b/src/connection.h @@ -63,7 +63,7 @@ class Connection : public Nan::ObjectWrap { virtual void DeactivateDispatchers() = 0; protected: - Connection(RdKafka::Conf*, RdKafka::Conf*); + Connection(Conf*, Conf*); ~Connection(); static Nan::Persistent constructor; @@ -72,8 +72,8 @@ class Connection : public Nan::ObjectWrap { bool m_has_been_disconnected; bool m_is_closing; - RdKafka::Conf* m_gconfig; - RdKafka::Conf* m_tconfig; + Conf* m_gconfig; + Conf* m_tconfig; std::string m_errstr; uv_mutex_t m_connection_lock; diff --git a/src/consumer.cc b/src/consumer.cc index 739de745..15e2f10a 100644 --- a/src/consumer.cc +++ b/src/consumer.cc @@ -37,15 +37,10 @@ consumer_commit_t::consumer_commit_t() { * @sa NodeKafka::Client */ -Consumer::Consumer(RdKafka::Conf* gconfig, RdKafka::Conf* tconfig): - Connection(gconfig, tconfig), - m_consume_cb(), - m_rebalance_cb(this) { - m_is_subscribed = false; - +Consumer::Consumer(Conf* gconfig, Conf* tconfig): + Connection(gconfig, tconfig) { std::string errstr; - m_gconfig->set("rebalance_cb", &m_rebalance_cb, errstr); m_gconfig->set("default_topic_conf", m_tconfig, errstr); } @@ -75,9 +70,14 @@ Baton Consumer::Connect() { } void Consumer::ActivateDispatchers() { + // Listen to global config + m_gconfig->listen(); + + // Listen to non global config + // tconfig->listen(); + + // This should be refactored to config based management m_event_cb.dispatcher.Activate(); - m_consume_cb.dispatcher.Activate(); - m_rebalance_cb.dispatcher.Activate(); } Baton Consumer::Disconnect() { @@ -104,9 +104,11 @@ Baton Consumer::Disconnect() { } void Consumer::DeactivateDispatchers() { + // Stop listening to the config dispatchers + m_gconfig->stop(); + + // Also this one m_event_cb.dispatcher.Deactivate(); - m_consume_cb.dispatcher.Deactivate(); - m_rebalance_cb.dispatcher.Deactivate(); } bool Consumer::IsSubscribed() { @@ -334,9 +336,6 @@ void Consumer::Init(v8::Local exports) { * @sa RdKafka::KafkaConsumer */ - Nan::SetPrototypeMethod(tpl, "onConsume", NodeOnConsume); - Nan::SetPrototypeMethod(tpl, "onRebalance", NodeOnRebalance); - /* * @brief Methods exposed to do with message retrieval */ @@ -381,15 +380,15 @@ void Consumer::New(const Nan::FunctionCallbackInfo& info) { std::string errstr; - RdKafka::Conf* gconfig = - Config::Create(RdKafka::Conf::CONF_GLOBAL, info[0]->ToObject(), errstr); + Conf* gconfig = + Conf::create(RdKafka::Conf::CONF_GLOBAL, info[0]->ToObject(), errstr); if (!gconfig) { return Nan::ThrowError(errstr.c_str()); } - RdKafka::Conf* tconfig = - Config::Create(RdKafka::Conf::CONF_TOPIC, info[1]->ToObject(), errstr); + Conf* tconfig = + Conf::create(RdKafka::Conf::CONF_TOPIC, info[1]->ToObject(), errstr); if (!tconfig) { delete gconfig; @@ -423,35 +422,6 @@ v8::Local Consumer::NewInstance(v8::Local arg) { /* Node exposed methods */ -NAN_METHOD(Consumer::NodeOnConsume) { - if (info.Length() < 1 || !info[0]->IsFunction()) { - // Just throw an exception - return Nan::ThrowError("Need to specify a callback"); - } - - Consumer* obj = ObjectWrap::Unwrap(info.This()); - - v8::Local cb = info[0].As(); - obj->m_consume_cb.dispatcher.AddCallback(cb); - - info.GetReturnValue().Set(Nan::True()); -} - -NAN_METHOD(Consumer::NodeOnRebalance) { - if (info.Length() < 1 || !info[0]->IsFunction()) { - // Just throw an exception - return Nan::ThrowError("Need to specify a callback"); - } - - Consumer* consumer = ObjectWrap::Unwrap(info.This()); - - v8::Local cb = info[0].As(); - - consumer->m_rebalance_cb.dispatcher.AddCallback(cb); - - info.GetReturnValue().Set(Nan::True()); -} - NAN_METHOD(Consumer::NodeGetAssignments) { Nan::HandleScope scope; diff --git a/src/consumer.h b/src/consumer.h index 673bcbd4..4612bbcb 100644 --- a/src/consumer.h +++ b/src/consumer.h @@ -47,8 +47,6 @@ struct consumer_commit_t { class Consumer : public Connection { public: - friend class NodeKafka::Callbacks::Rebalance; - static void Init(v8::Local); static v8::Local NewInstance(v8::Local); @@ -81,7 +79,7 @@ class Consumer : public Connection { static Nan::Persistent constructor; static void New(const Nan::FunctionCallbackInfo& info); - Consumer(RdKafka::Conf *, RdKafka::Conf *); + Consumer(Conf *, Conf *); ~Consumer(); private: @@ -89,14 +87,9 @@ class Consumer : public Connection { std::vector m_partitions; int m_partition_cnt; - bool m_is_subscribed; - - Callbacks::Consume m_consume_cb; - Callbacks::Rebalance m_rebalance_cb; + bool m_is_subscribed = false; // Node methods - static NAN_METHOD(NodeOnConsume); - static NAN_METHOD(NodeOnRebalance); static NAN_METHOD(NodeConnect); static NAN_METHOD(NodeSubscribe); static NAN_METHOD(NodeSubscribeSync); diff --git a/src/message.cc b/src/message.cc index 00829138..33fbe3f7 100644 --- a/src/message.cc +++ b/src/message.cc @@ -101,7 +101,6 @@ size_t Message::Size() { void Message::Free(char * data, void * hint) { Message* m = static_cast(hint); - // @note Am I responsible for freeing data as well? delete m; } diff --git a/src/producer.cc b/src/producer.cc index ce780286..1c2a53d4 100644 --- a/src/producer.cc +++ b/src/producer.cc @@ -28,7 +28,7 @@ namespace NodeKafka { * @sa NodeKafka::Connection */ -Producer::Producer(RdKafka::Conf* gconfig, RdKafka::Conf* tconfig): +Producer::Producer(Conf* gconfig, Conf* tconfig): Connection(gconfig, tconfig), m_dr_cb(), m_partitioner_cb() { @@ -109,15 +109,15 @@ void Producer::New(const Nan::FunctionCallbackInfo& info) { std::string errstr; - RdKafka::Conf* gconfig = - Config::Create(RdKafka::Conf::CONF_GLOBAL, info[0]->ToObject(), errstr); + Conf* gconfig = + Conf::create(RdKafka::Conf::CONF_GLOBAL, info[0]->ToObject(), errstr); if (!gconfig) { return Nan::ThrowError(errstr.c_str()); } - RdKafka::Conf* tconfig = - Config::Create(RdKafka::Conf::CONF_TOPIC, info[1]->ToObject(), errstr); + Conf* tconfig = + Conf::create(RdKafka::Conf::CONF_TOPIC, info[1]->ToObject(), errstr); if (!tconfig) { // No longer need this since we aren't instantiating anything diff --git a/src/producer.h b/src/producer.h index 5a4d02c5..648b162e 100644 --- a/src/producer.h +++ b/src/producer.h @@ -66,7 +66,7 @@ class Producer : public Connection { static Nan::Persistent constructor; static void New(const Nan::FunctionCallbackInfo&); - Producer(RdKafka::Conf*, RdKafka::Conf*); + Producer(Conf*, Conf*); ~Producer(); private: diff --git a/src/topic.cc b/src/topic.cc index e60b47aa..a8af5267 100644 --- a/src/topic.cc +++ b/src/topic.cc @@ -85,7 +85,7 @@ void Topic::New(const Nan::FunctionCallbackInfo& info) { std::string errstr; RdKafka::Conf* config = - Config::Create(RdKafka::Conf::CONF_TOPIC, info[1]->ToObject(), errstr); + Conf::create(RdKafka::Conf::CONF_TOPIC, info[1]->ToObject(), errstr); if (!config) { return Nan::ThrowError(errstr.c_str()); diff --git a/src/workers.cc b/src/workers.cc index d46bdec4..9eae5661 100644 --- a/src/workers.cc +++ b/src/workers.cc @@ -12,10 +12,6 @@ #include "src/workers.h" -typedef std::vector BrokerMetadataList; -typedef std::vector PartitionMetadataList; -typedef std::vector TopicMetadataList; - using NodeKafka::Producer; using NodeKafka::Connection; using NodeKafka::Message; @@ -45,7 +41,7 @@ void ConnectionMetadata::Execute() { // No good way to do this except some stupid string delimiting. // maybe we'll delimit it by a | or something and just split // the string to create the object - metadata_ = b.data(); + m_metadata = b.data(); } else { SetErrorCode(b.err()); } @@ -57,117 +53,12 @@ void ConnectionMetadata::HandleOKCallback() { const unsigned int argc = 2; // This is a big one! - - v8::Local obj = Nan::New(); - - v8::Local broker_data = Nan::New(); - v8::Local topic_data = Nan::New(); - - const BrokerMetadataList* brokers = metadata_->brokers(); // NOLINT - - unsigned int broker_i = 0; - - for (BrokerMetadataList::const_iterator it = brokers->begin(); - it != brokers->end(); ++it, broker_i++) { - // Start iterating over brokers and set the object up - - const RdKafka::BrokerMetadata* x = *it; - - v8::Local current_broker = Nan::New(); - - Nan::Set(current_broker, Nan::New("id").ToLocalChecked(), - Nan::New(x->id())); - Nan::Set(current_broker, Nan::New("host").ToLocalChecked(), - Nan::New(x->host().c_str()).ToLocalChecked()); - Nan::Set(current_broker, Nan::New("port").ToLocalChecked(), - Nan::New(x->port())); - - broker_data->Set(broker_i, current_broker); - } - - unsigned int topic_i = 0; - - const TopicMetadataList* topics = metadata_->topics(); - - for (TopicMetadataList::const_iterator it = topics->begin(); - it != topics->end(); ++it, topic_i++) { - // Start iterating over topics - - const RdKafka::TopicMetadata* x = *it; - - v8::Local current_topic = Nan::New(); - - Nan::Set(current_topic, Nan::New("name").ToLocalChecked(), - Nan::New(x->topic().c_str()).ToLocalChecked()); - - v8::Local current_topic_partitions = Nan::New(); - - const PartitionMetadataList* current_partition_data = x->partitions(); - - unsigned int partition_i = 0; - PartitionMetadataList::const_iterator itt; - - for (itt = current_partition_data->begin(); - itt != current_partition_data->end(); ++itt, partition_i++) { - // partition iterate - const RdKafka::PartitionMetadata* xx = *itt; - - v8::Local current_partition = Nan::New(); - - Nan::Set(current_partition, Nan::New("id").ToLocalChecked(), - Nan::New(xx->id())); - Nan::Set(current_partition, Nan::New("leader").ToLocalChecked(), - Nan::New(xx->leader())); - - const std::vector * replicas = xx->replicas(); - const std::vector * isrs = xx->isrs(); - - std::vector::const_iterator r_it; - std::vector::const_iterator i_it; - - unsigned int r_i = 0; - unsigned int i_i = 0; - - v8::Local current_replicas = Nan::New(); - - for (r_it = replicas->begin(); r_it != replicas->end(); ++r_it, r_i++) { - current_replicas->Set(r_i, Nan::New(*r_it)); - } - - v8::Local current_isrs = Nan::New(); - - for (i_it = isrs->begin(); i_it != isrs->end(); ++i_it, i_i++) { - current_isrs->Set(r_i, Nan::New(*i_it)); - } - - Nan::Set(current_partition, Nan::New("replicas").ToLocalChecked(), - current_replicas); - Nan::Set(current_partition, Nan::New("isrs").ToLocalChecked(), - current_isrs); - - current_topic_partitions->Set(partition_i, current_partition); - } // iterate over partitions - - Nan::Set(current_topic, Nan::New("partitions").ToLocalChecked(), - current_topic_partitions); - - topic_data->Set(topic_i, current_topic); - } // End iterating over topics - - Nan::Set(obj, Nan::New("orig_broker_id").ToLocalChecked(), - Nan::New(metadata_->orig_broker_id())); - - Nan::Set(obj, Nan::New("orig_broker_name").ToLocalChecked(), - Nan::New(metadata_->orig_broker_name()).ToLocalChecked()); - - Nan::Set(obj, Nan::New("topics").ToLocalChecked(), topic_data); - Nan::Set(obj, Nan::New("brokers").ToLocalChecked(), broker_data); - - v8::Local argv[argc] = { Nan::Null(), obj}; + v8::Local argv[argc] = { Nan::Null(), + NodeKafka::Metadata::ToV8Object(m_metadata)}; callback->Call(argc, argv); - delete metadata_; + delete m_metadata; } void ConnectionMetadata::HandleErrorCallback() { diff --git a/src/workers.h b/src/workers.h index 7c05c25c..031696f1 100644 --- a/src/workers.h +++ b/src/workers.h @@ -158,7 +158,7 @@ class ConnectionMetadata : public ErrorAwareWorker { std::string topic_; int timeout_ms_; - RdKafka::Metadata* metadata_; + RdKafka::Metadata* m_metadata; // Now this is the data that will get translated in the OK callback };