-
Notifications
You must be signed in to change notification settings - Fork 131
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feat[MQB]: Monolithic Virtual Storage #334
base: main
Are you sure you want to change the base?
Conversation
9273dab
to
db1187f
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some surface level comments
resources.d_blobSpPool_p = &d_blobSpPool; | ||
resources.d_bufferFactory_p = &d_bufferFactory; | ||
resources.d_scheduler_p = d_scheduler_p; | ||
resources.d_pushElementsPool_p = &d_pushElementsPool; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about passing these fields to the constructor instead? My concern is about adding a new field in resources, with a field access like here it's easy to forget a new field somewhere. With a constructor extended with a new field, the compiler will point to all places where a new field is missing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, that was the old way and the constructor signature kept growing. I thought, it would help to encapsulate everything in Resources
so when we need something new from mqba::Application
far away from it, we don't have to change signatures across the entire chain of constructors.
The concern about missing field is addressed by the fact that if a component needs the field, the compilation will fail.
The concern about missing field value can be addressed by an assert
bmqp::SubQueueInfo(bmqp::Protocol::k_DEFAULT_SUBSCRIPTION_ID)); | ||
} | ||
else { | ||
// Assume, RelayQueueEngien will use upstreramSubQueueIds as the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo
// Assume, RelayQueueEngien will use upstreramSubQueueIds as the | |
// Assume, RelayQueueEngine will use upstreramSubQueueIds as the |
// d_rdaInfo, | ||
// bmqp::Protocol::k_DEFAULT_SUBSCRIPTION_ID, | ||
// mqbu::StorageKey::k_NULL_KEY); | ||
// } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This code is not needed anymore, right?
@@ -397,6 +404,14 @@ void QueueEngineUtil::logRejectMessage( | |||
<< "deleted." << MWCTSK_ALARMLOG_END; | |||
} | |||
|
|||
bsls::Types::Int64 QueueEngineUtil::getMessageQueueTime( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In my previous PR, I turned this getMessageQueueTime
to a local function, because it's not used outside the QueueEngineUtil
cpp file. If it's still not used outside, let's keep it hidden from QueueEngineUtil
interface.
allocator) | ||
, d_iterator(d_storage.getIterator(mqbu::StorageKey())) | ||
, d_bufferFactory(32, allocator) | ||
, d_queue_sp(new (*allocator) mqbmock::Queue(0, allocator), allocator) | ||
, d_queue_sp(new(*allocator) mqbmock::Queue(0, allocator), allocator) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This might be a bug in the local clang-format
, d_queue_sp(new(*allocator) mqbmock::Queue(0, allocator), allocator) | |
, d_queue_sp(new (*allocator) mqbmock::Queue(0, allocator), allocator) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The clang-format does without the space but the C++ Formatting Check does not pass so I had to manually insert the space.
TEST_PROLOG(mwctst::TestHelper::e_DEFAULT); | ||
|
||
// bslma::DefaultAllocatorGuard _defAllocGuard(&ma); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changes in this file are not needed, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks!
db1187f
to
5a63af8
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall comments:
- The logic for
RelayQueueEngine
becomes much clearer with this refactor. Even more so than in making way for reliable broadcast, I find this to be the most important change in this PR. - Maybe as the cost of that, I find the code for
PushStream
quite opaque. The code feels strongly coupled to the routing logic, whereas really it's a generic data structure with a few indices. I'm not sure if you hadn't explained the design in person to me that I would have been able to reconstruct it from this code. I don't need it to be pulled out into anmwcc
-style component, but making the shape of the data structure clearer, in docs and in code, would go a long way towards making the routing logic more transparent. - I have many comments below about
ClusterResources
. Not because it's an important change, but because it's a change that (necessarily) touches so many places. It's a good change, rather than adding more and more parameters to all these constructors. However, I'm very concerned that each of the fields isnullptr
in different places, and when I see an instance ofClusterResources
, I have no good way of knowing what should and should not benullptr
. Ideally, none of them could benullptr
, and in the few places we keep them null we bite the bullet and construct something for them to point to. If some of them truly are optional, I'd like that to be documented. If all of them are optional, I think that needs to be reconsidered. - In Doxygen comments (triple slashes), backticks are used, not single ticks. I started noting those down, but the changes are extensive enough that I missed many of them.
I haven't taken a very deep look at the InMemoryStorage
+ FileBackedStorage
changes.
|
||
PushStream(bdlma::ConcurrentPool* pushElementsPool); | ||
|
||
iterator add(const bmqt::MessageGUID& guid); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This overload of add
has a different return type than the one below. Although both member functions add something, what they actually do is quite different. They should probably have different names.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea. Renaming it to findOrAppendMessage
/// Return the number of remaining Elements in the corresponding GUID. | ||
unsigned int remove(Element* element); | ||
|
||
/// Remove all PushStream Elements corresponding to the specified |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The return values here are not documented, but you use them in mqbblp_relayqueueengine.cpp(538)
(line number after applying the proposed patch). I assumed this was an error code when first looking at the interface, and didn't connect that it was the same as remove
above. In retrospect, that's not so obscure, but better to document here too.
|
||
struct App { | ||
Elements d_elements; | ||
bsl::shared_ptr<RelayQueueEngine_AppState> d_app; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the coupling that means this data structure can't be its own component.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What we could try to do is to forward declare RelayQueueEngine_AppState
and move PushStream
, RelayQueueEngine_PushStorageIterator
, and RelayQueueEngine_VirtualPushStorageIterator
into mqbblp_pushstream
.
And rename the latter two to PushStreamIterator
and VirtualPushStreamIterator
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like this strategy. It should tame this file quite a bit.
// GUID or for each App. An 'Element' holding PUSH context for a GUID and | ||
// an App is in two lists - for the GUID for the App. Removal can be | ||
// random for the GUID list and always sequential for the App list. | ||
// The storage still supports 'mqbi::StorageIterator' interface but the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This sentence is very concerning. What is the reason for supporting this interface? This isn't a Storage
, and we really should not take an existing interface, change its semantics, and reuse it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Adding comment
// The 'mqbi::StorageIterator' interface support is important for QueueEngines
// which access data or delivery exclusively using 'mqbi::StorageIterator'
// interface for both data in "real" storage or 'PushStream'.
apps.erase(d_iteratorApp); | ||
} | ||
|
||
inline mqbi::AppMessage* PushStream::Element::appState() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this and appView
be const
overloads of one another? If someone wants a const
view of the AppMessage
, they can store it in a pointer-to-const
, or better yet, take a const
view of the Element
first.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The difference between appState
and appView
is more than a const
. The latter will not allocate any state and the former will
}; | ||
|
||
struct PushStream { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This documentation comment is quite terse, and seems to put explaining why this data structure is shaped the way it is before what properties the data structure has. I could eventually parse that information out, but a sentence at the start of "This data structure stores contexts of PUSHs, allowing for efficient sequential access by GUID and by App, and random-access removal for GUID." All the information here is good, but I could only make sense of it since you've explained the design before.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Drawing out the shape of the data structure also helped me here. I'm not sure if ASCII art is truly worth it, but it's complex to see what this data structure is just from the code (even though it's not terribly complex at a high level).
|
||
bdlma::ConcurrentPool* d_pushElementsPool_p; | ||
|
||
PushStream(bdlma::ConcurrentPool* pushElementsPool); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Precondition on pushElementsPool
is v. important.
// class RelayQueueEngine_PushStorageIterator | ||
// ========================================== | ||
|
||
class RelayQueueEngine_PushStorageIterator : public mqbi::StorageIterator { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As mentioned above, should this really be a StorageIterator
? Or rather, it's not a StorageIterator
; it has different semantics. Is it one for convenience or is there a deeper need for it to be one?
unsigned int removeAll(); | ||
|
||
/// Create new Element associated with the specified 'info', | ||
// 'upstreamSubQueueId', and 'iterator'. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Triple slash this line
// MANIPULATORS | ||
|
||
/// If the specified 'isOutOfOrder' is 'true', insert the specified | ||
// 'msgGUID' into the corresponding App redelivery list. Otherwise, insert |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Triple slash this line.
(I think perhaps having multiple commits could have made this review go faster.) |
// CREATORS | ||
VirtualStorageCatalog::VirtualStorageCatalog(mqbi::Storage* storage, | ||
bslma::Allocator* allocator) | ||
: d_storage_p(storage) | ||
, d_virtualStorages(allocator) | ||
, d_avaialbleOrdinals(allocator) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Typo: available
const bool isBroadcast = d_queue_p->isDeliverAll(); | ||
|
||
if (isBroadcast) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- After your changes,
isBroadcast
const used only once in this method. Therefore this is possible:
const bool isBroadcast = d_queue_p->isDeliverAll(); | |
if (isBroadcast) { | |
if (d_queue_p->isDeliverAll()) { |
-
More general question, why do we check if it's a broadcast in two different ways:
a.const bool isBroadcast = d_queue_p->isDeliverAll();
b.QueueEngineUtil::isBroadcastMode(d_queue_p)
-> equal toreturn queue->isDeliverAll() && queue->isAtMostOnce();
Don't we need to also checkd_queue_p->isAtMostOnce()
here? -
Why don't we introduce
bool isBroadcast() const
formqbi::Queue *d_queue_p
? Doing so will simplifyQueueEngineUtil
interface.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Up until reliable broadcast, isDeliverAll()
and isAtMostOnce()
are synonyms. Reliable broadcast can be isDeliverAll() && !isAtMostOnce()
. Let's return to this, when we introduce this queue mode
const mqbi::AppMessage& appView = d_currentMessage->appMessageView( | ||
ordinal); | ||
Routers::Result result = Routers::e_SUCCESS; | ||
|
||
if (appView.isNew()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you think of a small refactor by early return here, so we don't create result
until it's needed?
const mqbi::AppMessage& appView = d_currentMessage->appMessageView( | |
ordinal); | |
Routers::Result result = Routers::e_SUCCESS; | |
if (appView.isNew()) { | |
const mqbi::AppMessage& appView = d_currentMessage->appMessageView( | |
ordinal); | |
if (!appView.isNew()) { | |
d_doRepeat = true; | |
return true; // RETURN | |
} | |
const Routers::Result result = app.selectConsumer(... |
d_timeDelta = getMessageQueueTime(d_currentMessage->attributes()); | ||
} | ||
return d_timeDelta.value(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From the signature, it looks like an accessor, but instead it caches the field and returns it. Do we really need this method?
Why don't we reset d_timeDelta
on reset()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we really need this method?
We need the cache to avoid extra work.
Yes, we need to reset it in reset
d_appOrdinal = ordinal; | ||
d_isAuthorized = true; | ||
|
||
return true; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return true; | |
return true; // RETURN |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some comments on mqbblp::RootQueueEngine
size_t numMessages = | ||
app->deliverMessages(&delay, key, *d_queueState_p->storage(), appId); | ||
size_t numMessages = app->deliverMessages(&delay, | ||
d_realStorageIter_mp.get(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Improve alignment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
d_realStorageIter_mp.get()
corresponds to mqbi::StorageIterator* reader
in QueueEngineUtil_AppState::deliverMessages()
. Could you explain the meaning of reader
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
reader
is the interface to read date out of whatever storage we use.
Adding
/// Use the specified `reader` to read data for delivery.
@@ -140,6 +140,12 @@ class RootQueueEngine BSLS_KEYWORD_FINAL : public mqbi::QueueEngine { | |||
// Throttler for when reject messages | |||
// are dumped into temp files. | |||
|
|||
bslma::ManagedPtr<mqbi::StorageIterator> d_storageIter_mp; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please add more explanation to the difference between d_storageIter_mp
and d_realStorageIter_mp
? This is confusing to any reader who does not already have deep understanding of the component.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added
// Storage iterator to the logical stream of messages.
// Queue Engine iterates this one sequentially.
and
// Storage iterator to access storage state.
// Queue Engine uses this one to access random message (as in the case of
// redelivery).
// The message is gone because of either GC or purge | ||
// In either case, start at the beginning |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// The message is gone because of either GC or purge | |
// In either case, start at the beginning | |
// The message is gone because of either GC or purge. | |
// In either case, start at the beginning. |
mqbi::Storage& storage, | ||
const bsl::string& appId); | ||
size_t deliverMessages(bsls::TimeInterval* delay, | ||
mqbi::StorageIterator* reader, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Explain the meaning of reader
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/// Use the specified `reader` to read data for delivery.
else { | ||
start = d_storageIter_mp.get(); | ||
} | ||
|
||
bsls::TimeInterval delay; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can be const bsls::TimeInterval delay;
and const size_t numMessages
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the delay
is an out parameter for deliverMessages
the numMessages
is const
, indeed
|
||
app->setUpstreamSubQueueId(upstreamSubQueueId); | ||
// Do not copy resumePoint. New RootQueueEngine redelivers everything | ||
// unconfirmed; its ierator is at the beginning. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// unconfirmed; its ierator is at the beginning. | |
// unconfirmed; its iterator is at the beginning. |
start = storageIter_mp.get(); | ||
} | ||
else { | ||
start = d_storageIter_mp.get(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
start = d_storageIter_mp.get(); | |
// TODO comment | |
start = d_storageIter_mp.get(); |
Where does d_storageIter_mp
point to at this point of time? Can you put a small comment explaining?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added
// 'start' points at either the resume point (if found) or the first
// unconfirmed message of the 'app' (if not found).
and
// 'start' points at the next message in the logical stream (common
// for all apps).
} | ||
} | ||
if (!context.isEmpty()) { | ||
// Report 'queue time' metric for the entire queue |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Was this 'queue time' metric reported at another place in the existing code? Did you move the logic here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it was reported in the QueueEngineUtil_AppState
/QueueEngineUtil_AppsDeliveryContext
upon checking if this is Primary (RootQueueEngine
).
Moved it to. the RootQueueEngine
for (Apps::iterator iter = d_apps.begin(); iter != d_apps.end(); ++iter) { | ||
iter->value()->beforeMessageRemoved(msgGUID, true); | ||
|
||
// PRECONDITIONS |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need to write // PRECONDITIONS
twice
@@ -1820,8 +1878,10 @@ void RootQueueEngine::loadInternals(mqbcmd::QueueEngine* out) const | |||
consumerState.appId() = iter->key1(); | |||
|
|||
if (d_queueState_p->storage()->hasVirtualStorage(iter->key1())) { | |||
consumerState.isAtEndOfStorage().makeValue( | |||
iter->value()->d_storageIter_mp->atEnd()); | |||
bool isAtEndOfStorage = iter->value()->isAtEndOfStorage() && |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
bool isAtEndOfStorage = iter->value()->isAtEndOfStorage() && | |
const bool isAtEndOfStorage = iter->value()->isAtEndOfStorage() && |
8c1652c
to
1606a42
Compare
resources.d_bufferFactory_p = &d_bufferFactory; | ||
resources.d_scheduler_p = d_scheduler_p; | ||
resources.d_pushElementsPool_p = &d_pushElementsPool; | ||
mqbi::ClusterResources resources( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍🏻
Recording for posterity: one of the things we thought about and could implement in the future if needed is to make the pools lazy-initialized, so they're only created when first needed. We're not doing this now, but it could be worthwhile.
@@ -669,17 +670,17 @@ inline AppMessage::AppMessage(const bmqp::RdaInfo& rdaInfo) | |||
// NOTHING | |||
} | |||
|
|||
inline void AppMessage::onPush() | |||
inline void AppMessage::setPushState() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To me, at least, this change makes the code a little clearer.
inline ClusterResources::ClusterResources( | ||
bdlmt::EventScheduler* scheduler, | ||
bdlbb::BlobBufferFactory* bufferFactory, | ||
BlobSpPool* blobSpPool) | ||
: d_scheduler_p(scheduler) | ||
, d_bufferFactory_p(bufferFactory) | ||
, d_blobSpPool_p(blobSpPool) | ||
, d_pushElementsPool() | ||
{ | ||
BSLS_ASSERT_SAFE(d_scheduler_p); | ||
BSLS_ASSERT_SAFE(d_bufferFactory_p); | ||
BSLS_ASSERT_SAFE(d_blobSpPool_p); | ||
} | ||
|
||
inline ClusterResources::ClusterResources( | ||
bdlmt::EventScheduler* scheduler, | ||
bdlbb::BlobBufferFactory* bufferFactory, | ||
BlobSpPool* blobSpPool, | ||
bdlma::ConcurrentPool* pushElementsPool) | ||
: d_scheduler_p(scheduler) | ||
, d_bufferFactory_p(bufferFactory) | ||
, d_blobSpPool_p(blobSpPool) | ||
, d_pushElementsPool(pushElementsPool) | ||
{ | ||
BSLS_ASSERT_SAFE(d_scheduler_p); | ||
BSLS_ASSERT_SAFE(d_bufferFactory_p); | ||
BSLS_ASSERT_SAFE(d_blobSpPool_p); | ||
BSLS_ASSERT_SAFE(d_pushElementsPool); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like this solution.
@@ -0,0 +1,697 @@ | |||
// Copyright 2024 Bloomberg Finance L.P. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💯
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Notes about PUSH STREAM
@@ -152,6 +153,8 @@ Application::Application(bdlmt::EventScheduler* scheduler, | |||
bdlf::PlaceHolders::_2), // allocator | |||
k_BLOB_POOL_GROWTH_STRATEGY, | |||
d_allocators.get("BlobSpPool")) | |||
, d_pushElementsPool(sizeof(mqbblp::PushStream::Element), | |||
d_allocators.get("PushElementsPool")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note that the default implementation of this concurrent pool uses geometric growth limited by 32. Does it worth it to set a larger const growth?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We do not expect large volume here as it is highly volatile. Once message is pushed, it gets removed
Apps::iterator itApp = d_apps.find(upstreamSubQueueId); | ||
|
||
unsigned int numMessages = 0; | ||
if (itApp != d_apps.end()) { | ||
numMessages = removeApp(itApp); | ||
} | ||
|
||
return numMessages; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Apps::iterator itApp = d_apps.find(upstreamSubQueueId); | |
unsigned int numMessages = 0; | |
if (itApp != d_apps.end()) { | |
numMessages = removeApp(itApp); | |
} | |
return numMessages; | |
Apps::iterator itApp = d_apps.find(upstreamSubQueueId); | |
if (itApp == d_apps.end()) { // predict unlikely? | |
return 0; // RETURN | |
} | |
return removeApp(itApp); |
<< "' could not redeliver GUID: '" << *it | ||
<< "' (not in the storage)"; | ||
} | ||
else if (!reader->appMessageView(ordinal()).isPending()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this condition also be UNLIKELY?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm. I think, this would be a confirmed or gc'ed message. BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY
would not hurt
ea7463d
71c0b35
to
777a3a3
Compare
777a3a3
to
42995d7
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Build 186 of commit 42995d7 has completed with FAILURE
{ | ||
if (!d_pushElementsPool_sp) { | ||
d_pushElementsPool_sp.load( | ||
new (*allocator) bdlma::ConcurrentPool(sizeof(Element), allocator), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Without bslma::Default::allocator(allocator)
guard this code will crash if we provide allocator == 0
.
We might either add a precondition or use bslma::Default::allocator(allocator)
before allocating memory.
// loaded in 'appData', 'options' or 'attributes' routines. | ||
d_appData_sp.reset(); | ||
d_options_sp.reset(); | ||
d_attributes.reset(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In a normal workflow, we do 3 resets just before we set these fields again in loadMessageAndAttributes
. I think making a lazy reset might be good for performance.
For example, reset only d_appData_sp
(which we check anyway in loadMessageAndAttributes
), and reassign all 3 fields where needed.
We might do it in a separate PR with performance measurements
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We explicitly call loadMessageAndAttributes
when accessing d_attributes
and d_options_sp
, so we could leave them. On the other hand, an instance of PushStreamIterator
would be in an inconsistent state. Is the benefit worth it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This inconsistency only means that some fields are not reseted between calls, but we do have the d_appData_sp
pointer which indicates that these fields are not reseted. So we can guarantee safe usage via class API. The non-initialized fields will only be visible if someone looks at the binary representation of this class with debugger.
// doKeepGuid because of the d_iterator | ||
|
||
if (d_iterator->second.numElements() == 0) { | ||
// d_currentElement->eraseFromStream(d_owner_p->d_stream); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this code needed?
#include <mqbcfg_messages.h> | ||
#include <mqbs_inmemorystorage.h> | ||
|
||
#include <mwcu_memoutstream.h> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
#include <mqbcfg_messages.h> | |
#include <mqbs_inmemorystorage.h> | |
#include <mwcu_memoutstream.h> | |
// MQB | |
#include <mqbcfg_messages.h> | |
#include <mqbs_inmemorystorage.h> | |
// MWC | |
#include <mwcu_memoutstream.h> |
case 1: test1_basic(); break; | ||
case 2: test2_iterations(); break; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Typically we have a descending order, not sure if UTs runner relies on this
case 1: test1_basic(); break; | |
case 2: test2_iterations(); break; | |
case 2: test2_iterations(); break; | |
case 1: test1_basic(); break; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm. I thought otherwise. For example, mqbblp_remotequeue.t
c2a1396
to
1604b8c
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Build 229 of commit 1604b8c has completed with FAILURE
1604b8c
to
31fc7b9
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Build 248 of commit 31fc7b9 has completed with FAILURE
244398f
to
31d7712
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Build 261 of commit 31d7712 has completed with FAILURE
Signed-off-by: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com>
Signed-off-by: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com>
Signed-off-by: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com>
Signed-off-by: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com>
Signed-off-by: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com>
31d7712
to
8f4320e
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Build 268 of commit 8f4320e has completed with FAILURE
Replacing VirtualStorage collections of GUIDs, one for each app, with one collection per queue. The new collection allocates consecutive memory (vector) if needed to keep apps states for each GUID. Each VirtualStorage representing an App, gets assigned an ordinal to use as an index to access the state.
This allows us to:
beforeMessageRemoved
per App)evaluateAutoSubscriptions
, initialization, and purging.Replacing the extra
VirtualStorageCatalog
inRelayQueueEngine
with new type of storagePushStream
. It is short-lived, keeping GUIDs only until they get PUSHed.CONFIRM logic is now inverted. CONFIRM is not a delete operation now but a change of the state.