Skip to content

Commit

Permalink
merge
Browse files Browse the repository at this point in the history
Signed-off-by: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com>
  • Loading branch information
dorjesinpo committed Aug 12, 2024
1 parent 2be7779 commit 8c1652c
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 58 deletions.
50 changes: 27 additions & 23 deletions src/groups/mqb/mqbblp/mqbblp_queueengineutil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -635,23 +635,36 @@ QueueEngineUtil_ReleaseHandleProctor::~QueueEngineUtil_ReleaseHandleProctor()
// ------------------------------------------

QueueEngineUtil_AppsDeliveryContext::QueueEngineUtil_AppsDeliveryContext(
mqbi::Queue* queue,
mqbi::StorageIterator* currentMessage,
bslma::Allocator* allocator)
mqbi::Queue* queue,
bslma::Allocator* allocator)
: d_consumers(allocator)
, d_doRepeat(currentMessage ? currentMessage->hasReceipt() : false)
, d_currentMessage(currentMessage)
, d_isReady(false)
, d_currentMessage(0)
, d_queue_p(queue)
, d_timeDelta()
{
BSLS_ASSERT_SAFE(queue);
}

void QueueEngineUtil_AppsDeliveryContext::reset()
void QueueEngineUtil_AppsDeliveryContext::start()
{
d_isReady = true;
}

bool QueueEngineUtil_AppsDeliveryContext::reset(
mqbi::StorageIterator* currentMessage)
{
d_doRepeat = false;
d_consumers.clear();
d_timeDelta.reset();

if (!d_isReady) {
return false; // RETURN
}

d_currentMessage = currentMessage;
d_isReady = false;

return d_currentMessage ? d_currentMessage->hasReceipt() : false;
}

bool QueueEngineUtil_AppsDeliveryContext::processApp(
Expand All @@ -669,7 +682,7 @@ bool QueueEngineUtil_AppsDeliveryContext::processApp(
bdlf::PlaceHolders::_1),
d_currentMessage);

d_doRepeat = true;
d_isReady = true;

// Broadcast does not need stats nor any special per-message treatment.
return false; // RETURN
Expand All @@ -689,7 +702,7 @@ bool QueueEngineUtil_AppsDeliveryContext::processApp(
ordinal);

if (!appView.isNew()) {
d_doRepeat = true;
d_isReady = true;
return true; // RETURN
}

Expand Down Expand Up @@ -721,7 +734,7 @@ bool QueueEngineUtil_AppsDeliveryContext::processApp(

// Early return.
// If all Apps return 'e_NO_CAPACITY_ALL', stop the iteration
// (d_doRepeat == false).
// (d_isReady == false).

return false; // RETURN
}
Expand All @@ -735,7 +748,7 @@ bool QueueEngineUtil_AppsDeliveryContext::processApp(
}

// Still making progress (result != Routers::e_NO_CAPACITY_ALL)
d_doRepeat = true;
d_isReady = true;

return (result == Routers::e_SUCCESS);
}
Expand Down Expand Up @@ -794,20 +807,11 @@ void QueueEngineUtil_AppsDeliveryContext::deliverMessage()
}
}

if (d_doRepeat) {
if (d_currentMessage->advance()) {
// There is at least one more message to deliver
d_doRepeat = d_currentMessage->hasReceipt();
}
else {
d_doRepeat = false;
}
if (d_isReady) {
d_currentMessage->advance();
}
}

bool QueueEngineUtil_AppsDeliveryContext::doRepeat() const
{
return d_doRepeat;
d_currentMessage = 0;
}

bool QueueEngineUtil_AppsDeliveryContext::isEmpty() const
Expand Down
20 changes: 10 additions & 10 deletions src/groups/mqb/mqbblp/mqbblp_queueengineutil.h
Original file line number Diff line number Diff line change
Expand Up @@ -592,20 +592,25 @@ struct QueueEngineUtil_AppsDeliveryContext {

private:
Consumers d_consumers;
bool d_doRepeat;
bool d_isReady;
mqbi::StorageIterator* d_currentMessage;
mqbi::Queue* d_queue_p;
bsl::optional<bsls::Types::Int64> d_timeDelta;
// Avoid reading the attributes if not necessary. Get timeDelta on demand.
// See comment in `QueueEngineUtil_AppsDeliveryContext::processApp`.

public:
QueueEngineUtil_AppsDeliveryContext(mqbi::Queue* queue,
mqbi::StorageIterator* currentMessage,
bslma::Allocator* allocator);
QueueEngineUtil_AppsDeliveryContext(mqbi::Queue* queue,
bslma::Allocator* allocator);

/// Start delivery cycle(s).
void start();

/// Prepare the context to process next message.
void reset();
/// Return `true` if the delivery can continue iterating dataStream
/// The `false` return value indicates either the end of the dataStream or
/// the the `e_NO_CAPACITY_ALL` case.
bool reset(mqbi::StorageIterator* currentMessage);

/// Return `true` if the specified `app` is not a broadcast app and has an
/// available handle to deliver the current message with the specified
Expand All @@ -632,11 +637,6 @@ struct QueueEngineUtil_AppsDeliveryContext {
/// Deliver message to the previously processed handles.
void deliverMessage();

/// Return `true` if the delivery can continue iterating dataStream
/// The `false` return value indicates either the end of the dataStream or
/// the the `e_NO_CAPACITY_ALL` case.
bool doRepeat() const;

/// Return `true` if there is at least one delivery target selected.
bool isEmpty() const;

Expand Down
14 changes: 5 additions & 9 deletions src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -580,13 +580,9 @@ void RelayQueueEngine::deliverMessages()
// 1. End of storage; or
// 2. All subStreams return 'e_NO_CAPACITY_ALL'

QueueEngineUtil_AppsDeliveryContext context(d_queueState_p->queue(),
d_storageIter_mp.get(),
d_allocator_p);

while (context.doRepeat()) {
context.reset();
d_appsDeliveryContext.start();

while (d_appsDeliveryContext.reset(d_storageIter_mp.get())) {
// Assume, all Apps need to deliver (some may be at capacity)
unsigned int numApps = d_storageIter_mp->numApps();

Expand All @@ -609,15 +605,15 @@ void RelayQueueEngine::deliverMessages()
d_storageIter_mp->removeCurrentElement();
}

if (context.processApp(*app, i)) {
if (d_appsDeliveryContext.processApp(*app, i)) {
// The current element has made it either to delivery or
// putAside or resumerPoint and it can be removed
d_storageIter_mp->removeCurrentElement();
}
// Else, the current element has made it to resumerPoint and
// it cannot be removed
}
context.deliverMessage();
d_appsDeliveryContext.deliverMessage();
}
}

Expand Down Expand Up @@ -904,8 +900,8 @@ RelayQueueEngine::RelayQueueEngine(QueueState* queueState,
, d_apps(allocator)
, d_appIds(allocator)
, d_self(this) // use default allocator
, d_storageIter_mp()
, d_appsDeliveryContext(d_queueState_p->queue(), allocator)
, d_storageIter_mp()
, d_realStorageIter_mp()
, d_allocator_p(allocator)
{
Expand Down
3 changes: 2 additions & 1 deletion src/groups/mqb/mqbblp/mqbblp_relayqueueengine.h
Original file line number Diff line number Diff line change
Expand Up @@ -230,9 +230,10 @@ class RelayQueueEngine : public mqbi::QueueEngine {
bdlmt::Throttle d_throttledRejectedMessages;
// Throttler for REJECTs.

bslma::ManagedPtr<PushStreamIterator> d_storageIter_mp;
QueueEngineUtil_AppsDeliveryContext d_appsDeliveryContext;
// Reusable apps delivery context

bslma::ManagedPtr<PushStreamIterator> d_storageIter_mp;
// Storage iterator to the PushStream

bslma::ManagedPtr<mqbi::StorageIterator> d_realStorageIter_mp;
Expand Down
19 changes: 7 additions & 12 deletions src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1275,20 +1275,15 @@ void RootQueueEngine::afterNewMessage(
d_queueState_p->queue()));

// Deliver new messages to active (alive and capable to deliver) consumers
d_appsDeliveryContext.start();

QueueEngineUtil_AppsDeliveryContext context(d_queueState_p->queue(),
d_storageIter_mp.get(),
d_allocator_p);

while (context.doRepeat()) {
context.reset();

while (d_appsDeliveryContext.reset(d_storageIter_mp.get())) {
// Assume, all Apps need to deliver (some may be at capacity)
for (Apps::iterator iter = d_apps.begin(); iter != d_apps.end();
++iter) {
AppStateSp& app = iter->value();

if (context.processApp(*app, app->ordinal())) {
if (d_appsDeliveryContext.processApp(*app, app->ordinal())) {
// Consider this message as sent out

d_consumptionMonitor.onMessageSent(iter->key2().first);
Expand All @@ -1297,17 +1292,17 @@ void RootQueueEngine::afterNewMessage(
// Report 'queue time' metric for all active appIds
d_queueState_p->queue()->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_QUEUE_TIME,
context.timeDelta(),
d_appsDeliveryContext.timeDelta(),
app->appId());
}
}
if (!context.isEmpty()) {
if (!d_appsDeliveryContext.isEmpty()) {
// Report 'queue time' metric for the entire queue
d_queueState_p->queue()->stats()->onEvent(
mqbstat::QueueStatsDomain::EventType::e_QUEUE_TIME,
context.timeDelta());
d_appsDeliveryContext.timeDelta());
}
context.deliverMessage();
d_appsDeliveryContext.deliverMessage();
}

if (QueueEngineUtil::isBroadcastMode(d_queueState_p->queue())) {
Expand Down
5 changes: 2 additions & 3 deletions src/groups/mqb/mqbs/mqbs_virtualstorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -250,9 +250,8 @@ const mqbi::StorageMessageAttributes& StorageIterator::attributes() const
if (d_attributes.refCount() == 0) {
// No loaded Attributes for the current message yet.

mqbi::StorageResult::Enum rc = d_virtualStorage_p->d_storage_p->get(
&d_attributes,
d_iterator->first);
mqbi::StorageResult::Enum rc = d_storage_p->get(&d_attributes,
d_iterator->first);
BSLS_ASSERT_SAFE(mqbi::StorageResult::e_SUCCESS == rc);
(void)rc;
}
Expand Down

0 comments on commit 8c1652c

Please sign in to comment.