From ea7463d5296e14e667237b7c977f24a445481761 Mon Sep 17 00:00:00 2001 From: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com> Date: Wed, 21 Aug 2024 13:31:13 -0400 Subject: [PATCH] adding UT Signed-off-by: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com> --- src/groups/mqb/mqbblp/mqbblp_pushstream.cpp | 12 + src/groups/mqb/mqbblp/mqbblp_pushstream.h | 10 + src/groups/mqb/mqbblp/mqbblp_pushstream.t.cpp | 214 ++++++++++++++++++ .../mqb/mqbblp/mqbblp_relayqueueengine.cpp | 2 +- 4 files changed, 237 insertions(+), 1 deletion(-) create mode 100644 src/groups/mqb/mqbblp/mqbblp_pushstream.t.cpp diff --git a/src/groups/mqb/mqbblp/mqbblp_pushstream.cpp b/src/groups/mqb/mqbblp/mqbblp_pushstream.cpp index 59db176ed..dc849e937 100644 --- a/src/groups/mqb/mqbblp/mqbblp_pushstream.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_pushstream.cpp @@ -263,6 +263,18 @@ VirtualPushStreamIterator::~VirtualPushStreamIterator() // NOTHING } +unsigned int VirtualPushStreamIterator::numApps() const +{ + BSLS_ASSERT_SAFE(!atEnd()); + + return 1; +} + +void VirtualPushStreamIterator::removeCurrentElement() +{ + advance(); +} + // MANIPULATORS bool VirtualPushStreamIterator::advance() { diff --git a/src/groups/mqb/mqbblp/mqbblp_pushstream.h b/src/groups/mqb/mqbblp/mqbblp_pushstream.h index e01520df3..5bbdb30b5 100644 --- a/src/groups/mqb/mqbblp/mqbblp_pushstream.h +++ b/src/groups/mqb/mqbblp/mqbblp_pushstream.h @@ -388,6 +388,16 @@ class VirtualPushStreamIterator : public PushStreamIterator { /// Destructor virtual ~VirtualPushStreamIterator() BSLS_KEYWORD_OVERRIDE; + /// Remove the current element (`mqbi::AppMessage`, `upstreamSubQueueId` + /// pair) from the current PUSH GUID. + /// The behavior is undefined unless `atEnd` returns `false`. + void removeCurrentElement(); + + /// Return the number of elements (`mqbi::AppMessage`, `upstreamSubQueueId` + /// pairs) for the current PUSH GUID. + /// The behavior is undefined unless `atEnd` returns `false`. + unsigned int numApps() const; + /// Return the current element (`mqbi::AppMessage`, `upstreamSubQueueId` /// pair). /// The behavior is undefined unless `atEnd` returns `false`. diff --git a/src/groups/mqb/mqbblp/mqbblp_pushstream.t.cpp b/src/groups/mqb/mqbblp/mqbblp_pushstream.t.cpp new file mode 100644 index 000000000..19f09b24f --- /dev/null +++ b/src/groups/mqb/mqbblp/mqbblp_pushstream.t.cpp @@ -0,0 +1,214 @@ +// Copyright 2024 Bloomberg Finance L.P. +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// mqbblp_pushstream.t.cpp -*-C++-*- +#include + +// BMQ +#include + +#include +#include + +#include + +// TEST DRIVER +#include + +// CONVENIENCE +using namespace BloombergLP; +using namespace bsl; + +// ============================================================================ +// TESTS +// ---------------------------------------------------------------------------- + +static void test1_basic() +{ + mwctst::TestHelper::printTestName("PushStream basic test"); + + bdlma::ConcurrentPool pushElementsPool(sizeof(mqbblp::PushStream::Element), + s_allocator_p); + + mqbblp::PushStream ps(&pushElementsPool, s_allocator_p); + unsigned int subQueueId = 0; + bsl::shared_ptr app; // unused + bmqp::SubQueueInfo subscription; + + mqbblp::PushStream::iterator itGuid = ps.findOrAppendMessage( + bmqp::MessageGUIDGenerator::testGUID()); + + mqbblp::PushStream::Apps::iterator itApp = + ps.d_apps.emplace(subQueueId, app).first; + + mqbblp::PushStream::Element* element = ps.create(subscription, + itGuid, + itApp); + + ps.add(element); + ps.remove(element); + ps.destroy(element, false); +} + +static void test2_iterations() +{ + mwctst::TestHelper::printTestName("PushStream basic test"); + + // Imitate {m1, a1}, {m2, a2}, {m1, a2}, {m2, a1} + + mqbblp::PushStream ps(bsl::nullptr_t(), s_allocator_p); + unsigned int subQueueId1 = 1; + unsigned int subQueueId2 = 2; + + bsl::shared_ptr unused; + + bmqp::SubQueueInfo subscription1(1); + bmqp::SubQueueInfo subscription2(2); + + mqbblp::PushStream::iterator itGuid1 = ps.findOrAppendMessage( + bmqp::MessageGUIDGenerator::testGUID()); + + mqbblp::PushStream::Apps::iterator itApp1 = + ps.d_apps.emplace(subQueueId1, unused).first; + + mqbblp::PushStream::Element* element1 = ps.create(subscription1, + itGuid1, + itApp1); + + ps.add(element1); + + mqbblp::PushStream::iterator itGuid2 = ps.findOrAppendMessage( + bmqp::MessageGUIDGenerator::testGUID()); + + mqbblp::PushStream::Apps::iterator itApp2 = + ps.d_apps.emplace(subQueueId2, unused).first; + + mqbblp::PushStream::Element* element2 = ps.create(subscription2, + itGuid2, + itApp2); + + ps.add(element2); + + mqbblp::PushStream::Element* element3 = ps.create(subscription2, + itGuid1, + itApp2); + + ps.add(element3); + + mqbblp::PushStream::Element* element4 = ps.create(subscription1, + itGuid2, + itApp1); + + ps.add(element4); + + mqbu::CapacityMeter dummyCapacityMeter("dummy", s_allocator_p); + bmqt::Uri dummyUri("dummy", s_allocator_p); + mqbconfm::Domain dummyDomain(s_allocator_p); + + mqbs::InMemoryStorage dummyStorage(dummyUri, + mqbu::StorageKey::k_NULL_KEY, + mqbs::DataStore::k_INVALID_PARTITION_ID, + dummyDomain, + &dummyCapacityMeter, + s_allocator_p); + + mqbconfm::Storage config; + mqbconfm::Limits limits; + + config.makeInMemory(); + + limits.messages() = bsl::numeric_limits::max(); + limits.bytes() = bsl::numeric_limits::max(); + + mwcu::MemOutStream errorDescription(s_allocator_p); + dummyStorage.configure(errorDescription, + config, + limits, + bsl::numeric_limits::max(), + 0); + + { + mqbblp::PushStreamIterator pit(&dummyStorage, + &ps, + ps.d_stream.begin()); + + ASSERT(!pit.atEnd()); + ASSERT_EQ(pit.numApps(), 2); + + ASSERT_EQ(element1, pit.element(0)); + ASSERT_EQ(element3, pit.element(1)); + + ASSERT(pit.advance()); + ASSERT_EQ(pit.numApps(), 2); + + ASSERT_EQ(element2, pit.element(0)); + ASSERT_EQ(element4, pit.element(1)); + + ASSERT(!pit.advance()); + } + + { + mqbblp::VirtualPushStreamIterator vit(subQueueId1, + &dummyStorage, + &ps, + ps.d_stream.begin()); + + ASSERT(!vit.atEnd()); + ASSERT_EQ(vit.numApps(), 1); + + ASSERT_EQ(element1, vit.element(0)); + + vit.advance(); + + ASSERT(!vit.atEnd()); + ASSERT_EQ(vit.numApps(), 1); + + ASSERT_EQ(element4, vit.element(0)); + + vit.advance(); + + ASSERT(vit.atEnd()); + } + + ps.remove(element2); + ps.destroy(element2, false); + ps.remove(element3); + ps.destroy(element3, false); +} + +// ============================================================================ +// MAIN PROGRAM +// ---------------------------------------------------------------------------- + +int main(int argc, char* argv[]) +{ + TEST_PROLOG(mwctst::TestHelper::e_DEFAULT); + + bmqt::UriParser::initialize(s_allocator_p); + + switch (_testCase) { + case 0: + case 1: test1_basic(); break; + case 2: test2_iterations(); break; + default: { + cerr << "WARNING: CASE '" << _testCase << "' NOT FOUND." << endl; + s_testStatus = -1; + } break; + } + + bmqt::UriParser::shutdown(); + + TEST_EPILOG(mwctst::TestHelper::e_CHECK_DEF_GBL_ALLOC); +} diff --git a/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp b/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp index 1764c4e4c..d06d9ef60 100644 --- a/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp @@ -1519,7 +1519,7 @@ void RelayQueueEngine::afterQueuePurged(const bsl::string& appId, // Purge all virtual storages, and reset all iterators. - unsigned int numMessages = d_pushStream.removeAll(); + d_pushStream.removeAll(); for (AppsMap::iterator it = d_apps.begin(); it != d_apps.end(); ++it) { it->second->clear();