-
Notifications
You must be signed in to change notification settings - Fork 138
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix[mqbblp::RemoteQueue]: 0 deduplication timeout causes expired PUTs #494
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -366,7 +366,7 @@ void RemoteQueue::pushMessage( | |
|
||
if (result != mqbi::StorageResult::e_SUCCESS) { | ||
if (d_throttledFailedPushMessages.requestPermission()) { | ||
BALL_LOG_WARN << d_state_p->uri() | ||
BALL_LOG_WARN << "[THROTTLED] " << d_state_p->uri() | ||
<< " failed to store broadcast PUSH [" | ||
<< msgGUID << "], result = " << result; | ||
} | ||
|
@@ -483,6 +483,19 @@ RemoteQueue::RemoteQueue(QueueState* state, | |
os << '@' << d_state_p->uri().asString(); | ||
d_state_p->setDescription(os.str()); | ||
|
||
if (deduplicationTimeMs <= 0) { | ||
d_pendingPutsTimeoutNs = 5 * | ||
bdlt::TimeUnitRatio::k_NANOSECONDS_PER_MINUTE; | ||
BALL_LOG_WARN << "Remote queue [" << d_state_p->description() | ||
<< "]: cannot schedule PUT deduplication timer with a " | ||
<< "non-positive timeout from config [" | ||
<< deduplicationTimeMs << " ms], use a default PUT " | ||
<< "deduplication timeout for scheduler instead [" | ||
<< bmqu::PrintUtil::prettyTimeInterval( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. let's print the minutes, easier to read? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Pretty time interval prints There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If this is |
||
d_pendingPutsTimeoutNs) | ||
<< "]"; | ||
} | ||
|
||
BALL_LOG_INFO << "Remote queue: " << d_state_p->uri() | ||
<< " [id: " << d_state_p->id() << "]"; | ||
} | ||
|
@@ -676,10 +689,10 @@ void RemoteQueue::onHandleReleased( | |
it != d_pendingConfirms.end();) { | ||
if (it->d_handle == handle.get()) { | ||
if (d_throttledFailedConfirmMessages.requestPermission()) { | ||
BALL_LOG_WARN << "Dropping CONFIRM because downstream [" | ||
<< handle << "] is gone. [queue: '" | ||
<< d_state_p->description() << "', GUID: '" | ||
<< it->d_guid << "']"; | ||
BALL_LOG_WARN << "[THROTTLED] Dropping CONFIRM because " | ||
<< "downstream [" << handle << "] is gone. " | ||
<< "[queue: '" << d_state_p->description() | ||
<< "', GUID: '" << it->d_guid << "']"; | ||
} | ||
it = d_pendingConfirms.erase(it); | ||
++numProcessed; | ||
|
@@ -865,7 +878,7 @@ void RemoteQueue::postMessage(const bmqp::PutHeader& putHeaderIn, | |
|
||
if (d_throttledFailedPutMessages.requestPermission()) { | ||
BALL_LOG_WARN | ||
<< "#CLIENT_IMPROPER_BEHAVIOR " | ||
<< "[THROTTLED] #CLIENT_IMPROPER_BEHAVIOR " | ||
<< "Failed PUT message for queue [" << d_state_p->uri() | ||
<< "] from client [" << source->client()->description() | ||
<< "]. Queue not opened in WRITE mode by the client."; | ||
|
@@ -1152,7 +1165,7 @@ void RemoteQueue::onAckMessageDispatched(const mqbi::DispatcherAckEvent& event) | |
|
||
if (d_throttledFailedAckMessages.requestPermission()) { | ||
BALL_LOG_STREAM(severity) | ||
<< "Received ACK message [" << ackResult | ||
<< "[THROTTLED] Received ACK message [" << ackResult | ||
<< ", queue: " << d_state_p->description() | ||
<< "] for unknown guid: " << ackMessage.messageGUID(); | ||
} | ||
|
@@ -1313,7 +1326,7 @@ void RemoteQueue::expirePendingMessagesDispatched() | |
|
||
if (numExpired) { | ||
if (d_throttledFailedPutMessages.requestPermission()) { | ||
BALL_LOG_INFO << "[THROTTLED] " << d_state_p->uri() << ": expired " | ||
BALL_LOG_WARN << "[THROTTLED] " << d_state_p->uri() << ": expired " | ||
<< bmqu::PrintUtil::prettyNumber(numExpired) | ||
<< " pending PUT messages (" | ||
<< bmqu::PrintUtil::prettyNumber(numMessages - | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's have a
k_
constant instead of5
Or, the default dedup timeout