Skip to content

Commit c7c7cbe

Browse files
committed
Fix GC issue
1 parent 25e8af2 commit c7c7cbe

File tree

7 files changed

+19
-39
lines changed

7 files changed

+19
-39
lines changed

src/config.cc

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,17 @@ namespace NodeKafka {
2525

2626
namespace Config {
2727

28+
void DumpConfig(std::list<std::string> *dump) {
29+
for (std::list<std::string>::iterator it = dump->begin();
30+
it != dump->end(); ) {
31+
std::cout << *it << " = ";
32+
it++;
33+
std::cout << *it << std::endl;
34+
it++;
35+
}
36+
std::cout << std::endl;
37+
}
38+
2839
template<typename T>
2940
void LoadParameter(v8::Local<v8::Object> object, std::string field, const T &to) { // NOLINT
3041
to = GetParameter<T>(object, field, to);

src/config.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
namespace NodeKafka {
2121
namespace Config {
2222

23+
void DumpConfig(std::list<std::string> *);
2324
template<typename T> void LoadParameter(v8::Local<v8::Object>, std::string, T &); // NOLINT
2425
std::string GetValue(RdKafka::Conf*, const std::string);
2526
RdKafka::Conf* Create(RdKafka::Conf::ConfType, v8::Local<v8::Object>, std::string &); // NOLINT

src/connection.cc

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -57,13 +57,15 @@ Connection::Connection(RdKafka::Conf* gconfig, RdKafka::Conf* tconfig):
5757

5858
Connection::~Connection() {
5959
uv_mutex_destroy(&m_connection_lock);
60-
if (m_gconfig) {
61-
delete m_gconfig;
62-
}
6360

6461
if (m_tconfig) {
6562
delete m_tconfig;
6663
}
64+
65+
if (m_gconfig) {
66+
delete m_gconfig;
67+
}
68+
6769
}
6870

6971
RdKafka::TopicPartition* Connection::GetPartition(std::string &topic) {
@@ -78,17 +80,6 @@ bool Connection::IsConnected() {
7880
return !m_is_closing && m_client != NULL;
7981
}
8082

81-
void Connection::DumpConfig(std::list<std::string> *dump) {
82-
for (std::list<std::string>::iterator it = dump->begin();
83-
it != dump->end(); ) {
84-
std::cout << *it << " = ";
85-
it++;
86-
std::cout << *it << std::endl;
87-
it++;
88-
}
89-
std::cout << std::endl;
90-
}
91-
9283
RdKafka::Handle* Connection::GetClient() {
9384
return m_client;
9485
}

src/connection.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,6 @@ class Connection : public Nan::ObjectWrap {
6767
Connection(RdKafka::Conf*, RdKafka::Conf*);
6868
~Connection();
6969

70-
void DumpConfig(std::list<std::string> *);
71-
7270
static Nan::Persistent<v8::Function> constructor;
7371
static void New(const Nan::FunctionCallbackInfo<v8::Value>& info);
7472

src/consumer.cc

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,6 @@ Baton Consumer::Disconnect() {
9595

9696
delete m_client;
9797
m_client = NULL;
98-
99-
RdKafka::wait_destroyed(1000);
10098
}
10199
}
102100

src/producer.cc

Lines changed: 2 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ Producer::Producer(RdKafka::Conf* gconfig, RdKafka::Conf* tconfig):
3434
m_partitioner_cb() {
3535
std::string errstr;
3636

37+
m_gconfig->set("default_topic_conf", m_tconfig, errstr);
3738
m_gconfig->set("dr_cb", &m_dr_cb, errstr);
3839
}
3940

@@ -124,7 +125,7 @@ void Producer::New(const Nan::FunctionCallbackInfo<v8::Value>& info) {
124125
return Nan::ThrowError(errstr.c_str());
125126
}
126127

127-
Producer* producer = new Producer(gconfig, gconfig);
128+
Producer* producer = new Producer(gconfig, tconfig);
128129

129130
// Wrap it
130131
producer->Wrap(info.This());
@@ -190,32 +191,13 @@ void Producer::Disconnect() {
190191
delete m_client;
191192
m_client = NULL;
192193
}
193-
194-
RdKafka::wait_destroyed(1000);
195194
}
196195

197196
Baton Producer::Produce(ProducerMessage* msg) {
198197
return Produce(msg->Payload(), msg->Size(), msg->GetTopic(),
199198
msg->partition, msg->key);
200199
}
201200

202-
Baton Producer::Produce(void* message, size_t size, std::string topic_name,
203-
int32_t partition, std::string *key) {
204-
std::string errstr;
205-
206-
RdKafka::Topic* topic =
207-
RdKafka::Topic::create(m_client, topic_name, m_tconfig, errstr);
208-
209-
if (errstr.empty()) {
210-
// Cede ownership of the pointer to this function
211-
return Produce(message, size, topic, partition, key);
212-
}
213-
214-
// We own the pointer here so we need to free it
215-
free(message);
216-
return Baton(RdKafka::ERR__INVALID_ARG);
217-
}
218-
219201
Baton Producer::Produce(void* message, size_t size, RdKafka::Topic* topic,
220202
int32_t partition, std::string *key) {
221203
RdKafka::ErrorCode response_code;

src/producer.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@ class Producer : public Connection {
5656
void Poll();
5757

5858
Baton Produce(ProducerMessage* msg);
59-
Baton Produce(void*, size_t, std::string, int32_t, std::string*);
6059
Baton Produce(void*, size_t, RdKafka::Topic*, int32_t, std::string*);
6160
std::string Name();
6261

0 commit comments

Comments
 (0)