Skip to content

Commit

Permalink
*: Fix memory leak when MPMCQueue destruct with non-poped elements (#…
Browse files Browse the repository at this point in the history
…4221)

close #4098
  • Loading branch information
fuzhe1989 authored Mar 11, 2022
1 parent e8fae73 commit f50e1a9
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 12 deletions.
7 changes: 7 additions & 0 deletions dbms/src/Common/MPMCQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,13 @@ class MPMCQueue
{
}

~MPMCQueue()
{
std::unique_lock lock(mu);
for (; read_pos < write_pos; ++read_pos)
destruct(getObj(read_pos));
}

/// Block util:
/// 1. Pop succeeds with a valid T: return true.
/// 2. The queue is cancelled or finished: return false.
Expand Down
61 changes: 49 additions & 12 deletions dbms/src/Common/tests/gtest_mpmc_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@
#include <thread>
#include <vector>

namespace DB
namespace DB::tests
{
namespace tests
namespace
{
class TestMPMCQueue : public ::testing::Test
class MPMCQueueTest : public ::testing::Test
{
protected:
std::random_device rd;
Expand Down Expand Up @@ -489,28 +489,28 @@ class TestMPMCQueue : public ::testing::Test
};

template <>
struct TestMPMCQueue::ValueHelper<int>
struct MPMCQueueTest::ValueHelper<int>
{
static int make(int v) { return v; }
static int extract(int v) { return v; }
};

template <>
struct TestMPMCQueue::ValueHelper<std::unique_ptr<int>>
struct MPMCQueueTest::ValueHelper<std::unique_ptr<int>>
{
static std::unique_ptr<int> make(int v) { return std::make_unique<int>(v); }
static int extract(std::unique_ptr<int> & v) { return *v; }
};

template <>
struct TestMPMCQueue::ValueHelper<std::shared_ptr<int>>
struct MPMCQueueTest::ValueHelper<std::shared_ptr<int>>
{
static std::shared_ptr<int> make(int v) { return std::make_shared<int>(v); }
static int extract(std::shared_ptr<int> & v) { return *v; }
};

#define ADD_TEST_FOR(type_name, type, test_name, ...) \
TEST_F(TestMPMCQueue, type_name##_##test_name) \
TEST_F(MPMCQueueTest, type_name##_##test_name) \
try \
{ \
test##test_name<type>(__VA_ARGS__); \
Expand All @@ -533,7 +533,7 @@ ADD_TEST(CancelEmpty, 4, 4);
ADD_TEST(CancelConcurrentPop, 4);
ADD_TEST(CancelConcurrentPush, 4);

TEST_F(TestMPMCQueue, ExceptionSafe)
TEST_F(MPMCQueueTest, ExceptionSafe)
try
{
MPMCQueue<ThrowInjectable> queue(10);
Expand Down Expand Up @@ -590,8 +590,8 @@ try
}
CATCH


TEST_F(TestMPMCQueue, isNextOpNonBlocking)
TEST_F(MPMCQueueTest, isNextOpNonBlocking)
try
{
MPMCQueue<int> q(2);
ASSERT_TRUE(q.isNextPushNonBlocking());
Expand Down Expand Up @@ -621,6 +621,43 @@ TEST_F(TestMPMCQueue, isNextOpNonBlocking)
ASSERT_TRUE(q.isNextPushNonBlocking());
ASSERT_TRUE(q.isNextPopNonBlocking());
}
CATCH

struct Counter
{
static int count;
Counter()
{
++count;
}

~Counter()
{
--count;
}
};
int Counter::count = 0;

TEST_F(MPMCQueueTest, objectsDestructed)
try
{
{
MPMCQueue<Counter> queue(100);
queue.emplace();
ASSERT_EQ(Counter::count, 1);

{
Counter cnt;
queue.pop(cnt);
}
ASSERT_EQ(Counter::count, 0);

queue.emplace();
ASSERT_EQ(Counter::count, 1);
}
ASSERT_EQ(Counter::count, 0);
}
CATCH

} // namespace tests
} // namespace DB
} // namespace
} // namespace DB::tests

0 comments on commit f50e1a9

Please sign in to comment.