diff --git a/src/groups/mqb/mqbblp/mqbblp_domain.cpp b/src/groups/mqb/mqbblp/mqbblp_domain.cpp index 10bcf80c7..750da298c 100644 --- a/src/groups/mqb/mqbblp/mqbblp_domain.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_domain.cpp @@ -194,34 +194,6 @@ int normalizeConfig(mqbconfm::Domain* defn, { int updatedValues = 0; - const unsigned int maxDeliveryAttempts = defn->maxDeliveryAttempts(); - const mqbcfg::MessageThrottleConfig& messageThrottleConfig = - domain.cluster()->isClusterMember() - ? domain.cluster()->clusterConfig()->messageThrottleConfig() - : domain.cluster()->clusterProxyConfig()->messageThrottleConfig(); - - const unsigned int highThresh = messageThrottleConfig.highThreshold(); - const unsigned int minValue = highThresh + 1; - - // 'maxDeliveryAttempts' can be zero, which indicates unlimited attempts. - if (maxDeliveryAttempts && maxDeliveryAttempts < minValue) { - errorDescription << domain.cluster()->name() << ", " << domain.name() - << ": maxDeliveryAttempts is less than message " - "throttling high threshold value. " - "maxDeliveryAttempts: " - << maxDeliveryAttempts - << ", highThreshold: " << highThresh - << ". Updated " - "maxDeliveryAttempts to have a value of " - << minValue - << ". Please update the value of " - "maxDeliveryAttempts in this domain's config to " - "have a value of at least " - << minValue << ".\n"; - defn->maxDeliveryAttempts() = minValue; - ++updatedValues; - } - if (defn->mode().isBroadcastValue() && defn->consistency().selectionId() == mqbconfm::Consistency::SELECTION_ID_STRONG) { @@ -515,7 +487,7 @@ int Domain::configure(bsl::ostream& errorDescription, // Invoke callbacks for each added and removed ID on each queue. bsl::unordered_set::const_iterator it = addedIds.cbegin(); - QueueMap::const_iterator qIt; + QueueMap::const_iterator qIt; for (; it != addedIds.cend(); it++) { for (qIt = d_queues.cbegin(); qIt != d_queues.cend(); ++qIt) { d_dispatcher_p->execute( diff --git a/src/integration-tests/test_poison_messages.py b/src/integration-tests/test_poison_messages.py index 60eff2374..5b575a991 100644 --- a/src/integration-tests/test_poison_messages.py +++ b/src/integration-tests/test_poison_messages.py @@ -158,8 +158,8 @@ def _crash_consumer_restart_leader( # 2. send a message to the consumer # 3. open a consumer # 4. kill a consumer - # 5. force a leader change - # 6. open a consumer + # 5. open a consumer + # 6. force a leader change # 7. kill a consumer a again # 8. open a consumer # The message should still exist and be delivered to the consumer (the @@ -443,7 +443,7 @@ def test_poison_proxy_and_replica_fanout(self, multi_node: Cluster): multi_node, proxy, tc.DOMAIN_FANOUT, ["?id=foo", "?id=bar", "?id=baz"] ) - @max_delivery_attempts(2) + @max_delivery_attempts(3) def test_poison_rda_reset_priority_active(self, multi_node: Cluster): proxies = multi_node.proxy_cycle() # pick proxy in datacenter opposite to the primary's diff --git a/src/integration-tests/test_reconfigure_domains.py b/src/integration-tests/test_reconfigure_domains.py index 3f325cba8..001f5ed70 100644 --- a/src/integration-tests/test_reconfigure_domains.py +++ b/src/integration-tests/test_reconfigure_domains.py @@ -355,6 +355,55 @@ def do_test(expect_success): # Expect that message will expire after failed deliveries. do_test(False) + @tweak.domain.max_delivery_attempts(1) + def test_reconfigure_max_delivery_attempts_finite(self, multi_node: Cluster): + URI = f"bmq://{tc.DOMAIN_PRIORITY}/reconf-rda" + proxy = next(multi_node.proxy_cycle()) + + # Open the queue through the writer. + self.writer.open(URI, flags=["write,ack"], succeed=True) + + def do_test(expect_success, delivery_attempts): + # Write one message to 'URI'. + self.post_n_msgs(URI, 1) + + # Open, read, and kill delivery_attempts consumers in sequence. + for idx in range(0, delivery_attempts - 1): + client = proxy.create_client(f"reader-unstable-{idx}") + client.open(URI, flags=["read"], succeed=True) + client.check_exit_code = False + client.wait_push_event(timeout=5) + client.kill() + client.wait() + + # Open one more client, and ensure it succeeds or fails to read a + # message according to 'expect_success'. + client = proxy.create_client("reader-stable") + client.open(URI, flags=["read"], succeed=True) + if expect_success: + client.confirm(URI, "+1", succeed=True) + else: + assert not client.wait_push_event(timeout=5) + client.stop_session(block=True) + + # Expect that message will not expire after failed deliveries. + do_test(True, 1) + + for max_delivery_attempts in range(2, 7): + # Reconfigure messages to expire after max_delivery_attempts delivery attempts. + multi_node.config.domains[ + tc.DOMAIN_PRIORITY + ].definition.parameters.max_delivery_attempts = max_delivery_attempts + multi_node.reconfigure_domain(tc.DOMAIN_PRIORITY, succeed=True) + + # Attempt to deliver message max_delivery_attempts times, + # client confirms at the last attempt. + do_test(True, max_delivery_attempts) + + # Attempt to deliver message max_delivery_attempts + 1 times, + # client never confirms, and should timeout after broker maxing out the attempts. + do_test(False, max_delivery_attempts + 1) + @tweak.domain.max_delivery_attempts(0) def test_reconfigure_max_delivery_attempts_on_existing_messages( self, multi_node: Cluster