Skip to content

Commit

Permalink
Revert "mqbc::ClusterUtil: Revert to not populate appId infos in non-…
Browse files Browse the repository at this point in the history
…CSL mode…" (bloomberg#413)

This reverts commit 71a9660.

Signed-off-by: Yuan Jing Vincent Yan <yyan82@bloomberg.net>
  • Loading branch information
kaikulimu authored and alexander-e1off committed Oct 24, 2024
1 parent c5f0d1c commit 0017ba6
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 46 deletions.
12 changes: 4 additions & 8 deletions src/groups/mqb/mqbc/mqbc_clusterutil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -785,8 +785,7 @@ void ClusterUtil::populateQueueAssignmentAdvisory(
ClusterState* clusterState,
ClusterData* clusterData,
const bmqt::Uri& uri,
const mqbi::Domain* domain,
bool isCSLMode)
const mqbi::Domain* domain)
{
// PRECONDITIONS
BSLS_ASSERT_SAFE(advisory);
Expand All @@ -809,10 +808,8 @@ void ClusterUtil::populateQueueAssignmentAdvisory(
uri.asString());
key->loadBinary(&queueInfo.key());

if (isCSLMode) {
// Generate appIds and appKeys
populateAppIdInfos(&queueInfo.appIds(), domain->config().mode());
}
// Generate appIds and appKeys
populateAppIdInfos(&queueInfo.appIds(), domain->config().mode());

BALL_LOG_INFO << clusterData->identity().description()
<< ": Populated QueueAssignmentAdvisory: " << *advisory;
Expand Down Expand Up @@ -1008,8 +1005,7 @@ ClusterUtil::assignQueue(ClusterState* clusterState,
clusterState,
clusterData,
uri,
domIt->second->domain(),
cluster->isCSLModeEnabled());
domIt->second->domain());
if (cluster->isCSLModeEnabled()) {
// In CSL mode, we delay the insertion to queueKeys until
// 'onQueueAssigned' observer callback.
Expand Down
6 changes: 2 additions & 4 deletions src/groups/mqb/mqbc/mqbc_clusterutil.h
Original file line number Diff line number Diff line change
Expand Up @@ -209,16 +209,14 @@ struct ClusterUtil {
/// Populate the specified `advisory` with information describing a
/// queue assignment of the specified `uri` living in the specified
/// `domain`, using the specified `clusterState`, `clusterData`. Load into
/// the specified `key` the unique queue key generated. AppIds info will
/// not be populated if the specified `isCSLMode` is true.
/// the specified `key` the unique queue key generated.
static void populateQueueAssignmentAdvisory(
bmqp_ctrlmsg::QueueAssignmentAdvisory* advisory,
mqbu::StorageKey* key,
ClusterState* clusterState,
ClusterData* clusterData,
const bmqt::Uri& uri,
const mqbi::Domain* domain,
bool isCSLMode);
const mqbi::Domain* domain);

/// Populate the specified `advisory` with information describing a
/// queue unassignment of the specified `uri` having the specified `key`
Expand Down
64 changes: 30 additions & 34 deletions src/integration-tests/test_restart.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,37 +143,33 @@ def test_restart_from_non_FSM_to_FSM(cluster: Cluster):

cluster.stop_nodes()

# TODO: Disable reconfiguring the cluster for now, as we are resolving a
# compatibility issue as described at
# https://github.com/bloomberg/blazingmq/pull/408

# # Reconfigure the cluster from non-FSM to FSM mode
# for broker in cluster.configurator.brokers.values():
# my_clusters = broker.clusters.my_clusters
# if len(my_clusters) > 0:
# my_clusters[0].cluster_attributes.is_cslmode_enabled = True
# my_clusters[0].cluster_attributes.is_fsmworkflow = True
# cluster.deploy_domains()

# cluster.start_nodes(wait_leader=True, wait_ready=True)
# # For a standard cluster, states have already been restored as part of
# # leader re-election.
# if cluster.is_single_node:
# producer.wait_state_restored()

# producer.post(tc.URI_PRIORITY, payload=["msg2"], wait_ack=True, succeed=True)
# producer.post(tc.URI_FANOUT, payload=["fanout_msg2"], wait_ack=True, succeed=True)

# # Consumer for priority queue
# consumer = next(proxies).create_client("consumer")
# consumer.open(tc.URI_PRIORITY, flags=["read"], succeed=True)
# consumer.wait_push_event()
# assert wait_until(lambda: len(consumer.list(tc.URI_PRIORITY, block=True)) == 2, 2)

# # Consumer for fanout queue
# consumer_fanout = next(proxies).create_client("consumer_fanout")
# consumer_fanout.open(tc.URI_FANOUT_FOO, flags=["read"], succeed=True)
# consumer_fanout.wait_push_event()
# assert wait_until(
# lambda: len(consumer_fanout.list(tc.URI_FANOUT_FOO, block=True)) == 2, 2
# )
# Reconfigure the cluster from non-FSM to FSM mode
for broker in cluster.configurator.brokers.values():
my_clusters = broker.clusters.my_clusters
if len(my_clusters) > 0:
my_clusters[0].cluster_attributes.is_cslmode_enabled = True
my_clusters[0].cluster_attributes.is_fsmworkflow = True
cluster.deploy_domains()

cluster.start_nodes(wait_leader=True, wait_ready=True)
# For a standard cluster, states have already been restored as part of
# leader re-election.
if cluster.is_single_node:
producer.wait_state_restored()

producer.post(tc.URI_PRIORITY, payload=["msg2"], wait_ack=True, succeed=True)
producer.post(tc.URI_FANOUT, payload=["fanout_msg2"], wait_ack=True, succeed=True)

# Consumer for priority queue
consumer = next(proxies).create_client("consumer")
consumer.open(tc.URI_PRIORITY, flags=["read"], succeed=True)
consumer.wait_push_event()
assert wait_until(lambda: len(consumer.list(tc.URI_PRIORITY, block=True)) == 2, 2)

# Consumer for fanout queue
consumer_fanout = next(proxies).create_client("consumer_fanout")
consumer_fanout.open(tc.URI_FANOUT_FOO, flags=["read"], succeed=True)
consumer_fanout.wait_push_event()
assert wait_until(
lambda: len(consumer_fanout.list(tc.URI_FANOUT_FOO, block=True)) == 2, 2
)

0 comments on commit 0017ba6

Please sign in to comment.