Skip to content

Commit 7266497

Browse files
Ensure bindings gracefully shutdown dispatcher threads (Blizzard#6)
* Do not initialize dispatching threads at construct time Dispatching threads were stopping graceful shutdown. Instead of initializing them when the object is constructed, they now only listen when the object has connected. In order to ensure they activate on the main thread, they are started and halted in the Async worker complete callbacks. The Activate and Deactivate methods can be called many times, as they rely on checking the async member to test whether or not they need to be activated or deactivated. * Cleanup directory and fix deprecations
1 parent 8a321b6 commit 7266497

15 files changed

+115
-72
lines changed

Diff for: index.js

-12
This file was deleted.

Diff for: lib/client.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ module.exports = Client;
1111

1212
var Emitter = require('events').EventEmitter;
1313
var util = require('util');
14-
var Kafka = require('../kafka-native.js');
14+
var Kafka = require('../librdkafka.js');
1515
var assert = require('assert');
1616

1717
var LibrdKafkaError = require('./error');

Diff for: lib/index.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ var error = require('./error');
1313
var util = require('util');
1414

1515
module.exports = {
16-
Consumer: util.deprecate('Use KafkaConsumer instead. This may be changed in a later version', KafkaConsumer),
16+
Consumer: util.deprecate(KafkaConsumer, 'Use KafkaConsumer instead. This may be changed in a later version'),
1717
Producer: Producer,
1818
KafkaConsumer: KafkaConsumer,
1919
CODES: {

Diff for: lib/kafka-consumer.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ module.exports = KafkaConsumer;
1212

1313
var Client = require('./client');
1414
var util = require('util');
15-
var Kafka = require('../kafka-native.js');
15+
var Kafka = require('../librdkafka.js');
1616
var TopicReadable = require('./util/topicReadable');
1717
var LibrdKafkaError = require('./error');
1818

Diff for: lib/producer.js

+3-3
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ module.exports = Producer;
1212
var Client = require('./client');
1313

1414
var util = require('util');
15-
var Kafka = require('../kafka-native.js');
15+
var Kafka = require('../librdkafka.js');
1616
var TopicWritable = require('./util/topicWritable');
1717
var LibrdKafkaError = require('./error');
1818

@@ -132,7 +132,7 @@ function maybeTopic(name, config) {
132132
// this may be what we want
133133
return name;
134134
}
135-
};
135+
}
136136

137137
/**
138138
* Create a topic by topic name and config
@@ -217,7 +217,7 @@ Producer.prototype.produceSync = function(msg) {
217217
* @see Producer#produceSync
218218
* @deprecated
219219
*/
220-
Producer.prototype.sendMessageSync = util.deprecate(this.produceSync,
220+
Producer.prototype.sendMessageSync = util.deprecate(Producer.prototype.produceSync,
221221
'this.sendMessageSync: use this.produceSync instead');
222222

223223
/**

Diff for: kafka-native.js renamed to librdkafka.js

File renamed without changes.

Diff for: package.json

+3-3
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,14 @@
2929
"license": "MIT",
3030
"devDependencies": {
3131
"jshint": "2.x",
32+
"jsdoc": "^3.4.0",
33+
"toolkit-jsdoc": "^1.0.0",
3234
"mocha": "2.x",
3335
"node-gyp": "3.x"
3436
},
3537
"dependencies": {
3638
"bindings": "1.x",
37-
"jsdoc": "^3.4.0",
38-
"nan": "2.x",
39-
"toolkit-jsdoc": "^1.0.0"
39+
"nan": "2.x"
4040
},
4141
"engines": {
4242
"npm": "^2.7.3"

Diff for: src/callbacks.cc

+22-8
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,7 @@ namespace NodeKafka {
2424
namespace Callbacks {
2525

2626
Dispatcher::Dispatcher() {
27-
async = new uv_async_t;
28-
uv_async_init(uv_default_loop(), async, AsyncMessage_);
29-
30-
async->data = this;
31-
27+
async = NULL;
3228
uv_mutex_init(&async_lock);
3329
}
3430

@@ -39,17 +35,35 @@ Dispatcher::~Dispatcher() {
3935
callbacks[i].Reset();
4036
}
4137

42-
// async->data = this;
43-
4438
uv_mutex_destroy(&async_lock);
4539
}
4640

41+
// Only run this if we aren't already listening
42+
void Dispatcher::Activate() {
43+
if (!async) {
44+
async = new uv_async_t;
45+
uv_async_init(uv_default_loop(), async, AsyncMessage_);
46+
47+
async->data = this;
48+
}
49+
}
50+
51+
// Should be able to run this regardless of whether it is active or not
52+
void Dispatcher::Deactivate() {
53+
if (async) {
54+
uv_close(reinterpret_cast<uv_handle_t*>(async), NULL);
55+
async = NULL;
56+
}
57+
}
58+
4759
bool Dispatcher::HasCallbacks() {
4860
return callbacks.size() > 0;
4961
}
5062

5163
void Dispatcher::Execute() {
52-
uv_async_send(async);
64+
if (async) {
65+
uv_async_send(async);
66+
}
5367
}
5468

5569
void Dispatcher::Dispatch(const int _argc, Local<Value> _argv[]) {

Diff for: src/callbacks.h

+2
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ class Dispatcher {
3737
bool HasCallbacks();
3838
virtual void Flush() = 0;
3939
void Execute();
40+
void Activate();
41+
void Deactivate();
4042
protected:
4143
std::vector<v8::Persistent<v8::Function, v8::CopyablePersistentTraits<v8::Function> > > callbacks; // NOLINT
4244

Diff for: src/connection.h

+3
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ class Connection : public Nan::ObjectWrap {
5959

6060
Callbacks::Event m_event_cb;
6161

62+
virtual void ActivateDispatchers() = 0;
63+
virtual void DeactivateDispatchers() = 0;
64+
6265
protected:
6366
Connection(RdKafka::Conf*, RdKafka::Conf*);
6467
~Connection();

Diff for: src/consumer.cc

+50-37
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,56 @@ Baton Consumer::Connect() {
7474
return Baton(RdKafka::ERR_NO_ERROR);
7575
}
7676

77+
void Consumer::ActivateDispatchers() {
78+
m_event_cb.dispatcher.Activate();
79+
m_consume_cb.dispatcher.Activate();
80+
m_rebalance_cb.dispatcher.Activate();
81+
}
82+
83+
Baton Consumer::Disconnect() {
84+
// Only close client if it is connected
85+
RdKafka::ErrorCode err = RdKafka::ERR_NO_ERROR;
86+
87+
if (IsConnected()) {
88+
m_is_closing = true;
89+
{
90+
scoped_mutex_lock lock(m_connection_lock);
91+
92+
RdKafka::KafkaConsumer* consumer =
93+
dynamic_cast<RdKafka::KafkaConsumer*>(m_client);
94+
err = consumer->close();
95+
96+
delete m_client;
97+
m_client = NULL;
98+
99+
RdKafka::wait_destroyed(1000);
100+
}
101+
}
102+
103+
m_is_closing = false;
104+
105+
return Baton(err);
106+
}
107+
108+
void Consumer::DeactivateDispatchers() {
109+
m_event_cb.dispatcher.Deactivate();
110+
m_consume_cb.dispatcher.Deactivate();
111+
m_rebalance_cb.dispatcher.Deactivate();
112+
}
113+
114+
bool Consumer::IsSubscribed() {
115+
if (!IsConnected()) {
116+
return false;
117+
}
118+
119+
if (!m_is_subscribed) {
120+
return false;
121+
}
122+
123+
return true;
124+
}
125+
126+
77127
bool Consumer::HasAssignedPartitions() {
78128
return !m_partitions.empty();
79129
}
@@ -349,43 +399,6 @@ v8::Local<v8::Object> Consumer::NewInstance(v8::Local<v8::Value> arg) {
349399
return scope.Escape(instance);
350400
}
351401

352-
Baton Consumer::Disconnect() {
353-
// Only close client if it is connected
354-
RdKafka::ErrorCode err = RdKafka::ERR_NO_ERROR;
355-
356-
if (IsConnected()) {
357-
m_is_closing = true;
358-
{
359-
scoped_mutex_lock lock(m_connection_lock);
360-
361-
RdKafka::KafkaConsumer* consumer =
362-
dynamic_cast<RdKafka::KafkaConsumer*>(m_client);
363-
err = consumer->close();
364-
365-
delete m_client;
366-
m_client = NULL;
367-
368-
RdKafka::wait_destroyed(1000);
369-
}
370-
}
371-
372-
m_is_closing = false;
373-
374-
return Baton(err);
375-
}
376-
377-
bool Consumer::IsSubscribed() {
378-
if (!IsConnected()) {
379-
return false;
380-
}
381-
382-
if (!m_is_subscribed) {
383-
return false;
384-
}
385-
386-
return true;
387-
}
388-
389402
/* Node exposed methods */
390403

391404
NAN_METHOD(Consumer::NodeOnConsume) {

Diff for: src/consumer.h

+3
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,9 @@ class Consumer : public Connection {
7474
Baton Subscribe(std::vector<std::string>);
7575
NodeKafka::Message* Consume();
7676

77+
void ActivateDispatchers();
78+
void DeactivateDispatchers();
79+
7780
protected:
7881
static Nan::Persistent<v8::Function> constructor;
7982
static void New(const Nan::FunctionCallbackInfo<v8::Value>& info);

Diff for: src/producer.cc

+10
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,16 @@ Baton Producer::Connect() {
173173
return Baton(RdKafka::ERR_NO_ERROR);
174174
}
175175

176+
void Producer::ActivateDispatchers() {
177+
m_event_cb.dispatcher.Activate(); // From connection
178+
m_dr_cb.dispatcher.Activate();
179+
}
180+
181+
void Producer::DeactivateDispatchers() {
182+
m_event_cb.dispatcher.Deactivate(); // From connection
183+
m_dr_cb.dispatcher.Deactivate();
184+
}
185+
176186
void Producer::Disconnect() {
177187
if (IsConnected()) {
178188
scoped_mutex_lock lock(m_connection_lock);

Diff for: src/producer.h

+3
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,9 @@ class Producer : public Connection {
6060
Baton Produce(void*, size_t, RdKafka::Topic*, int32_t, std::string*);
6161
std::string Name();
6262

63+
void ActivateDispatchers();
64+
void DeactivateDispatchers();
65+
6366
protected:
6467
static Nan::Persistent<v8::Function> constructor;
6568
static void New(const Nan::FunctionCallbackInfo<v8::Value>&);

Diff for: src/workers.cc

+13-6
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,9 @@ void ProducerConnect::HandleOKCallback() {
213213

214214
v8::Local<v8::Value> argv[argc] = { Nan::Null(), obj};
215215

216+
// Activate the dispatchers
217+
producer->ActivateDispatchers();
218+
216219
callback->Call(argc, argv);
217220
}
218221

@@ -242,16 +245,15 @@ void ProducerDisconnect::HandleOKCallback() {
242245
const unsigned int argc = 2;
243246
v8::Local<v8::Value> argv[argc] = { Nan::Null(), Nan::True()};
244247

248+
// Deactivate the dispatchers
249+
producer->DeactivateDispatchers();
250+
245251
callback->Call(argc, argv);
246252
}
247253

248254
void ProducerDisconnect::HandleErrorCallback() {
249-
Nan::HandleScope scope;
250-
251-
const unsigned int argc = 1;
252-
v8::Local<v8::Value> argv[argc] = { Nan::Error(ErrorMessage()) };
253-
254-
callback->Call(argc, argv);
255+
// This should never run
256+
assert(0);
255257
}
256258

257259
ProducerProduce::ProducerProduce(
@@ -327,6 +329,7 @@ void ConsumerConnect::HandleOKCallback() {
327329
Nan::New(consumer->Name()).ToLocalChecked());
328330

329331
v8::Local<v8::Value> argv[argc] = { Nan::Null(), obj };
332+
consumer->ActivateDispatchers();
330333

331334
callback->Call(argc, argv);
332335
}
@@ -370,6 +373,8 @@ void ConsumerDisconnect::HandleOKCallback() {
370373
const unsigned int argc = 2;
371374
v8::Local<v8::Value> argv[argc] = { Nan::Null(), Nan::True() };
372375

376+
consumer->DeactivateDispatchers();
377+
373378
callback->Call(argc, argv);
374379
}
375380

@@ -379,6 +384,8 @@ void ConsumerDisconnect::HandleErrorCallback() {
379384
const unsigned int argc = 1;
380385
v8::Local<v8::Value> argv[argc] = { GetErrorObject() };
381386

387+
consumer->DeactivateDispatchers();
388+
382389
callback->Call(argc, argv);
383390
}
384391

0 commit comments

Comments
 (0)