Skip to content

Commit 6a20e24

Browse files
Properly point to topic configuration on initialization (#11)
1 parent 5187b20 commit 6a20e24

File tree

7 files changed

+20
-41
lines changed

7 files changed

+20
-41
lines changed

src/config.cc

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
#include <string>
1111
#include <vector>
12+
#include <list>
1213

1314
#include "src/config.h"
1415

@@ -25,6 +26,17 @@ namespace NodeKafka {
2526

2627
namespace Config {
2728

29+
void DumpConfig(std::list<std::string> *dump) {
30+
for (std::list<std::string>::iterator it = dump->begin();
31+
it != dump->end(); ) {
32+
std::cout << *it << " = ";
33+
it++;
34+
std::cout << *it << std::endl;
35+
it++;
36+
}
37+
std::cout << std::endl;
38+
}
39+
2840
template<typename T>
2941
void LoadParameter(v8::Local<v8::Object> object, std::string field, const T &to) { // NOLINT
3042
to = GetParameter<T>(object, field, to);

src/config.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,15 @@
1313
#include <nan.h>
1414
#include <iostream>
1515
#include <vector>
16+
#include <list>
1617

1718
#include "deps/librdkafka/src-cpp/rdkafkacpp.h"
1819
#include "src/common.h"
1920

2021
namespace NodeKafka {
2122
namespace Config {
2223

24+
void DumpConfig(std::list<std::string> *);
2325
template<typename T> void LoadParameter(v8::Local<v8::Object>, std::string, T &); // NOLINT
2426
std::string GetValue(RdKafka::Conf*, const std::string);
2527
RdKafka::Conf* Create(RdKafka::Conf::ConfType, v8::Local<v8::Object>, std::string &); // NOLINT

src/connection.cc

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
*/
99

1010
#include <string>
11-
#include <list>
1211

1312
#include "src/connection.h"
1413
#include "src/workers.h"
@@ -57,13 +56,14 @@ Connection::Connection(RdKafka::Conf* gconfig, RdKafka::Conf* tconfig):
5756

5857
Connection::~Connection() {
5958
uv_mutex_destroy(&m_connection_lock);
60-
if (m_gconfig) {
61-
delete m_gconfig;
62-
}
6359

6460
if (m_tconfig) {
6561
delete m_tconfig;
6662
}
63+
64+
if (m_gconfig) {
65+
delete m_gconfig;
66+
}
6767
}
6868

6969
RdKafka::TopicPartition* Connection::GetPartition(std::string &topic) {
@@ -78,17 +78,6 @@ bool Connection::IsConnected() {
7878
return !m_is_closing && m_client != NULL;
7979
}
8080

81-
void Connection::DumpConfig(std::list<std::string> *dump) {
82-
for (std::list<std::string>::iterator it = dump->begin();
83-
it != dump->end(); ) {
84-
std::cout << *it << " = ";
85-
it++;
86-
std::cout << *it << std::endl;
87-
it++;
88-
}
89-
std::cout << std::endl;
90-
}
91-
9281
RdKafka::Handle* Connection::GetClient() {
9382
return m_client;
9483
}

src/connection.h

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
#include <nan.h>
1414
#include <iostream>
1515
#include <string>
16-
#include <list>
1716

1817
#include "deps/librdkafka/src-cpp/rdkafkacpp.h"
1918

@@ -67,8 +66,6 @@ class Connection : public Nan::ObjectWrap {
6766
Connection(RdKafka::Conf*, RdKafka::Conf*);
6867
~Connection();
6968

70-
void DumpConfig(std::list<std::string> *);
71-
7269
static Nan::Persistent<v8::Function> constructor;
7370
static void New(const Nan::FunctionCallbackInfo<v8::Value>& info);
7471

src/consumer.cc

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,6 @@ Baton Consumer::Disconnect() {
9595

9696
delete m_client;
9797
m_client = NULL;
98-
99-
RdKafka::wait_destroyed(1000);
10098
}
10199
}
102100

src/producer.cc

Lines changed: 2 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ Producer::Producer(RdKafka::Conf* gconfig, RdKafka::Conf* tconfig):
3434
m_partitioner_cb() {
3535
std::string errstr;
3636

37+
m_gconfig->set("default_topic_conf", m_tconfig, errstr);
3738
m_gconfig->set("dr_cb", &m_dr_cb, errstr);
3839
}
3940

@@ -124,7 +125,7 @@ void Producer::New(const Nan::FunctionCallbackInfo<v8::Value>& info) {
124125
return Nan::ThrowError(errstr.c_str());
125126
}
126127

127-
Producer* producer = new Producer(gconfig, gconfig);
128+
Producer* producer = new Producer(gconfig, tconfig);
128129

129130
// Wrap it
130131
producer->Wrap(info.This());
@@ -190,32 +191,13 @@ void Producer::Disconnect() {
190191
delete m_client;
191192
m_client = NULL;
192193
}
193-
194-
RdKafka::wait_destroyed(1000);
195194
}
196195

197196
Baton Producer::Produce(ProducerMessage* msg) {
198197
return Produce(msg->Payload(), msg->Size(), msg->GetTopic(),
199198
msg->partition, msg->key);
200199
}
201200

202-
Baton Producer::Produce(void* message, size_t size, std::string topic_name,
203-
int32_t partition, std::string *key) {
204-
std::string errstr;
205-
206-
RdKafka::Topic* topic =
207-
RdKafka::Topic::create(m_client, topic_name, m_tconfig, errstr);
208-
209-
if (errstr.empty()) {
210-
// Cede ownership of the pointer to this function
211-
return Produce(message, size, topic, partition, key);
212-
}
213-
214-
// We own the pointer here so we need to free it
215-
free(message);
216-
return Baton(RdKafka::ERR__INVALID_ARG);
217-
}
218-
219201
Baton Producer::Produce(void* message, size_t size, RdKafka::Topic* topic,
220202
int32_t partition, std::string *key) {
221203
RdKafka::ErrorCode response_code;

src/producer.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@ class Producer : public Connection {
5656
void Poll();
5757

5858
Baton Produce(ProducerMessage* msg);
59-
Baton Produce(void*, size_t, std::string, int32_t, std::string*);
6059
Baton Produce(void*, size_t, RdKafka::Topic*, int32_t, std::string*);
6160
std::string Name();
6261

0 commit comments

Comments
 (0)