Skip to content

Commit

Permalink
Fix[MQB]: Remove maxDeliveryAttempts range check and add tests
Browse files Browse the repository at this point in the history
- The change made in _crash_consumer_restart_leader comment is to match
the comment with the code.
- Before removing the if block in `mqbblp_domain.cpp`, `maxDeliveryAttempts`
is reset to 5 if it's between 1-4, which is wrong since we want users to be
able to set it to any value. The test `test_poison_rda_reset_priority_active`
originally set `maxDeliveryAttempts` to 2 but attempted 3 deliveries.
Before removing the if block this would work since `maxDeliveryAttempts` is
reset to 5, but after the fix, the initial value for `maxDeliveryAttempts`
should also change to 3.

Signed-off-by: Emelia Lei <wlei29@bloomberg.net>
  • Loading branch information
emelialei88 authored and pniedzielski committed Nov 5, 2024
1 parent b909a17 commit 6ee6823
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 32 deletions.
30 changes: 1 addition & 29 deletions src/groups/mqb/mqbblp/mqbblp_domain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,34 +194,6 @@ int normalizeConfig(mqbconfm::Domain* defn,
{
int updatedValues = 0;

const unsigned int maxDeliveryAttempts = defn->maxDeliveryAttempts();
const mqbcfg::MessageThrottleConfig& messageThrottleConfig =
domain.cluster()->isClusterMember()
? domain.cluster()->clusterConfig()->messageThrottleConfig()
: domain.cluster()->clusterProxyConfig()->messageThrottleConfig();

const unsigned int highThresh = messageThrottleConfig.highThreshold();
const unsigned int minValue = highThresh + 1;

// 'maxDeliveryAttempts' can be zero, which indicates unlimited attempts.
if (maxDeliveryAttempts && maxDeliveryAttempts < minValue) {
errorDescription << domain.cluster()->name() << ", " << domain.name()
<< ": maxDeliveryAttempts is less than message "
"throttling high threshold value. "
"maxDeliveryAttempts: "
<< maxDeliveryAttempts
<< ", highThreshold: " << highThresh
<< ". Updated "
"maxDeliveryAttempts to have a value of "
<< minValue
<< ". Please update the value of "
"maxDeliveryAttempts in this domain's config to "
"have a value of at least "
<< minValue << ".\n";
defn->maxDeliveryAttempts() = minValue;
++updatedValues;
}

if (defn->mode().isBroadcastValue() &&
defn->consistency().selectionId() ==
mqbconfm::Consistency::SELECTION_ID_STRONG) {
Expand Down Expand Up @@ -515,7 +487,7 @@ int Domain::configure(bsl::ostream& errorDescription,
// Invoke callbacks for each added and removed ID on each queue.
bsl::unordered_set<bsl::string>::const_iterator it =
addedIds.cbegin();
QueueMap::const_iterator qIt;
QueueMap::const_iterator qIt;
for (; it != addedIds.cend(); it++) {
for (qIt = d_queues.cbegin(); qIt != d_queues.cend(); ++qIt) {
d_dispatcher_p->execute(
Expand Down
6 changes: 3 additions & 3 deletions src/integration-tests/test_poison_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ def _crash_consumer_restart_leader(
# 2. send a message to the consumer
# 3. open a consumer
# 4. kill a consumer
# 5. force a leader change
# 6. open a consumer
# 5. open a consumer
# 6. force a leader change
# 7. kill a consumer a again
# 8. open a consumer
# The message should still exist and be delivered to the consumer (the
Expand Down Expand Up @@ -443,7 +443,7 @@ def test_poison_proxy_and_replica_fanout(self, multi_node: Cluster):
multi_node, proxy, tc.DOMAIN_FANOUT, ["?id=foo", "?id=bar", "?id=baz"]
)

@max_delivery_attempts(2)
@max_delivery_attempts(3)
def test_poison_rda_reset_priority_active(self, multi_node: Cluster):
proxies = multi_node.proxy_cycle()
# pick proxy in datacenter opposite to the primary's
Expand Down
49 changes: 49 additions & 0 deletions src/integration-tests/test_reconfigure_domains.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,55 @@ def do_test(expect_success):
# Expect that message will expire after failed deliveries.
do_test(False)

@tweak.domain.max_delivery_attempts(1)
def test_reconfigure_max_delivery_attempts_finite(self, multi_node: Cluster):
URI = f"bmq://{tc.DOMAIN_PRIORITY}/reconf-rda"
proxy = next(multi_node.proxy_cycle())

# Open the queue through the writer.
self.writer.open(URI, flags=["write,ack"], succeed=True)

def do_test(expect_success, delivery_attempts):
# Write one message to 'URI'.
self.post_n_msgs(URI, 1)

# Open, read, and kill delivery_attempts consumers in sequence.
for idx in range(0, delivery_attempts - 1):
client = proxy.create_client(f"reader-unstable-{idx}")
client.open(URI, flags=["read"], succeed=True)
client.check_exit_code = False
client.wait_push_event(timeout=5)
client.kill()
client.wait()

# Open one more client, and ensure it succeeds or fails to read a
# message according to 'expect_success'.
client = proxy.create_client("reader-stable")
client.open(URI, flags=["read"], succeed=True)
if expect_success:
client.confirm(URI, "+1", succeed=True)
else:
assert not client.wait_push_event(timeout=5)
client.stop_session(block=True)

# Expect that message will not expire after failed deliveries.
do_test(True, 1)

for max_delivery_attempts in range(2, 7):
# Reconfigure messages to expire after max_delivery_attempts delivery attempts.
multi_node.config.domains[
tc.DOMAIN_PRIORITY
].definition.parameters.max_delivery_attempts = max_delivery_attempts
multi_node.reconfigure_domain(tc.DOMAIN_PRIORITY, succeed=True)

# Attempt to deliver message max_delivery_attempts times,
# client confirms at the last attempt.
do_test(True, max_delivery_attempts)

# Attempt to deliver message max_delivery_attempts + 1 times,
# client never confirms, and should timeout after broker maxing out the attempts.
do_test(False, max_delivery_attempts + 1)

@tweak.domain.max_delivery_attempts(0)
def test_reconfigure_max_delivery_attempts_on_existing_messages(
self, multi_node: Cluster
Expand Down

0 comments on commit 6ee6823

Please sign in to comment.