Skip to content

Commit

Permalink
Ensure bindings gracefully shutdown dispatcher threads (#6)
Browse files Browse the repository at this point in the history
* Do not initialize dispatching threads at construct time

Dispatching threads were stopping graceful shutdown. Instead of
initializing them when the object is constructed, they now
only listen when the object has connected. In order to ensure
they activate on the main thread, they are started and halted
in the Async worker complete callbacks.

The Activate and Deactivate methods can be called many times, as
they rely on checking the async member to test whether or not
they need to be activated or deactivated.

* Cleanup directory and fix deprecations
  • Loading branch information
webmakersteve authored Aug 16, 2016
1 parent 8a321b6 commit 7266497
Show file tree
Hide file tree
Showing 15 changed files with 115 additions and 72 deletions.
12 changes: 0 additions & 12 deletions index.js

This file was deleted.

2 changes: 1 addition & 1 deletion lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ module.exports = Client;

var Emitter = require('events').EventEmitter;
var util = require('util');
var Kafka = require('../kafka-native.js');
var Kafka = require('../librdkafka.js');
var assert = require('assert');

var LibrdKafkaError = require('./error');
Expand Down
2 changes: 1 addition & 1 deletion lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ var error = require('./error');
var util = require('util');

module.exports = {
Consumer: util.deprecate('Use KafkaConsumer instead. This may be changed in a later version', KafkaConsumer),
Consumer: util.deprecate(KafkaConsumer, 'Use KafkaConsumer instead. This may be changed in a later version'),
Producer: Producer,
KafkaConsumer: KafkaConsumer,
CODES: {
Expand Down
2 changes: 1 addition & 1 deletion lib/kafka-consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ module.exports = KafkaConsumer;

var Client = require('./client');
var util = require('util');
var Kafka = require('../kafka-native.js');
var Kafka = require('../librdkafka.js');
var TopicReadable = require('./util/topicReadable');
var LibrdKafkaError = require('./error');

Expand Down
6 changes: 3 additions & 3 deletions lib/producer.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ module.exports = Producer;
var Client = require('./client');

var util = require('util');
var Kafka = require('../kafka-native.js');
var Kafka = require('../librdkafka.js');
var TopicWritable = require('./util/topicWritable');
var LibrdKafkaError = require('./error');

Expand Down Expand Up @@ -132,7 +132,7 @@ function maybeTopic(name, config) {
// this may be what we want
return name;
}
};
}

/**
* Create a topic by topic name and config
Expand Down Expand Up @@ -217,7 +217,7 @@ Producer.prototype.produceSync = function(msg) {
* @see Producer#produceSync
* @deprecated
*/
Producer.prototype.sendMessageSync = util.deprecate(this.produceSync,
Producer.prototype.sendMessageSync = util.deprecate(Producer.prototype.produceSync,
'this.sendMessageSync: use this.produceSync instead');

/**
Expand Down
File renamed without changes.
6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@
"license": "MIT",
"devDependencies": {
"jshint": "2.x",
"jsdoc": "^3.4.0",
"toolkit-jsdoc": "^1.0.0",
"mocha": "2.x",
"node-gyp": "3.x"
},
"dependencies": {
"bindings": "1.x",
"jsdoc": "^3.4.0",
"nan": "2.x",
"toolkit-jsdoc": "^1.0.0"
"nan": "2.x"
},
"engines": {
"npm": "^2.7.3"
Expand Down
30 changes: 22 additions & 8 deletions src/callbacks.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,7 @@ namespace NodeKafka {
namespace Callbacks {

Dispatcher::Dispatcher() {
async = new uv_async_t;
uv_async_init(uv_default_loop(), async, AsyncMessage_);

async->data = this;

async = NULL;
uv_mutex_init(&async_lock);
}

Expand All @@ -39,17 +35,35 @@ Dispatcher::~Dispatcher() {
callbacks[i].Reset();
}

// async->data = this;

uv_mutex_destroy(&async_lock);
}

// Only run this if we aren't already listening
void Dispatcher::Activate() {
if (!async) {
async = new uv_async_t;
uv_async_init(uv_default_loop(), async, AsyncMessage_);

async->data = this;
}
}

// Should be able to run this regardless of whether it is active or not
void Dispatcher::Deactivate() {
if (async) {
uv_close(reinterpret_cast<uv_handle_t*>(async), NULL);
async = NULL;
}
}

bool Dispatcher::HasCallbacks() {
return callbacks.size() > 0;
}

void Dispatcher::Execute() {
uv_async_send(async);
if (async) {
uv_async_send(async);
}
}

void Dispatcher::Dispatch(const int _argc, Local<Value> _argv[]) {
Expand Down
2 changes: 2 additions & 0 deletions src/callbacks.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class Dispatcher {
bool HasCallbacks();
virtual void Flush() = 0;
void Execute();
void Activate();
void Deactivate();
protected:
std::vector<v8::Persistent<v8::Function, v8::CopyablePersistentTraits<v8::Function> > > callbacks; // NOLINT

Expand Down
3 changes: 3 additions & 0 deletions src/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ class Connection : public Nan::ObjectWrap {

Callbacks::Event m_event_cb;

virtual void ActivateDispatchers() = 0;
virtual void DeactivateDispatchers() = 0;

protected:
Connection(RdKafka::Conf*, RdKafka::Conf*);
~Connection();
Expand Down
87 changes: 50 additions & 37 deletions src/consumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,56 @@ Baton Consumer::Connect() {
return Baton(RdKafka::ERR_NO_ERROR);
}

void Consumer::ActivateDispatchers() {
m_event_cb.dispatcher.Activate();
m_consume_cb.dispatcher.Activate();
m_rebalance_cb.dispatcher.Activate();
}

Baton Consumer::Disconnect() {
// Only close client if it is connected
RdKafka::ErrorCode err = RdKafka::ERR_NO_ERROR;

if (IsConnected()) {
m_is_closing = true;
{
scoped_mutex_lock lock(m_connection_lock);

RdKafka::KafkaConsumer* consumer =
dynamic_cast<RdKafka::KafkaConsumer*>(m_client);
err = consumer->close();

delete m_client;
m_client = NULL;

RdKafka::wait_destroyed(1000);
}
}

m_is_closing = false;

return Baton(err);
}

void Consumer::DeactivateDispatchers() {
m_event_cb.dispatcher.Deactivate();
m_consume_cb.dispatcher.Deactivate();
m_rebalance_cb.dispatcher.Deactivate();
}

bool Consumer::IsSubscribed() {
if (!IsConnected()) {
return false;
}

if (!m_is_subscribed) {
return false;
}

return true;
}


bool Consumer::HasAssignedPartitions() {
return !m_partitions.empty();
}
Expand Down Expand Up @@ -349,43 +399,6 @@ v8::Local<v8::Object> Consumer::NewInstance(v8::Local<v8::Value> arg) {
return scope.Escape(instance);
}

Baton Consumer::Disconnect() {
// Only close client if it is connected
RdKafka::ErrorCode err = RdKafka::ERR_NO_ERROR;

if (IsConnected()) {
m_is_closing = true;
{
scoped_mutex_lock lock(m_connection_lock);

RdKafka::KafkaConsumer* consumer =
dynamic_cast<RdKafka::KafkaConsumer*>(m_client);
err = consumer->close();

delete m_client;
m_client = NULL;

RdKafka::wait_destroyed(1000);
}
}

m_is_closing = false;

return Baton(err);
}

bool Consumer::IsSubscribed() {
if (!IsConnected()) {
return false;
}

if (!m_is_subscribed) {
return false;
}

return true;
}

/* Node exposed methods */

NAN_METHOD(Consumer::NodeOnConsume) {
Expand Down
3 changes: 3 additions & 0 deletions src/consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ class Consumer : public Connection {
Baton Subscribe(std::vector<std::string>);
NodeKafka::Message* Consume();

void ActivateDispatchers();
void DeactivateDispatchers();

protected:
static Nan::Persistent<v8::Function> constructor;
static void New(const Nan::FunctionCallbackInfo<v8::Value>& info);
Expand Down
10 changes: 10 additions & 0 deletions src/producer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,16 @@ Baton Producer::Connect() {
return Baton(RdKafka::ERR_NO_ERROR);
}

void Producer::ActivateDispatchers() {
m_event_cb.dispatcher.Activate(); // From connection
m_dr_cb.dispatcher.Activate();
}

void Producer::DeactivateDispatchers() {
m_event_cb.dispatcher.Deactivate(); // From connection
m_dr_cb.dispatcher.Deactivate();
}

void Producer::Disconnect() {
if (IsConnected()) {
scoped_mutex_lock lock(m_connection_lock);
Expand Down
3 changes: 3 additions & 0 deletions src/producer.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ class Producer : public Connection {
Baton Produce(void*, size_t, RdKafka::Topic*, int32_t, std::string*);
std::string Name();

void ActivateDispatchers();
void DeactivateDispatchers();

protected:
static Nan::Persistent<v8::Function> constructor;
static void New(const Nan::FunctionCallbackInfo<v8::Value>&);
Expand Down
19 changes: 13 additions & 6 deletions src/workers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,9 @@ void ProducerConnect::HandleOKCallback() {

v8::Local<v8::Value> argv[argc] = { Nan::Null(), obj};

// Activate the dispatchers
producer->ActivateDispatchers();

callback->Call(argc, argv);
}

Expand Down Expand Up @@ -242,16 +245,15 @@ void ProducerDisconnect::HandleOKCallback() {
const unsigned int argc = 2;
v8::Local<v8::Value> argv[argc] = { Nan::Null(), Nan::True()};

// Deactivate the dispatchers
producer->DeactivateDispatchers();

callback->Call(argc, argv);
}

void ProducerDisconnect::HandleErrorCallback() {
Nan::HandleScope scope;

const unsigned int argc = 1;
v8::Local<v8::Value> argv[argc] = { Nan::Error(ErrorMessage()) };

callback->Call(argc, argv);
// This should never run
assert(0);
}

ProducerProduce::ProducerProduce(
Expand Down Expand Up @@ -327,6 +329,7 @@ void ConsumerConnect::HandleOKCallback() {
Nan::New(consumer->Name()).ToLocalChecked());

v8::Local<v8::Value> argv[argc] = { Nan::Null(), obj };
consumer->ActivateDispatchers();

callback->Call(argc, argv);
}
Expand Down Expand Up @@ -370,6 +373,8 @@ void ConsumerDisconnect::HandleOKCallback() {
const unsigned int argc = 2;
v8::Local<v8::Value> argv[argc] = { Nan::Null(), Nan::True() };

consumer->DeactivateDispatchers();

callback->Call(argc, argv);
}

Expand All @@ -379,6 +384,8 @@ void ConsumerDisconnect::HandleErrorCallback() {
const unsigned int argc = 1;
v8::Local<v8::Value> argv[argc] = { GetErrorObject() };

consumer->DeactivateDispatchers();

callback->Call(argc, argv);
}

Expand Down

0 comments on commit 7266497

Please sign in to comment.