Skip to content

Commit

Permalink
Do native isConnected checking inside a lock (#7)
Browse files Browse the repository at this point in the history
Fixes test case (as far as I can tell) for Issue #7
  • Loading branch information
webmakersteve committed Aug 16, 2016
1 parent 7266497 commit 49cf69f
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 14 deletions.
11 changes: 8 additions & 3 deletions lib/producer.js
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,15 @@ function maybeTopic(name, config) {
* @return {Topic} - a new Kafka Topic.
*/
Producer.prototype.Topic = function(name, config) {
if (!this._isConnected) {
throw new Error('Producer not connected');
try {
if (!this._isConnected) {
throw new Error('Producer not connected');
}
return new Kafka.Topic(name, config, this._client);
} catch (err) {
err.message = 'Error creating topic "' + name + '"": ' + err.message;
this.emit('error', LibrdKafkaError.create(err));
}
return new Kafka.Topic(name, config, this._client);
};

/**
Expand Down
24 changes: 20 additions & 4 deletions src/connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,27 @@ RdKafka::Handle* Connection::GetClient() {
}

Baton Connection::CreateTopic(std::string topic_name) {
RdKafka::Topic* topic =
RdKafka::Topic::create(m_client, topic_name, NULL, m_errstr);
return CreateTopic(topic_name, NULL);
}

Baton Connection::CreateTopic(std::string topic_name, RdKafka::Conf* conf) {
std::string errstr;

RdKafka::Topic* topic = NULL;

if (IsConnected()) {
scoped_mutex_lock lock(m_connection_lock);
if (IsConnected()) {
topic = RdKafka::Topic::create(m_client, topic_name, conf, errstr);
} else {
return Baton(RdKafka::ErrorCode::ERR__STATE);
}
} else {
return Baton(RdKafka::ErrorCode::ERR__STATE);
}

if (!m_errstr.empty()) {
return Baton(RdKafka::ErrorCode::ERR_TOPIC_EXCEPTION);
if (!errstr.empty()) {
return Baton(RdKafka::ErrorCode::ERR_TOPIC_EXCEPTION, errstr);
}

// Maybe do it this way later? Then we don't need to do static_cast
Expand Down
1 change: 1 addition & 0 deletions src/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class Connection : public Nan::ObjectWrap {

// Baton<RdKafka::Topic*>
Baton CreateTopic(std::string);
Baton CreateTopic(std::string, RdKafka::Conf*);
Baton GetMetadata(std::string, int);

RdKafka::Handle* GetClient();
Expand Down
19 changes: 16 additions & 3 deletions src/errors.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,31 +13,44 @@

namespace NodeKafka {

v8::Local<v8::Object> RdKafkaError(const RdKafka::ErrorCode &err) {
v8::Local<v8::Object> RdKafkaError(const RdKafka::ErrorCode &err, std::string errstr) {
//
int code = static_cast<int>(err);

v8::Local<v8::Object> ret = Nan::New<v8::Object>();

ret->Set(Nan::New("message").ToLocalChecked(),
Nan::New<v8::String>(RdKafka::err2str(err)).ToLocalChecked());
Nan::New<v8::String>(errstr).ToLocalChecked());
ret->Set(Nan::New("code").ToLocalChecked(),
Nan::New<v8::Number>(code));

return ret;
}

v8::Local<v8::Object> RdKafkaError(const RdKafka::ErrorCode &err) {
return RdKafkaError(err, RdKafka::err2str(err));
}

Baton::Baton(const RdKafka::ErrorCode &code) {
m_err = code;
}

Baton::Baton(const RdKafka::ErrorCode &code, std::string errstr) {
m_err = code;
m_errstr = errstr;
}

Baton::Baton(void* _data) {
m_err = RdKafka::ERR_NO_ERROR;
m_data = _data;
}

v8::Local<v8::Object> Baton::ToObject() {
return RdKafkaError(m_err);
if (m_errstr.empty()) {
return RdKafkaError(m_err);
} else {
return RdKafkaError(m_err, m_errstr);
}
}

RdKafka::ErrorCode Baton::err() {
Expand Down
2 changes: 2 additions & 0 deletions src/errors.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class Baton {
public:
explicit Baton(const RdKafka::ErrorCode &);
explicit Baton(void* data);
explicit Baton(const RdKafka::ErrorCode &, std::string);

template<typename T> T data() {
return static_cast<T>(m_data);
Expand All @@ -35,6 +36,7 @@ class Baton {

private:
void* m_data;
std::string m_errstr;
RdKafka::ErrorCode m_err;
};

Expand Down
1 change: 0 additions & 1 deletion src/message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ Message::Message(RdKafka::Message *message):
}

Message::~Message() {
// @todo I think I'd rather check if _message is NULL
if (m_message) {
delete m_message;
}
Expand Down
19 changes: 16 additions & 3 deletions src/topic.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,20 @@ namespace NodeKafka {
*/

Topic::Topic(std::string topic_name, RdKafka::Conf* config, Connection * handle) { // NOLINT
m_topic =
RdKafka::Topic::create(handle->GetClient(), topic_name, config, errstr);
Baton b = handle->CreateTopic(topic_name, config);

if (b.err() != RdKafka::ERR_NO_ERROR) {
m_topic = NULL;
} else {
m_topic = b.data<RdKafka::Topic*>();
}

}

Topic::~Topic() {
delete m_topic;
if (m_topic) {
delete m_topic;
}
}

Nan::Persistent<v8::Function> Topic::constructor;
Expand Down Expand Up @@ -90,7 +98,12 @@ void Topic::New(const Nan::FunctionCallbackInfo<v8::Value>& info) {

Connection* connection = ObjectWrap::Unwrap<Connection>(info[2]->ToObject());

if (!connection->IsConnected()) {
return Nan::ThrowError("Client is not connected");
}

Topic* topic = new Topic(topic_name, config, connection);

// Wrap it
topic->Wrap(info.This());

Expand Down

0 comments on commit 49cf69f

Please sign in to comment.