From 37d1450204371084e4cd0b99b01f938b478964e7 Mon Sep 17 00:00:00 2001 From: Evgeny Malygin Date: Thu, 31 Oct 2024 17:38:15 +0000 Subject: [PATCH 1/2] Fix[MQB]: fix 0 deduplication timeout causing instant expired puts on REPLICA Signed-off-by: Evgeny Malygin --- src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp | 29 ++++++++++++++------ 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp b/src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp index bf411902a..31b783185 100644 --- a/src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp @@ -366,7 +366,7 @@ void RemoteQueue::pushMessage( if (result != mqbi::StorageResult::e_SUCCESS) { if (d_throttledFailedPushMessages.requestPermission()) { - BALL_LOG_WARN << d_state_p->uri() + BALL_LOG_WARN << "[THROTTLED] " << d_state_p->uri() << " failed to store broadcast PUSH [" << msgGUID << "], result = " << result; } @@ -483,6 +483,19 @@ RemoteQueue::RemoteQueue(QueueState* state, os << '@' << d_state_p->uri().asString(); d_state_p->setDescription(os.str()); + if (deduplicationTimeMs <= 0) { + d_pendingPutsTimeoutNs = 5 * + bdlt::TimeUnitRatio::k_NANOSECONDS_PER_MINUTE; + BALL_LOG_WARN << "Remote queue [" << d_state_p->description() + << "]: cannot schedule PUT deduplication timer with a " + << "non-positive timeout from config [" + << deduplicationTimeMs << " ms], use a default PUT " + << "deduplication timeout for scheduler instead [" + << bmqu::PrintUtil::prettyTimeInterval( + d_pendingPutsTimeoutNs) + << "]"; + } + BALL_LOG_INFO << "Remote queue: " << d_state_p->uri() << " [id: " << d_state_p->id() << "]"; } @@ -676,10 +689,10 @@ void RemoteQueue::onHandleReleased( it != d_pendingConfirms.end();) { if (it->d_handle == handle.get()) { if (d_throttledFailedConfirmMessages.requestPermission()) { - BALL_LOG_WARN << "Dropping CONFIRM because downstream [" - << handle << "] is gone. 
[queue: '" - << d_state_p->description() << "', GUID: '" - << it->d_guid << "']"; + BALL_LOG_WARN << "[THROTTLED] Dropping CONFIRM because " + << "downstream [" << handle << "] is gone. " + << "[queue: '" << d_state_p->description() + << "', GUID: '" << it->d_guid << "']"; } it = d_pendingConfirms.erase(it); ++numProcessed; @@ -865,7 +878,7 @@ void RemoteQueue::postMessage(const bmqp::PutHeader& putHeaderIn, if (d_throttledFailedPutMessages.requestPermission()) { BALL_LOG_WARN - << "#CLIENT_IMPROPER_BEHAVIOR " + << "[THROTTLED] #CLIENT_IMPROPER_BEHAVIOR " << "Failed PUT message for queue [" << d_state_p->uri() << "] from client [" << source->client()->description() << "]. Queue not opened in WRITE mode by the client."; @@ -1152,7 +1165,7 @@ void RemoteQueue::onAckMessageDispatched(const mqbi::DispatcherAckEvent& event) if (d_throttledFailedAckMessages.requestPermission()) { BALL_LOG_STREAM(severity) - << "Received ACK message [" << ackResult + << "[THROTTLED] Received ACK message [" << ackResult << ", queue: " << d_state_p->description() << "] for unknown guid: " << ackMessage.messageGUID(); } @@ -1313,7 +1326,7 @@ void RemoteQueue::expirePendingMessagesDispatched() if (numExpired) { if (d_throttledFailedPutMessages.requestPermission()) { - BALL_LOG_INFO << "[THROTTLED] " << d_state_p->uri() << ": expired " + BALL_LOG_WARN << "[THROTTLED] " << d_state_p->uri() << ": expired " << bmqu::PrintUtil::prettyNumber(numExpired) << " pending PUT messages (" << bmqu::PrintUtil::prettyNumber(numMessages - From 5749e0a1802004fa87ca2c22f80e8540e3b91fd0 Mon Sep 17 00:00:00 2001 From: Evgeny Malygin Date: Thu, 31 Oct 2024 18:08:19 +0000 Subject: [PATCH 2/2] Move the 5-minute timeout into named constants Signed-off-by: Evgeny Malygin --- src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp b/src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp index 31b783185..30595ba0c 100644 --- 
a/src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp @@ -64,6 +64,16 @@ namespace BloombergLP { namespace mqbblp { +namespace { + +/// The default timeout for the scheduled PUT expiration clean-up event. +static const bsls::Types::Int64 k_DEFAULT_PUT_EXPIRATION_TIMEOUT_MINUTES = 5; +static const bsls::Types::Int64 k_DEFAULT_PUT_EXPIRATION_TIMEOUT_NS = + k_DEFAULT_PUT_EXPIRATION_TIMEOUT_MINUTES * + bdlt::TimeUnitRatio::k_NANOSECONDS_PER_MINUTE; + +} // close unnamed namespace + // ----------------- // class RemoteQueue // ----------------- @@ -484,13 +494,12 @@ RemoteQueue::RemoteQueue(QueueState* state, d_state_p->setDescription(os.str()); if (deduplicationTimeMs <= 0) { - d_pendingPutsTimeoutNs = 5 * - bdlt::TimeUnitRatio::k_NANOSECONDS_PER_MINUTE; + d_pendingPutsTimeoutNs = k_DEFAULT_PUT_EXPIRATION_TIMEOUT_NS; BALL_LOG_WARN << "Remote queue [" << d_state_p->description() - << "]: cannot schedule PUT deduplication timer with a " + << "]: cannot schedule PUT expiration timer with a " << "non-positive timeout from config [" << deduplicationTimeMs << " ms], use a default PUT " - << "deduplication timeout for scheduler instead [" + << "expiration timeout for scheduler instead [" << bmqu::PrintUtil::prettyTimeInterval( d_pendingPutsTimeoutNs) << "]";