Skip to content

Commit

Permalink
Fix[MQB]: fix 0 deduplication timeout causing instant expired puts on…
Browse files Browse the repository at this point in the history
… REPLICA

Signed-off-by: Evgeny Malygin <emalygin@bloomberg.net>
  • Loading branch information
678098 committed Oct 31, 2024
1 parent 844ba5c commit 2d2c902
Showing 1 changed file with 19 additions and 8 deletions.
27 changes: 19 additions & 8 deletions src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ void RemoteQueue::pushMessage(

if (result != mqbi::StorageResult::e_SUCCESS) {
if (d_throttledFailedPushMessages.requestPermission()) {
BALL_LOG_WARN << d_state_p->uri()
BALL_LOG_WARN << "[THROTTLED] " << d_state_p->uri()
<< " failed to store broadcast PUSH ["
<< msgGUID << "], result = " << result;
}
Expand Down Expand Up @@ -483,6 +483,17 @@ RemoteQueue::RemoteQueue(QueueState* state,
os << '@' << d_state_p->uri().asString();
d_state_p->setDescription(os.str());

if (deduplicationTimeMs <= 0) {
d_pendingPutsTimeoutNs = 5 * bdlt::TimeUnitRatio::k_NANOSECONDS_PER_MINUTE;
BALL_LOG_WARN << "Remote queue [" << d_state_p->description()
<< "]: cannot schedule PUT deduplication timer with a "
<< "non-positive timeout from config ["
<< deduplicationTimeMs << " ms], use a default PUT "
<< "deduplication timeout for scheduler instead ["
<< bmqu::PrintUtil::prettyTimeInterval(d_pendingPutsTimeoutNs)
<< "]";
}

BALL_LOG_INFO << "Remote queue: " << d_state_p->uri()
<< " [id: " << d_state_p->id() << "]";
}
Expand Down Expand Up @@ -676,10 +687,10 @@ void RemoteQueue::onHandleReleased(
it != d_pendingConfirms.end();) {
if (it->d_handle == handle.get()) {
if (d_throttledFailedConfirmMessages.requestPermission()) {
BALL_LOG_WARN << "Dropping CONFIRM because downstream ["
<< handle << "] is gone. [queue: '"
<< d_state_p->description() << "', GUID: '"
<< it->d_guid << "']";
BALL_LOG_WARN << "[THROTTLED] Dropping CONFIRM because "
<< "downstream [" << handle << "] is gone. "
<< "[queue: '" << d_state_p->description()
<< "', GUID: '" << it->d_guid << "']";
}
it = d_pendingConfirms.erase(it);
++numProcessed;
Expand Down Expand Up @@ -865,7 +876,7 @@ void RemoteQueue::postMessage(const bmqp::PutHeader& putHeaderIn,

if (d_throttledFailedPutMessages.requestPermission()) {
BALL_LOG_WARN
<< "#CLIENT_IMPROPER_BEHAVIOR "
<< "[THROTTLED] #CLIENT_IMPROPER_BEHAVIOR "
<< "Failed PUT message for queue [" << d_state_p->uri()
<< "] from client [" << source->client()->description()
<< "]. Queue not opened in WRITE mode by the client.";
Expand Down Expand Up @@ -1152,7 +1163,7 @@ void RemoteQueue::onAckMessageDispatched(const mqbi::DispatcherAckEvent& event)

if (d_throttledFailedAckMessages.requestPermission()) {
BALL_LOG_STREAM(severity)
<< "Received ACK message [" << ackResult
<< "[THROTTLED] Received ACK message [" << ackResult
<< ", queue: " << d_state_p->description()
<< "] for unknown guid: " << ackMessage.messageGUID();
}
Expand Down Expand Up @@ -1313,7 +1324,7 @@ void RemoteQueue::expirePendingMessagesDispatched()

if (numExpired) {
if (d_throttledFailedPutMessages.requestPermission()) {
BALL_LOG_INFO << "[THROTTLED] " << d_state_p->uri() << ": expired "
BALL_LOG_WARN << "[THROTTLED] " << d_state_p->uri() << ": expired "
<< bmqu::PrintUtil::prettyNumber(numExpired)
<< " pending PUT messages ("
<< bmqu::PrintUtil::prettyNumber(numMessages -
Expand Down

0 comments on commit 2d2c902

Please sign in to comment.