Skip to content

Commit

Permalink
wait for unconfirmed before buffering confirms (bloomberg#336)
Browse files Browse the repository at this point in the history
Signed-off-by: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com>
  • Loading branch information
dorjesinpo authored and alexander-e1off committed Oct 24, 2024
1 parent 0411faa commit 18c17b6
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 8 deletions.
20 changes: 14 additions & 6 deletions src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2633,12 +2633,20 @@ void ClusterQueueHelper::notifyQueue(QueueContext* queueContext,
}

if (isOpen) {
queue->dispatcher()->execute(
bdlf::BindUtil::bind(&mqbi::Queue::onOpenUpstream,
queue,
generationCount,
upstreamSubQueueId),
queue);
if (generationCount == 0) {
BALL_LOG_INFO << d_cluster_p->description()
<< ": has deconfigured queue ["
<< queueContext->uri() << "], subStream id ["
<< upstreamSubQueueId << "]";
}
else {
queue->dispatcher()->execute(
bdlf::BindUtil::bind(&mqbi::Queue::onOpenUpstream,
queue,
generationCount,
upstreamSubQueueId),
queue);
}
}
else {
queue->dispatcher()->execute(
Expand Down
14 changes: 12 additions & 2 deletions src/integration-tests/test_puts_retransmission.py
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,12 @@ def test_shutdown_primary_keep_replica(self, multi_node: Cluster):
# If shutting down primary, the replica needs to wait for new primary.
self.active_node.wait_status(wait_leader=True, wait_ready=False)

self.inspect_results(allow_duplicates=False)
# Do allow duplicates for the scenario when a CONFIRM had passed Proxy
# but did not reach the replication. New Primary then redelivers and
# the Proxy cannot detect the duplicate because it had removed the GUID
# upon the first CONFIRM

self.inspect_results(allow_duplicates=True)

def test_shutdown_replica(self, multi_node: Cluster):
self.setup_cluster_fanout(multi_node)
Expand All @@ -521,7 +526,12 @@ def test_shutdown_replica(self, multi_node: Cluster):
# Because the quorum is 3, cluster is still healthy after shutting down
# replica.

self.inspect_results(allow_duplicates=False)
# Do allow duplicates for the scenario when a CONFIRM had passed Proxy
# but did not reach the replication. New Primary then redelivers and
# the Proxy cannot detect the duplicate because it had removed the GUID
# upon the first CONFIRM

self.inspect_results(allow_duplicates=True)

def test_kill_primary_convert_replica(self, multi_node: Cluster):
self.setup_cluster_fanout(multi_node)
Expand Down

0 comments on commit 18c17b6

Please sign in to comment.