From 88ebe785dbdab239104981453a9bd0e4a7e896d3 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Mon, 15 Jul 2024 10:04:48 +0800 Subject: [PATCH] [fix][broker] Fix stuck when enable topic level replication and build remote admin fails (#23028) --- .../pulsar/broker/admin/AdminResource.java | 15 ++++++-- .../broker/service/OneWayReplicatorTest.java | 35 +++++++++++++++++++ 2 files changed, 47 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index 45455f16d4dc1..1f43aeaa668bc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -48,6 +48,7 @@ import org.apache.pulsar.broker.service.plugin.InvalidEntryFilterException; import org.apache.pulsar.broker.web.PulsarWebResource; import org.apache.pulsar.broker.web.RestException; +import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.internal.TopicsImpl; import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace; @@ -630,7 +631,7 @@ private void internalCreatePartitionedTopicToReplicatedClustersInBackground(int }); } - protected Map> internalCreatePartitionedTopicToReplicatedClustersInBackground( + protected Map> internalCreatePartitionedTopicToReplicatedClustersInBackground ( Set clusters, int numPartitions) { final String shortTopicName = topicName.getPartitionedTopicName(); Map> tasksForAllClusters = new HashMap<>(); @@ -649,9 +650,17 @@ protected Map> internalCreatePartitionedTopicToR createRemoteTopicFuture.completeExceptionally(new RestException(ex1)); return; } + PulsarAdmin remotePulsarAdmin; + try { + remotePulsarAdmin = pulsar().getBrokerService().getClusterPulsarAdmin(cluster, clusterData); + } catch (Exception ex) { + log.error("[{}] [{}] An un-expected error occurs when trying to create remote pulsar admin for" + + " cluster {}", clientAppId(), topicName, cluster, ex); + createRemoteTopicFuture.completeExceptionally(new RestException(ex)); + return; + } // Get cluster data success. - TopicsImpl topics = - (TopicsImpl) pulsar().getBrokerService().getClusterPulsarAdmin(cluster, clusterData).topics(); + TopicsImpl topics = (TopicsImpl) remotePulsarAdmin.topics(); topics.createPartitionedTopicAsync(shortTopicName, numPartitions, true, null) .whenComplete((ignore, ex2) -> { if (ex2 == null) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java index 80091c9e5eb2c..9aad26530df5b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java @@ -38,8 +38,10 @@ import java.time.Duration; import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.Iterator; import java.util.Optional; +import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; @@ -58,6 +60,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.resources.ClusterResources; import org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator; import org.apache.pulsar.broker.service.persistent.PersistentReplicator; import org.apache.pulsar.broker.service.persistent.PersistentTopic; @@ -75,7 +78,9 @@ import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; +import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.RetentionPolicies; +import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; @@ -947,6 +952,36 @@ protected void disableReplication(String topic) throws Exception { admin1.topics().setReplicationClusters(topic, Arrays.asList(cluster1, cluster2)); } + @Test(timeOut = 30 * 1000) + public void testCreateRemoteAdminFailed() throws Exception { + final TenantInfo tenantInfo = admin1.tenants().getTenantInfo(defaultTenant); + final String ns1 = defaultTenant + "/ns_" + UUID.randomUUID().toString().replace("-", ""); + final String randomClusterName = "c_" + UUID.randomUUID().toString().replace("-", ""); + final String topic = BrokerTestUtil.newUniqueName(ns1 + "/tp"); + admin1.namespaces().createNamespace(ns1); + admin1.topics().createPartitionedTopic(topic, 2); + + // Inject a wrong cluster data which with empty fields. + ClusterResources clusterResources = broker1.getPulsar().getPulsarResources().getClusterResources(); + clusterResources.createCluster(randomClusterName, ClusterData.builder().build()); + Set allowedClusters = new HashSet<>(tenantInfo.getAllowedClusters()); + allowedClusters.add(randomClusterName); + admin1.tenants().updateTenant(defaultTenant, TenantInfo.builder().adminRoles(tenantInfo.getAdminRoles()) + .allowedClusters(allowedClusters).build()); + + // Verify. + try { + admin1.topics().setReplicationClusters(topic, Arrays.asList(cluster1, randomClusterName)); + fail("Expected a error due to empty fields"); + } catch (Exception ex) { + // Expected an error. + } + + // cleanup. + admin1.topics().deletePartitionedTopic(topic); + admin1.tenants().updateTenant(defaultTenant, tenantInfo); + } + @Test public void testConfigReplicationStartAt() throws Exception { // Initialize.