Skip to content

Commit

Permalink
[fix] [broker] fix topic partitions was expanded even if disabled top…
Browse files Browse the repository at this point in the history
…ic level replication (apache#22769)

(cherry picked from commit 55ad4b2)
(cherry picked from commit 96e2bda)
  • Loading branch information
poorbarcode authored and srinath-ctds committed Jun 7, 2024
1 parent 0ddcd49 commit f0c5776
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -463,7 +464,14 @@ protected CompletableFuture<Void> internalCreateNonPartitionedTopicAsync(boolean
if (!policies.isPresent()) {
return CompletableFuture.completedFuture(null);
}
final Set<String> replicationClusters = policies.get().replication_clusters;
// Combine namespace level policies and topic level policies.
Set<String> replicationClusters = policies.get().replication_clusters;
TopicPolicies topicPolicies =
pulsarService.getTopicPoliciesService().getTopicPoliciesIfExists(topicName);
if (topicPolicies != null) {
replicationClusters = new HashSet<>(topicPolicies.getReplicationClusters());
}
// Do check replicated clusters.
if (replicationClusters.size() == 0) {
return CompletableFuture.completedFuture(null);
}
Expand All @@ -479,6 +487,7 @@ protected CompletableFuture<Void> internalCreateNonPartitionedTopicAsync(boolean
// The replication clusters just has the current cluster itself.
return CompletableFuture.completedFuture(null);
}
// Do sync operation to other clusters.
List<CompletableFuture<Void>> futures = replicationClusters.stream()
.map(replicationCluster -> admin.clusters().getClusterAsync(replicationCluster)
.thenCompose(clusterData -> pulsarService.getBrokerService()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -466,8 +466,17 @@ public void testPartitionedTopicLevelReplicationRemoteTopicExist() throws Except
admin2.topics().createPartitionedTopic(topicName, 2);
admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2));
// Check the partitioned topic has been created at the remote cluster.
PartitionedTopicMetadata topicMetadata2 = admin2.topics().getPartitionedTopicMetadata(topicName);
assertEquals(topicMetadata2.partitions, 2);
Awaitility.await().untilAsserted(() -> {
PartitionedTopicMetadata topicMetadata2 = admin2.topics().getPartitionedTopicMetadata(topicName);
assertEquals(topicMetadata2.partitions, 2);
});

// Expand partitions
admin2.topics().updatePartitionedTopic(topicName, 3);
Awaitility.await().untilAsserted(() -> {
PartitionedTopicMetadata topicMetadata2 = admin2.topics().getPartitionedTopicMetadata(topicName);
assertEquals(topicMetadata2.partitions, 3);
});
// cleanup.
admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1));
waitReplicatorStopped(partition0);
Expand Down Expand Up @@ -717,4 +726,56 @@ public void testDeletePartitionedTopic() throws Exception {
.persistentTopicExists(TopicName.get(topicName).getPartition(1)).join());
}
}

@Test
public void testNoExpandTopicPartitionsWhenDisableTopicLevelReplication() throws Exception {
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_");
admin1.topics().createPartitionedTopic(topicName, 2);

// Verify replicator works.
verifyReplicationWorks(topicName);

// Disable topic level replication.
setTopicLevelClusters(topicName, Arrays.asList(cluster1), admin1, pulsar1);
setTopicLevelClusters(topicName, Arrays.asList(cluster2), admin2, pulsar2);

// Expand topic.
admin1.topics().updatePartitionedTopic(topicName, 3);
assertEquals(admin1.topics().getPartitionedTopicMetadata(topicName).partitions, 3);

// Wait for async tasks that were triggered by expanding topic partitions.
Thread.sleep(3 * 1000);


// Verify: the topics on the remote cluster did not been expanded.
assertEquals(admin2.topics().getPartitionedTopicMetadata(topicName).partitions, 2);

cleanupTopics(() -> {
admin1.topics().deletePartitionedTopic(topicName, false);
admin2.topics().deletePartitionedTopic(topicName, false);
});
}

@Test
public void testExpandTopicPartitionsOnNamespaceLevelReplication() throws Exception {
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_");
admin1.topics().createPartitionedTopic(topicName, 2);

// Verify replicator works.
verifyReplicationWorks(topicName);

// Expand topic.
admin1.topics().updatePartitionedTopic(topicName, 3);
assertEquals(admin1.topics().getPartitionedTopicMetadata(topicName).partitions, 3);

// Verify: the topics on the remote cluster will be expanded.
Awaitility.await().untilAsserted(() -> {
assertEquals(admin2.topics().getPartitionedTopicMetadata(topicName).partitions, 3);
});

cleanupTopics(() -> {
admin1.topics().deletePartitionedTopic(topicName, false);
admin2.topics().deletePartitionedTopic(topicName, false);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,16 @@ protected void cleanupTopics(String namespace, CleanupTopicAction cleanupTopicAc
}

protected void waitChangeEventsInit(String namespace) {
PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService()
.getTopic(namespace + "/" + SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME, false)
.join().get();
CompletableFuture<Optional<Topic>> future = pulsar1.getBrokerService()
.getTopic(namespace + "/" + SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME, false);
if (future == null) {
return;
}
Optional<Topic> optional = future.join();
if (!optional.isPresent()) {
return;
}
PersistentTopic topic = (PersistentTopic) optional.get();
Awaitility.await().atMost(Duration.ofSeconds(180)).untilAsserted(() -> {
TopicStatsImpl topicStats = topic.getStats(true, false, false);
topicStats.getSubscriptions().entrySet().forEach(entry -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,14 @@ public void testDeleteNonPartitionedTopic() throws Exception {
public void testDeletePartitionedTopic() throws Exception {
super.testDeletePartitionedTopic();
}

@Test(enabled = false)
public void testNoExpandTopicPartitionsWhenDisableTopicLevelReplication() throws Exception {
super.testNoExpandTopicPartitionsWhenDisableTopicLevelReplication();
}

@Test(enabled = false)
public void testExpandTopicPartitionsOnNamespaceLevelReplication() throws Exception {
super.testExpandTopicPartitionsOnNamespaceLevelReplication();
}
}

0 comments on commit f0c5776

Please sign in to comment.