Skip to content
Permalink

Comparing changes

This is a direct comparison between two commits made in this repository or its related repositories. View the default comparison for this range or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: apache/pulsar
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: cbf5ac09807a98ba08cdfc09a5a96f6dbe969042
Choose a base ref
..
head repository: apache/pulsar
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: a29d3b9df46849e67bc53be5cb6ac9e3bb36a691
Choose a head ref
1 change: 1 addition & 0 deletions distribution/shell/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
@@ -333,6 +333,7 @@ The Apache Software License, Version 2.0
- listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar
* J2ObjC Annotations -- j2objc-annotations-1.3.jar
* Netty Reactive Streams -- netty-reactive-streams-2.0.6.jar
* Swagger -- swagger-annotations-1.6.2.jar
* DataSketches
- memory-0.8.3.jar
- sketches-core-0.8.3.jar
Original file line number Diff line number Diff line change
@@ -120,7 +120,7 @@ public CompletableFuture<Void> clearTenantPersistence(String tenant) {
return store.exists(path)
.thenCompose(exists -> {
if (exists) {
return store.delete(path, Optional.empty());
return store.deleteRecursive(path);
} else {
return CompletableFuture.completedFuture(null);
}
Original file line number Diff line number Diff line change
@@ -23,6 +23,7 @@
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -43,9 +44,11 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.resources.ClusterResources;
import org.apache.pulsar.broker.service.plugin.InvalidEntryFilterException;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.internal.TopicsImpl;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.naming.Constants;
@@ -621,35 +624,82 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n

private void internalCreatePartitionedTopicToReplicatedClustersInBackground(int numPartitions) {
getNamespaceReplicatedClustersAsync(namespaceName)
.thenAccept(clusters -> {
for (String cluster : clusters) {
if (!cluster.equals(pulsar().getConfiguration().getClusterName())) {
// this call happens in the background without async composition. completion is logged.
pulsar().getPulsarResources().getClusterResources()
.getClusterAsync(cluster)
.thenCompose(clusterDataOp ->
((TopicsImpl) pulsar().getBrokerService()
.getClusterPulsarAdmin(cluster,
clusterDataOp).topics())
.createPartitionedTopicAsync(
topicName.getPartitionedTopicName(),
numPartitions,
true, null))
.whenComplete((__, ex) -> {
if (ex != null) {
log.error(
"[{}] Failed to create partitioned topic {} in cluster {}.",
clientAppId(), topicName, cluster, ex);
} else {
log.info(
"[{}] Successfully created partitioned topic {} in "
+ "cluster {}",
clientAppId(), topicName, cluster);
}
});
}
.thenAccept(clusters -> {
// this call happens in the background without async composition. completion is logged.
internalCreatePartitionedTopicToReplicatedClustersInBackground(clusters, numPartitions);
});
}

protected Map<String, CompletableFuture<Void>> internalCreatePartitionedTopicToReplicatedClustersInBackground(
Set<String> clusters, int numPartitions) {
final String shortTopicName = topicName.getPartitionedTopicName();
Map<String, CompletableFuture<Void>> tasksForAllClusters = new HashMap<>();
for (String cluster : clusters) {
if (cluster.equals(pulsar().getConfiguration().getClusterName())) {
continue;
}
ClusterResources clusterResources = pulsar().getPulsarResources().getClusterResources();
CompletableFuture<Void> createRemoteTopicFuture = new CompletableFuture<>();
tasksForAllClusters.put(cluster, createRemoteTopicFuture);
clusterResources.getClusterAsync(cluster).whenComplete((clusterData, ex1) -> {
if (ex1 != null) {
// Unexpected error, such as NPE. Catch all error to avoid the "createRemoteTopicFuture" stuck.
log.error("[{}] An un-expected error occurs when trying to create partitioned topic {} in cluster"
+ " {}.", clientAppId(), topicName, cluster, ex1);
createRemoteTopicFuture.completeExceptionally(new RestException(ex1));
return;
}
// Get cluster data success.
TopicsImpl topics =
(TopicsImpl) pulsar().getBrokerService().getClusterPulsarAdmin(cluster, clusterData).topics();
topics.createPartitionedTopicAsync(shortTopicName, numPartitions, true, null)
.whenComplete((ignore, ex2) -> {
if (ex2 == null) {
// Create success.
log.info("[{}] Successfully created partitioned topic {} in cluster {}",
clientAppId(), topicName, cluster);
createRemoteTopicFuture.complete(null);
return;
}
// Create topic on the remote cluster error.
Throwable unwrapEx2 = FutureUtil.unwrapCompletionException(ex2);
// The topic has been created before, check the partitions count is expected.
if (unwrapEx2 instanceof PulsarAdminException.ConflictException) {
topics.getPartitionedTopicMetadataAsync(shortTopicName).whenComplete((topicMeta, ex3) -> {
if (ex3 != null) {
// Unexpected error, such as NPE. Catch all error to avoid the
// "createRemoteTopicFuture" stuck.
log.error("[{}] Failed to check remote-cluster's topic metadata when creating"
+ " partitioned topic {} in cluster {}.",
clientAppId(), topicName, cluster, ex3);
createRemoteTopicFuture.completeExceptionally(new RestException(ex3));
}
// Call get partitioned metadata of remote cluster success.
if (topicMeta.partitions == numPartitions) {
log.info("[{}] Skip created partitioned topic {} in cluster {}, because that {}",
clientAppId(), topicName, cluster, unwrapEx2.getMessage());
createRemoteTopicFuture.complete(null);
} else {
String errorMsg = String.format("[%s] There is an exists topic %s with different"
+ " partitions %s on the remote cluster %s, you want to create it"
+ " with partitions %s",
clientAppId(), shortTopicName, topicMeta.partitions, cluster,
numPartitions);
log.error(errorMsg);
createRemoteTopicFuture.completeExceptionally(
new RestException(Status.PRECONDITION_FAILED, errorMsg));
}
});
} else {
// An HTTP error was responded from the remote cluster.
log.error("[{}] Failed to create partitioned topic {} in cluster {}.",
clientAppId(), topicName, cluster, ex2);
createRemoteTopicFuture.completeExceptionally(new RestException(unwrapEx2));
}
});
});
}
return tasksForAllClusters;
}

/**
Original file line number Diff line number Diff line change
@@ -3253,12 +3253,13 @@ protected CompletableFuture<Void> internalSetBacklogQuota(BacklogQuota.BacklogQu
}

protected CompletableFuture<Void> internalSetReplicationClusters(List<String> clusterIds) {
if (CollectionUtils.isEmpty(clusterIds)) {
return CompletableFuture.failedFuture(new RestException(Status.PRECONDITION_FAILED,
"ClusterIds should not be null or empty"));
}
Set<String> replicationClusters = Sets.newHashSet(clusterIds);
return validatePoliciesReadOnlyAccessAsync()
.thenCompose(__ -> {
if (CollectionUtils.isEmpty(clusterIds)) {
throw new RestException(Status.PRECONDITION_FAILED, "ClusterIds should not be null or empty");
}
Set<String> replicationClusters = Sets.newHashSet(clusterIds);
if (replicationClusters.contains("global")) {
throw new RestException(Status.PRECONDITION_FAILED,
"Cannot specify global in the list of replication clusters");
@@ -3273,6 +3274,20 @@ protected CompletableFuture<Void> internalSetReplicationClusters(List<String> cl
futures.add(validateClusterForTenantAsync(namespaceName.getTenant(), clusterId));
}
return FutureUtil.waitForAll(futures);
}).thenCompose(__ -> {
// Sync to create partitioned topic on the remote cluster if needed.
TopicName topicNameWithoutPartition = TopicName.get(topicName.getPartitionedTopicName());
return pulsar().getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
.getPartitionedTopicMetadataAsync(topicNameWithoutPartition).thenCompose(topicMetaOp -> {
// Skip to create topic if the topic is non-partitioned, because the replicator will create
// it automatically.
if (topicMetaOp.isEmpty()) {
return CompletableFuture.completedFuture(null);
}
return FutureUtil.waitForAll(
internalCreatePartitionedTopicToReplicatedClustersInBackground(replicationClusters,
topicMetaOp.get().partitions).values());
});
}).thenCompose(__ ->
getTopicPoliciesAsyncWithRetry(topicName).thenCompose(op -> {
TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
Original file line number Diff line number Diff line change
@@ -33,6 +33,7 @@
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
@@ -56,12 +57,17 @@ public AuthorizationTest() {
@Override
public void setup() throws Exception {
conf.setClusterName("c1");
conf.setSystemTopicEnabled(false);
conf.setAuthenticationEnabled(true);
conf.setForceDeleteNamespaceAllowed(true);
conf.setForceDeleteTenantAllowed(true);
conf.setAuthenticationProviders(
Sets.newHashSet("org.apache.pulsar.broker.auth.MockAuthenticationProvider"));
conf.setAuthorizationEnabled(true);
conf.setAuthorizationAllowWildcardsMatching(true);
conf.setSuperUserRoles(Sets.newHashSet("pulsar.super_user", "pass.pass"));
conf.setBrokerClientAuthenticationPlugin(MockAuthentication.class.getName());
conf.setBrokerClientAuthenticationParameters("user:pass.pass");
internalSetup();
}

@@ -70,6 +76,11 @@ protected void customizeNewPulsarAdminBuilder(PulsarAdminBuilder pulsarAdminBuil
pulsarAdminBuilder.authentication(new MockAuthentication("pass.pass"));
}

@Override
protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) {
clientBuilder.authentication(new MockAuthentication("pass.pass"));
}

@AfterClass(alwaysRun = true)
@Override
public void cleanup() throws Exception {
@@ -233,6 +244,24 @@ public void simple() throws Exception {

admin.namespaces().deleteNamespace("p1/c1/ns1");
admin.tenants().deleteTenant("p1");

admin.clusters().deleteCluster("c1");
}

@Test
public void testDeleteV1Tenant() throws Exception {
admin.clusters().createCluster("c1", ClusterData.builder().build());
admin.tenants().createTenant("p1", new TenantInfoImpl(Sets.newHashSet("role1"), Sets.newHashSet("c1")));
waitForChange();
admin.namespaces().createNamespace("p1/c1/ns1");
waitForChange();


String topic = "persistent://p1/c1/ns1/ds2";
admin.topics().createNonPartitionedTopic(topic);

admin.namespaces().deleteNamespace("p1/c1/ns1", true);
admin.tenants().deleteTenant("p1", true);
admin.clusters().deleteCluster("c1");
}

Loading