diff --git a/dbms/src/Common/MPMCQueue.h b/dbms/src/Common/MPMCQueue.h index 36bad9214fb..416b8cbac9b 100644 --- a/dbms/src/Common/MPMCQueue.h +++ b/dbms/src/Common/MPMCQueue.h @@ -53,6 +53,13 @@ class MPMCQueue { } + ~MPMCQueue() + { + std::unique_lock lock(mu); + for (; read_pos < write_pos; ++read_pos) + destruct(getObj(read_pos)); + } + /// Block util: /// 1. Pop succeeds with a valid T: return true. /// 2. The queue is cancelled or finished: return false. diff --git a/dbms/src/Common/tests/gtest_mpmc_queue.cpp b/dbms/src/Common/tests/gtest_mpmc_queue.cpp index 7e9602c9bbe..264edea5f80 100644 --- a/dbms/src/Common/tests/gtest_mpmc_queue.cpp +++ b/dbms/src/Common/tests/gtest_mpmc_queue.cpp @@ -8,11 +8,11 @@ #include #include -namespace DB +namespace DB::tests { -namespace tests +namespace { -class TestMPMCQueue : public ::testing::Test +class MPMCQueueTest : public ::testing::Test { protected: std::random_device rd; @@ -489,28 +489,28 @@ class TestMPMCQueue : public ::testing::Test }; template <> -struct TestMPMCQueue::ValueHelper +struct MPMCQueueTest::ValueHelper { static int make(int v) { return v; } static int extract(int v) { return v; } }; template <> -struct TestMPMCQueue::ValueHelper> +struct MPMCQueueTest::ValueHelper> { static std::unique_ptr make(int v) { return std::make_unique(v); } static int extract(std::unique_ptr & v) { return *v; } }; template <> -struct TestMPMCQueue::ValueHelper> +struct MPMCQueueTest::ValueHelper> { static std::shared_ptr make(int v) { return std::make_shared(v); } static int extract(std::shared_ptr & v) { return *v; } }; #define ADD_TEST_FOR(type_name, type, test_name, ...) \ - TEST_F(TestMPMCQueue, type_name##_##test_name) \ + TEST_F(MPMCQueueTest, type_name##_##test_name) \ try \ { \ test##test_name(__VA_ARGS__); \ @@ -533,7 +533,7 @@ ADD_TEST(CancelEmpty, 4, 4); ADD_TEST(CancelConcurrentPop, 4); ADD_TEST(CancelConcurrentPush, 4); -TEST_F(TestMPMCQueue, ExceptionSafe) +TEST_F(MPMCQueueTest, ExceptionSafe) try { MPMCQueue queue(10); @@ -590,5 +590,74 @@ try } CATCH -} // namespace tests -} // namespace DB +TEST_F(MPMCQueueTest, isNextOpNonBlocking) +try +{ + MPMCQueue q(2); + ASSERT_TRUE(q.isNextPushNonBlocking()); + ASSERT_FALSE(q.isNextPopNonBlocking()); + ASSERT_TRUE(q.push(1)); + ASSERT_TRUE(q.isNextPushNonBlocking()); + ASSERT_TRUE(q.isNextPopNonBlocking()); + int val; + ASSERT_TRUE(q.pop(val)); + ASSERT_TRUE(q.isNextPushNonBlocking()); + ASSERT_FALSE(q.isNextPopNonBlocking()); + ASSERT_TRUE(q.push(1)); + ASSERT_TRUE(q.isNextPushNonBlocking()); + ASSERT_TRUE(q.isNextPopNonBlocking()); + ASSERT_TRUE(q.push(1)); + ASSERT_FALSE(q.isNextPushNonBlocking()); + ASSERT_TRUE(q.isNextPopNonBlocking()); + + ASSERT_TRUE(q.finish()); + ASSERT_FALSE(q.finish()); + + //check isNextPush/PopNonBlocking after finish + ASSERT_TRUE(q.pop(val)); + ASSERT_TRUE(q.isNextPushNonBlocking()); + ASSERT_TRUE(q.isNextPopNonBlocking()); + ASSERT_TRUE(q.pop(val)); + ASSERT_TRUE(q.isNextPushNonBlocking()); + ASSERT_TRUE(q.isNextPopNonBlocking()); +} +CATCH + +struct Counter +{ + static int count; + Counter() + { + ++count; + } + + ~Counter() + { + --count; + } +}; +int Counter::count = 0; + +TEST_F(MPMCQueueTest, objectsDestructed) +try +{ + { + MPMCQueue queue(100); + queue.emplace(); + ASSERT_EQ(Counter::count, 1); + + { + Counter cnt; + queue.pop(cnt); + } + ASSERT_EQ(Counter::count, 0); + + queue.emplace(); + ASSERT_EQ(Counter::count, 1); + } + ASSERT_EQ(Counter::count, 0); +} +CATCH + +} // namespace +} // namespace DB::tests