Skip to content

Commit 1707cbd

Browse files
committed
Fix memory leak when fetching metadata for a single topic
When requesting metadata for a single topic, Connection::GetMetadata() calls Connection::CreateTopic() to resolve the provided topic name into a Topic, but fails to deallocate it. To reproduce, compile node-rdkafka and librdkafka with ASAN, then run the following: ```js const { KafkaConsumer } = require('.'); const consumer = new KafkaConsumer({ 'group.id': 'kafka', 'metadata.broker.list': 'localhost:9092', }, {}); consumer.connect({ timeout: 2000 }, function (err) { if (err) { console.error('Error connecting to Kafka:', err); return; } consumer.getMetadata({ topic: 'test' }, function (metadataErr, metadata) { if (metadataErr) { console.error('Error fetching metadata:', metadataErr); } else { console.log(`Metadata: ${JSON.stringify(metadata, null, 2)}`); } consumer.disconnect(); }); }) ``` ASAN will report a leak from GetMetadata(): ``` Indirect leak of 1048 byte(s) in 1 object(s) allocated from: #0 0x7f9dd63fa037 in __interceptor_calloc ../../../../src/libsanitizer/asan/asan_malloc_linux.cpp:154 Blizzard#1 0x7f9dab530394 in rd_calloc /node-rdkafka/deps/librdkafka/src/rd.h:134 Blizzard#2 0x7f9dab530394 in rd_kafka_topic_new0 /node-rdkafka/deps/librdkafka/src/rdkafka_topic.c:349 Blizzard#3 0x7f9dab534cbc in rd_kafka_topic_new /node-rdkafka/deps/librdkafka/src/rdkafka_topic.c:533 Blizzard#4 0x7f9dd1f47891 in RdKafka::Topic::create(RdKafka::Handle*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, RdKafka::Conf const*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >&) /node-rdkafka/deps/librdkafka/src-cpp/TopicImpl.cpp:114 Blizzard#5 0x7f9dabdc8eb9 in NodeKafka::Connection::CreateTopic(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, RdKafka::Conf*) ../src/connection.cc:115 Blizzard#6 0x7f9dabdc94db in NodeKafka::Connection::CreateTopic(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >) ../src/connection.cc:104 Blizzard#7 0x7f9dabdca0d9 in NodeKafka::Connection::GetMetadata(bool, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, int) ../src/connection.cc:198 Blizzard#8 0x7f9dabe63bf2 in NodeKafka::Workers::ConnectionMetadata::Execute() ../src/workers.cc:95 Blizzard#9 0x7f9dabdd7261 in Nan::AsyncExecute(uv_work_s*) ../node_modules/nan/nan.h:2356 Blizzard#10 0x18bb06f in worker ../deps/uv/src/threadpool.c:122 Blizzard#11 0x7f9dd5ffbea6 in start_thread (/lib/x86_64-linux-gnu/libpthread.so.0+0x7ea6) Indirect leak of 128 byte(s) in 1 object(s) allocated from: #0 0x7f9dd63fa1f8 in __interceptor_realloc ../../../../src/libsanitizer/asan/asan_malloc_linux.cpp:164 Blizzard#1 0x7f9dab6bf9eb in rd_realloc /node-rdkafka/deps/librdkafka/src/rd.h:146 Blizzard#2 0x7f9dab6bf9eb in rd_list_grow /node-rdkafka/deps/librdkafka/src/rdlist.c:49 Blizzard#3 0x7f9dab6bfa9f in rd_list_init /node-rdkafka/deps/librdkafka/src/rdlist.c:57 Blizzard#4 0x7f9dab530dd7 in rd_kafka_topic_new0 /node-rdkafka/deps/librdkafka/src/rdkafka_topic.c:478 Blizzard#5 0x7f9dab534cbc in rd_kafka_topic_new /node-rdkafka/deps/librdkafka/src/rdkafka_topic.c:533 Blizzard#6 0x7f9dd1f47891 in RdKafka::Topic::create(RdKafka::Handle*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, RdKafka::Conf const*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >&) /node-rdkafka/deps/librdkafka/src-cpp/TopicImpl.cpp:114 Blizzard#7 0x7f9dabdc8eb9 in NodeKafka::Connection::CreateTopic(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, RdKafka::Conf*) ../src/connection.cc:115 Blizzard#8 0x7f9dabdc94db in NodeKafka::Connection::CreateTopic(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >) ../src/connection.cc:104 Blizzard#9 0x7f9dabdca0d9 in NodeKafka::Connection::GetMetadata(bool, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, int) ../src/connection.cc:198 Blizzard#10 0x7f9dabe63bf2 in NodeKafka::Workers::ConnectionMetadata::Execute() ../src/workers.cc:95 Blizzard#11 0x7f9dabdd7261 in Nan::AsyncExecute(uv_work_s*) ../node_modules/nan/nan.h:2356 Blizzard#12 0x18bb06f in worker ../deps/uv/src/threadpool.c:122 Blizzard#13 0x7f9dd5ffbea6 in start_thread (/lib/x86_64-linux-gnu/libpthread.so.0+0x7ea6) Indirect leak of 32 byte(s) in 1 object(s) allocated from: #0 0x7f9dd63fb647 in operator new(unsigned long) ../../../../src/libsanitizer/asan/asan_new_delete.cpp:99 Blizzard#1 0x7f9dd1f47743 in RdKafka::Topic::create(RdKafka::Handle*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, RdKafka::Conf const*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >&) /node-rdkafka/deps/librdkafka/src-cpp/TopicImpl.cpp:84 Blizzard#2 0x7f9dabdc8eb9 in NodeKafka::Connection::CreateTopic(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, RdKafka::Conf*) ../src/connection.cc:115 Blizzard#3 0x7f9dabdc94db in NodeKafka::Connection::CreateTopic(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >) ../src/connection.cc:104 Blizzard#4 0x7f9dabdca0d9 in NodeKafka::Connection::GetMetadata(bool, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, int) ../src/connection.cc:198 Blizzard#5 0x7f9dabe63bf2 in NodeKafka::Workers::ConnectionMetadata::Execute() ../src/workers.cc:95 Blizzard#6 0x7f9dabdd7261 in Nan::AsyncExecute(uv_work_s*) ../node_modules/nan/nan.h:2356 Blizzard#7 0x18bb06f in worker ../deps/uv/src/threadpool.c:122 Blizzard#8 0x7f9dd5ffbea6 in start_thread (/lib/x86_64-linux-gnu/libpthread.so.0+0x7ea6) Indirect leak of 23 byte(s) in 1 object(s) allocated from: #0 0x7f9dd63f9e8f in __interceptor_malloc ../../../../src/libsanitizer/asan/asan_malloc_linux.cpp:145 Blizzard#1 0x7f9dab5303ea in rd_malloc /node-rdkafka/deps/librdkafka/src/rd.h:140 Blizzard#2 0x7f9dab5303ea in rd_kafkap_str_new /node-rdkafka/deps/librdkafka/src/rdkafka_proto.h:315 Blizzard#3 0x7f9dab5303ea in rd_kafka_topic_new0 /node-rdkafka/deps/librdkafka/src/rdkafka_topic.c:353 Blizzard#4 0x7f9dab534cbc in rd_kafka_topic_new /node-rdkafka/deps/librdkafka/src/rdkafka_topic.c:533 Blizzard#5 0x7f9dd1f47891 in RdKafka::Topic::create(RdKafka::Handle*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, RdKafka::Conf const*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >&) /node-rdkafka/deps/librdkafka/src-cpp/TopicImpl.cpp:114 Blizzard#6 0x7f9dabdc8eb9 in NodeKafka::Connection::CreateTopic(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, RdKafka::Conf*) ../src/connection.cc:115 Blizzard#7 0x7f9dabdc94db in NodeKafka::Connection::CreateTopic(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >) ../src/connection.cc:104 Blizzard#8 0x7f9dabdca0d9 in NodeKafka::Connection::GetMetadata(bool, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, int) ../src/connection.cc:198 Blizzard#9 0x7f9dabe63bf2 in NodeKafka::Workers::ConnectionMetadata::Execute() ../src/workers.cc:95 Blizzard#10 0x7f9dabdd7261 in Nan::AsyncExecute(uv_work_s*) ../node_modules/nan/nan.h:2356 Blizzard#11 0x18bb06f in worker ../deps/uv/src/threadpool.c:122 Blizzard#12 0x7f9dd5ffbea6 in start_thread (/lib/x86_64-linux-gnu/libpthread.so.0+0x7ea6) Indirect leak of 20 byte(s) in 2 object(s) allocated from: #0 0x7f9dd63a7817 in __interceptor_strdup ../../../../src/libsanitizer/asan/asan_interceptors.cpp:452 Blizzard#1 0x7f9dab537302 in rd_strdup /node-rdkafka/deps/librdkafka/src/rd.h:157 Blizzard#2 0x7f9dab537302 in rd_kafka_anyconf_set_prop0 /node-rdkafka/deps/librdkafka/src/rdkafka_conf.c:1827 Blizzard#3 0x7f9dab537c63 in rd_kafka_defaultconf_set /node-rdkafka/deps/librdkafka/src/rdkafka_conf.c:2273 Blizzard#4 0x7f9dab5394fd in rd_kafka_topic_conf_new /node-rdkafka/deps/librdkafka/src/rdkafka_conf.c:2293 Blizzard#5 0x7f9dab539e9f in rd_kafka_topic_conf_dup /node-rdkafka/deps/librdkafka/src/rdkafka_conf.c:2725 Blizzard#6 0x7f9dd1f4794f in RdKafka::Topic::create(RdKafka::Handle*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, RdKafka::Conf const*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >&) /node-rdkafka/deps/librdkafka/src-cpp/TopicImpl.cpp:89 Blizzard#7 0x7f9dabdc8eb9 in NodeKafka::Connection::CreateTopic(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, RdKafka::Conf*) ../src/connection.cc:115 Blizzard#8 0x7f9dabdc94db in NodeKafka::Connection::CreateTopic(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >) ../src/connection.cc:104 Blizzard#9 0x7f9dabdca0d9 in NodeKafka::Connection::GetMetadata(bool, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, int) ../src/connection.cc:198 Blizzard#10 0x7f9dabe63bf2 in NodeKafka::Workers::ConnectionMetadata::Execute() ../src/workers.cc:95 Blizzard#11 0x7f9dabdd7261 in Nan::AsyncExecute(uv_work_s*) ../node_modules/nan/nan.h:2356 Blizzard#12 0x18bb06f in worker ../deps/uv/src/threadpool.c:122 Blizzard#13 0x7f9dd5ffbea6 in start_thread (/lib/x86_64-linux-gnu/libpthread.so.0+0x7ea6) ``` The main issue seems to be that `Baton` does not take ownership of pointers it receives, requiring callers to manually dispose of the data on an ad-hoc basis. So, introduce a new typed RAII wrapper class suitable for wrapping the results of a librdkafka operation, and convert CreateTopic() to return it instead. As a potential followup, other methods that currently return a `Baton` could also be incrementally migrated to the new wrapper to reduce the amount of manual memory management required.
1 parent 24e6e0c commit 1707cbd

File tree

6 files changed

+85
-23
lines changed

6 files changed

+85
-23
lines changed

src/connection.cc

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include <vector>
1313

1414
#include "src/connection.h"
15+
#include "kafka-operation-result.h"
1516
#include "src/workers.h"
1617

1718
using RdKafka::Conf;
@@ -100,11 +101,11 @@ RdKafka::Handle* Connection::GetClient() {
100101
return m_client;
101102
}
102103

103-
Baton Connection::CreateTopic(std::string topic_name) {
104+
KafkaOperationResult<RdKafka::Topic> Connection::CreateTopic(std::string topic_name) {
104105
return CreateTopic(topic_name, NULL);
105106
}
106107

107-
Baton Connection::CreateTopic(std::string topic_name, RdKafka::Conf* conf) {
108+
KafkaOperationResult<RdKafka::Topic> Connection::CreateTopic(std::string topic_name, RdKafka::Conf* conf) {
108109
std::string errstr;
109110

110111
RdKafka::Topic* topic = NULL;
@@ -114,19 +115,19 @@ Baton Connection::CreateTopic(std::string topic_name, RdKafka::Conf* conf) {
114115
if (IsConnected()) {
115116
topic = RdKafka::Topic::create(m_client, topic_name, conf, errstr);
116117
} else {
117-
return Baton(RdKafka::ErrorCode::ERR__STATE);
118+
return KafkaOperationResult<RdKafka::Topic>(RdKafka::ErrorCode::ERR__STATE);
118119
}
119120
} else {
120-
return Baton(RdKafka::ErrorCode::ERR__STATE);
121+
return KafkaOperationResult<RdKafka::Topic>(RdKafka::ErrorCode::ERR__STATE);
121122
}
122123

123124
if (!errstr.empty()) {
124-
return Baton(RdKafka::ErrorCode::ERR_TOPIC_EXCEPTION, errstr);
125+
return KafkaOperationResult<RdKafka::Topic>(RdKafka::ErrorCode::ERR_TOPIC_EXCEPTION, errstr);
125126
}
126127

127128
// Maybe do it this way later? Then we don't need to do static_cast
128129
// <RdKafka::Topic*>
129-
return Baton(topic);
130+
return KafkaOperationResult<RdKafka::Topic>(topic);
130131
}
131132

132133
Baton Connection::QueryWatermarkOffsets(
@@ -189,15 +190,15 @@ Baton Connection::OffsetsForTimes(
189190

190191
Baton Connection::GetMetadata(
191192
bool all_topics, std::string topic_name, int timeout_ms) {
192-
RdKafka::Topic* topic = NULL;
193+
std::unique_ptr<RdKafka::Topic> topic{};
193194
RdKafka::ErrorCode err;
194195

195196
std::string errstr;
196197

197198
if (!topic_name.empty()) {
198-
Baton b = CreateTopic(topic_name);
199+
KafkaOperationResult<RdKafka::Topic> b = CreateTopic(topic_name);
199200
if (b.err() == RdKafka::ErrorCode::ERR_NO_ERROR) {
200-
topic = b.data<RdKafka::Topic*>();
201+
topic = b.take_ownership();
201202
}
202203
}
203204

@@ -211,7 +212,7 @@ Baton Connection::GetMetadata(
211212
scoped_shared_read_lock lock(m_connection_lock);
212213
if (IsConnected()) {
213214
// Always send true - we
214-
err = m_client->metadata(all_topics, topic, &metadata, timeout_ms);
215+
err = m_client->metadata(all_topics, topic.get(), &metadata, timeout_ms);
215216
} else {
216217
err = RdKafka::ERR__STATE;
217218
}

src/connection.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include "src/errors.h"
2222
#include "src/config.h"
2323
#include "src/callbacks.h"
24+
#include "src/kafka-operation-result.h"
2425

2526
namespace NodeKafka {
2627

@@ -56,8 +57,8 @@ class Connection : public Nan::ObjectWrap {
5657
bool IsClosing();
5758

5859
// Baton<RdKafka::Topic*>
59-
Baton CreateTopic(std::string);
60-
Baton CreateTopic(std::string, RdKafka::Conf*);
60+
KafkaOperationResult<RdKafka::Topic> CreateTopic(std::string);
61+
KafkaOperationResult<RdKafka::Topic> CreateTopic(std::string, RdKafka::Conf*);
6162
Baton GetMetadata(bool, std::string, int);
6263
Baton QueryWatermarkOffsets(std::string, int32_t, int64_t*, int64_t*, int);
6364
Baton OffsetsForTimes(std::vector<RdKafka::TopicPartition*> &, int);

src/kafka-operation-result.h

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
#ifndef SRC_KAFKA_OPERATION_RESULT_H_
2+
#define SRC_KAFKA_OPERATION_RESULT_H_
3+
4+
#include <cassert>
5+
#include <memory>
6+
#include <string>
7+
8+
#include "rdkafkacpp.h"
9+
10+
namespace NodeKafka {
11+
/**
12+
* Type-safe wrapper for the result of an RDKafka library operation.
13+
*/
14+
template<typename T>
15+
class KafkaOperationResult {
16+
public:
17+
/**
18+
* Constructor for a successful operation result.
19+
* Takes ownership of the data pointer.
20+
*/
21+
explicit KafkaOperationResult(T* data)
22+
: m_data(data), m_err(RdKafka::ErrorCode::ERR_NO_ERROR) {}
23+
explicit KafkaOperationResult(RdKafka::ErrorCode err)
24+
: m_data(nullptr), m_err(err) {}
25+
explicit KafkaOperationResult(RdKafka::ErrorCode err, std::string errstr)
26+
: m_data(nullptr), m_err(err), m_errstr(errstr) {}
27+
28+
/**
29+
* Get a non-owning pointer to the result data.
30+
* Only should be called for non-error results.
31+
*/
32+
T* data() const {
33+
assert(m_data != nullptr);
34+
return m_data.get();
35+
}
36+
37+
/**
38+
* Transfer ownership of the result data to the caller.
39+
* Only should be called for non-error results.
40+
*/
41+
std::unique_ptr<T> take_ownership() {
42+
assert(m_data != nullptr);
43+
std::unique_ptr<T> data = std::move(m_data);
44+
m_data.reset();
45+
return data;
46+
}
47+
48+
RdKafka::ErrorCode err() const {
49+
return m_err;
50+
}
51+
52+
std::string errstr() const {
53+
return m_errstr.empty() ? RdKafka::err2str(m_err) : m_errstr;
54+
}
55+
56+
private:
57+
std::unique_ptr<T> m_data;
58+
RdKafka::ErrorCode m_err;
59+
std::string m_errstr;
60+
};
61+
} // namespace NodeKafka
62+
63+
#endif // SRC_KAFKA_OPERATION_RESULT_H_

src/producer.cc

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
#include "src/producer.h"
1414
#include "src/kafka-consumer.h"
15+
#include "src/kafka-operation-result.h"
1516
#include "src/workers.h"
1617

1718
namespace NodeKafka {
@@ -617,22 +618,17 @@ NAN_METHOD(Producer::NodeProduce) {
617618
Topic* topic = ObjectWrap::Unwrap<Topic>(info[0].As<v8::Object>());
618619

619620
// Unwrap it and turn it into an RdKafka::Topic*
620-
Baton topic_baton = topic->toRDKafkaTopic(producer);
621+
KafkaOperationResult<RdKafka::Topic> topic_result = topic->toRDKafkaTopic(producer);
621622

622-
if (topic_baton.err() != RdKafka::ERR_NO_ERROR) {
623+
if (topic_result.err() != RdKafka::ERR_NO_ERROR) {
623624
// Let the JS library throw if we need to so the error can be more rich
624-
error_code = static_cast<int>(topic_baton.err());
625+
error_code = static_cast<int>(topic_result.err());
625626

626627
return info.GetReturnValue().Set(Nan::New<v8::Number>(error_code));
627628
}
628629

629-
RdKafka::Topic* rd_topic = topic_baton.data<RdKafka::Topic*>();
630-
631630
Baton b = producer->Produce(message_buffer_data, message_buffer_length,
632-
rd_topic, partition, key_buffer_data, key_buffer_length, opaque);
633-
634-
// Delete the topic when we are done.
635-
delete rd_topic;
631+
topic_result.data(), partition, key_buffer_data, key_buffer_length, opaque);
636632

637633
error_code = static_cast<int>(b.err());
638634
}

src/topic.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ std::string Topic::name() {
4545
return m_topic_name;
4646
}
4747

48-
Baton Topic::toRDKafkaTopic(Connection* handle) {
48+
KafkaOperationResult<RdKafka::Topic> Topic::toRDKafkaTopic(Connection* handle) {
4949
if (m_config) {
5050
return handle->CreateTopic(m_topic_name, m_config);
5151
} else {

src/topic.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include "rdkafkacpp.h"
1717

1818
#include "src/config.h"
19+
#include "src/kafka-operation-result.h"
1920

2021
namespace NodeKafka {
2122

@@ -24,7 +25,7 @@ class Topic : public Nan::ObjectWrap {
2425
static void Init(v8::Local<v8::Object>);
2526
static v8::Local<v8::Object> NewInstance(v8::Local<v8::Value> arg);
2627

27-
Baton toRDKafkaTopic(Connection *handle);
28+
KafkaOperationResult<RdKafka::Topic> toRDKafkaTopic(Connection *handle);
2829

2930
protected:
3031
static Nan::Persistent<v8::Function> constructor;

0 commit comments

Comments
 (0)