Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix[MQB]: remove maxDeliveryAttempts range check #496

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 1 addition & 29 deletions src/groups/mqb/mqbblp/mqbblp_domain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,34 +194,6 @@ int normalizeConfig(mqbconfm::Domain* defn,
{
int updatedValues = 0;

const unsigned int maxDeliveryAttempts = defn->maxDeliveryAttempts();
const mqbcfg::MessageThrottleConfig& messageThrottleConfig =
domain.cluster()->isClusterMember()
? domain.cluster()->clusterConfig()->messageThrottleConfig()
: domain.cluster()->clusterProxyConfig()->messageThrottleConfig();

const unsigned int highThresh = messageThrottleConfig.highThreshold();
const unsigned int minValue = highThresh + 1;

// 'maxDeliveryAttempts' can be zero, which indicates unlimited attempts.
if (maxDeliveryAttempts && maxDeliveryAttempts < minValue) {
errorDescription << domain.cluster()->name() << ", " << domain.name()
<< ": maxDeliveryAttempts is less than message "
"throttling high threshold value. "
"maxDeliveryAttempts: "
<< maxDeliveryAttempts
<< ", highThreshold: " << highThresh
<< ". Updated "
"maxDeliveryAttempts to have a value of "
<< minValue
<< ". Please update the value of "
"maxDeliveryAttempts in this domain's config to "
"have a value of at least "
<< minValue << ".\n";
defn->maxDeliveryAttempts() = minValue;
++updatedValues;
}

if (defn->mode().isBroadcastValue() &&
defn->consistency().selectionId() ==
mqbconfm::Consistency::SELECTION_ID_STRONG) {
Expand Down Expand Up @@ -515,7 +487,7 @@ int Domain::configure(bsl::ostream& errorDescription,
// Invoke callbacks for each added and removed ID on each queue.
bsl::unordered_set<bsl::string>::const_iterator it =
addedIds.cbegin();
QueueMap::const_iterator qIt;
QueueMap::const_iterator qIt;
for (; it != addedIds.cend(); it++) {
for (qIt = d_queues.cbegin(); qIt != d_queues.cend(); ++qIt) {
d_dispatcher_p->execute(
Expand Down
6 changes: 3 additions & 3 deletions src/integration-tests/test_poison_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ def _crash_consumer_restart_leader(
# 2. send a message to the consumer
# 3. open a consumer
# 4. kill a consumer
# 5. force a leader change
# 6. open a consumer
# 5. open a consumer
pniedzielski marked this conversation as resolved.
Show resolved Hide resolved
# 6. force a leader change
# 7. kill a consumer a again
# 8. open a consumer
# The message should still exist and be delivered to the consumer (the
Expand Down Expand Up @@ -443,7 +443,7 @@ def test_poison_proxy_and_replica_fanout(self, multi_node: Cluster):
multi_node, proxy, tc.DOMAIN_FANOUT, ["?id=foo", "?id=bar", "?id=baz"]
)

@max_delivery_attempts(2)
@max_delivery_attempts(3)
pniedzielski marked this conversation as resolved.
Show resolved Hide resolved
def test_poison_rda_reset_priority_active(self, multi_node: Cluster):
proxies = multi_node.proxy_cycle()
# pick proxy in datacenter opposite to the primary's
Expand Down
49 changes: 49 additions & 0 deletions src/integration-tests/test_reconfigure_domains.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,55 @@ def do_test(expect_success):
# Expect that message will expire after failed deliveries.
do_test(False)

@tweak.domain.max_delivery_attempts(1)
def test_reconfigure_max_delivery_attempts_finite(self, multi_node: Cluster):
URI = f"bmq://{tc.DOMAIN_PRIORITY}/reconf-rda"
proxy = next(multi_node.proxy_cycle())

# Open the queue through the writer.
self.writer.open(URI, flags=["write,ack"], succeed=True)

def do_test(expect_success, delivery_attempts):
# Write one message to 'URI'.
self.post_n_msgs(URI, 1)

# Open, read, and kill delivery_attempts consumers in sequence.
for idx in range(0, delivery_attempts - 1):
client = proxy.create_client(f"reader-unstable-{idx}")
client.open(URI, flags=["read"], succeed=True)
client.check_exit_code = False
client.wait_push_event(timeout=5)
client.kill()
client.wait()

# Open one more client, and ensure it succeeds or fails to read a
# message according to 'expect_success'.
client = proxy.create_client("reader-stable")
client.open(URI, flags=["read"], succeed=True)
if expect_success:
client.confirm(URI, "+1", succeed=True)
else:
assert not client.wait_push_event(timeout=5)
client.stop_session(block=True)

# Expect that message will not expire after failed deliveries.
do_test(True, 1)

for max_delivery_attempts in range(2, 7):
# Reconfigure messages to expire after max_delivery_attempts delivery attempts.
multi_node.config.domains[
tc.DOMAIN_PRIORITY
].definition.parameters.max_delivery_attempts = max_delivery_attempts
multi_node.reconfigure_domain(tc.DOMAIN_PRIORITY, succeed=True)

# Attempt to deliver message max_delivery_attempts times,
# client confirms at the last attempt.
do_test(True, max_delivery_attempts)

# Attempt to deliver message max_delivery_attempts + 1 times,
# client never confirms, and should timeout after broker maxing out the attempts.
do_test(False, max_delivery_attempts + 1)

@tweak.domain.max_delivery_attempts(0)
def test_reconfigure_max_delivery_attempts_on_existing_messages(
self, multi_node: Cluster
Expand Down
Loading