Skip to content

Commit

Permalink
[fix][broker] Fix stuck when enable topic level replication and build…
Browse files Browse the repository at this point in the history
… remote admin fails (apache#23028)
  • Loading branch information
poorbarcode authored Jul 15, 2024
1 parent c160cc9 commit 88ebe78
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.pulsar.broker.service.plugin.InvalidEntryFilterException;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.internal.TopicsImpl;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
Expand Down Expand Up @@ -630,7 +631,7 @@ private void internalCreatePartitionedTopicToReplicatedClustersInBackground(int
});
}

protected Map<String, CompletableFuture<Void>> internalCreatePartitionedTopicToReplicatedClustersInBackground(
protected Map<String, CompletableFuture<Void>> internalCreatePartitionedTopicToReplicatedClustersInBackground (
Set<String> clusters, int numPartitions) {
final String shortTopicName = topicName.getPartitionedTopicName();
Map<String, CompletableFuture<Void>> tasksForAllClusters = new HashMap<>();
Expand All @@ -649,9 +650,17 @@ protected Map<String, CompletableFuture<Void>> internalCreatePartitionedTopicToR
createRemoteTopicFuture.completeExceptionally(new RestException(ex1));
return;
}
PulsarAdmin remotePulsarAdmin;
try {
remotePulsarAdmin = pulsar().getBrokerService().getClusterPulsarAdmin(cluster, clusterData);
} catch (Exception ex) {
log.error("[{}] [{}] An un-expected error occurs when trying to create remote pulsar admin for"
+ " cluster {}", clientAppId(), topicName, cluster, ex);
createRemoteTopicFuture.completeExceptionally(new RestException(ex));
return;
}
// Get cluster data success.
TopicsImpl topics =
(TopicsImpl) pulsar().getBrokerService().getClusterPulsarAdmin(cluster, clusterData).topics();
TopicsImpl topics = (TopicsImpl) remotePulsarAdmin.topics();
topics.createPartitionedTopicAsync(shortTopicName, numPartitions, true, null)
.whenComplete((ignore, ex2) -> {
if (ex2 == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
Expand All @@ -58,6 +60,7 @@
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.resources.ClusterResources;
import org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
Expand All @@ -75,7 +78,9 @@
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
Expand Down Expand Up @@ -947,6 +952,36 @@ protected void disableReplication(String topic) throws Exception {
admin1.topics().setReplicationClusters(topic, Arrays.asList(cluster1, cluster2));
}

@Test(timeOut = 30 * 1000)
public void testCreateRemoteAdminFailed() throws Exception {
final TenantInfo tenantInfo = admin1.tenants().getTenantInfo(defaultTenant);
final String ns1 = defaultTenant + "/ns_" + UUID.randomUUID().toString().replace("-", "");
final String randomClusterName = "c_" + UUID.randomUUID().toString().replace("-", "");
final String topic = BrokerTestUtil.newUniqueName(ns1 + "/tp");
admin1.namespaces().createNamespace(ns1);
admin1.topics().createPartitionedTopic(topic, 2);

// Inject a wrong cluster data which with empty fields.
ClusterResources clusterResources = broker1.getPulsar().getPulsarResources().getClusterResources();
clusterResources.createCluster(randomClusterName, ClusterData.builder().build());
Set<String> allowedClusters = new HashSet<>(tenantInfo.getAllowedClusters());
allowedClusters.add(randomClusterName);
admin1.tenants().updateTenant(defaultTenant, TenantInfo.builder().adminRoles(tenantInfo.getAdminRoles())
.allowedClusters(allowedClusters).build());

// Verify.
try {
admin1.topics().setReplicationClusters(topic, Arrays.asList(cluster1, randomClusterName));
fail("Expected a error due to empty fields");
} catch (Exception ex) {
// Expected an error.
}

// cleanup.
admin1.topics().deletePartitionedTopic(topic);
admin1.tenants().updateTenant(defaultTenant, tenantInfo);
}

@Test
public void testConfigReplicationStartAt() throws Exception {
// Initialize.
Expand Down

0 comments on commit 88ebe78

Please sign in to comment.