Skip to content

Commit

Permalink
[improve][broker] Add fine-grain authorization to ns/topic management…
Browse files Browse the repository at this point in the history
… endpoints (apache#22305)

(cherry picked from commit fd34d4a)

 Conflicts:
	pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
Technoboy- authored and srinath-ctds committed Apr 2, 2024
1 parent 0ca7265 commit 07609e6
Showing 6 changed files with 683 additions and 171 deletions.
Original file line number Diff line number Diff line change
@@ -597,6 +597,7 @@ public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topicName,
case COMPACT:
case OFFLOAD:
case UNLOAD:
case TRIM_TOPIC:
case DELETE_METADATA:
case UPDATE_METADATA:
case ADD_BUNDLE_RANGE:
Original file line number Diff line number Diff line change
@@ -60,8 +60,6 @@
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PolicyName;
import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.SubscribeRate;
@@ -711,10 +709,7 @@ private CompletableFuture<Void> provisionPartitionedTopicPath(int numPartitions,
}

protected CompletableFuture<SchemaCompatibilityStrategy> getSchemaCompatibilityStrategyAsync() {
return validateTopicPolicyOperationAsync(topicName,
PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
PolicyOperation.READ)
.thenCompose((__) -> getSchemaCompatibilityStrategyAsyncWithoutAuth()).whenComplete((__, ex) -> {
return getSchemaCompatibilityStrategyAsyncWithoutAuth().whenComplete((__, ex) -> {
if (ex != null) {
log.error("[{}] Failed to get schema compatibility strategy of topic {} {}",
clientAppId(), topicName, ex);
Original file line number Diff line number Diff line change
@@ -2354,102 +2354,110 @@ protected void internalSetMaxTopicsPerNamespace(Integer maxTopicsPerNamespace) {
}

protected void internalSetProperty(String key, String value, AsyncResponse asyncResponse) {
validatePoliciesReadOnlyAccess();
updatePoliciesAsync(namespaceName, policies -> {
policies.properties.put(key, value);
return policies;
}).thenAccept(v -> {
log.info("[{}] Successfully set property for key {} on namespace {}", clientAppId(), key,
namespaceName);
asyncResponse.resume(Response.noContent().build());
}).exceptionally(ex -> {
Throwable cause = ex.getCause();
log.error("[{}] Failed to set property for key {} on namespace {}", clientAppId(), key,
namespaceName, cause);
asyncResponse.resume(cause);
return null;
});
validateAdminAccessForTenantAsync(namespaceName.getTenant())
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> {
policies.properties.put(key, value);
return policies;
}))
.thenAccept(v -> {
log.info("[{}] Successfully set property for key {} on namespace {}", clientAppId(), key,
namespaceName);
asyncResponse.resume(Response.noContent().build());
}).exceptionally(ex -> {
Throwable cause = ex.getCause();
log.error("[{}] Failed to set property for key {} on namespace {}", clientAppId(), key,
namespaceName, cause);
asyncResponse.resume(cause);
return null;
});
}

protected void internalSetProperties(Map<String, String> properties, AsyncResponse asyncResponse) {
validatePoliciesReadOnlyAccess();
updatePoliciesAsync(namespaceName, policies -> {
policies.properties.putAll(properties);
return policies;
}).thenAccept(v -> {
log.info("[{}] Successfully set {} properties on namespace {}", clientAppId(), properties.size(),
namespaceName);
asyncResponse.resume(Response.noContent().build());
}).exceptionally(ex -> {
Throwable cause = ex.getCause();
log.error("[{}] Failed to set {} properties on namespace {}", clientAppId(), properties.size(),
namespaceName, cause);
asyncResponse.resume(cause);
return null;
});
validateAdminAccessForTenantAsync(namespaceName.getTenant())
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> {
policies.properties.putAll(properties);
return policies;
}))
.thenAccept(v -> {
log.info("[{}] Successfully set {} properties on namespace {}", clientAppId(), properties.size(),
namespaceName);
asyncResponse.resume(Response.noContent().build());
}).exceptionally(ex -> {
Throwable cause = ex.getCause();
log.error("[{}] Failed to set {} properties on namespace {}", clientAppId(), properties.size(),
namespaceName, cause);
asyncResponse.resume(cause);
return null;
});
}

protected void internalGetProperty(String key, AsyncResponse asyncResponse) {
getNamespacePoliciesAsync(namespaceName).thenAccept(policies -> {
asyncResponse.resume(policies.properties.get(key));
}).exceptionally(ex -> {
Throwable cause = ex.getCause();
log.error("[{}] Failed to get property for key {} of namespace {}", clientAppId(), key,
namespaceName, cause);
asyncResponse.resume(cause);
return null;
});
validateAdminAccessForTenantAsync(namespaceName.getTenant())
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenAccept(policies -> asyncResponse.resume(policies.properties.get(key)))
.exceptionally(ex -> {
Throwable cause = ex.getCause();
log.error("[{}] Failed to get property for key {} of namespace {}", clientAppId(), key,
namespaceName, cause);
asyncResponse.resume(cause);
return null;
});
}

protected void internalGetProperties(AsyncResponse asyncResponse) {
getNamespacePoliciesAsync(namespaceName).thenAccept(policies -> {
asyncResponse.resume(policies.properties);
}).exceptionally(ex -> {
Throwable cause = ex.getCause();
log.error("[{}] Failed to get properties of namespace {}", clientAppId(), namespaceName, cause);
asyncResponse.resume(cause);
return null;
});
validateAdminAccessForTenantAsync(namespaceName.getTenant())
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenAccept(policies -> asyncResponse.resume(policies.properties))
.exceptionally(ex -> {
Throwable cause = ex.getCause();
log.error("[{}] Failed to get properties of namespace {}", clientAppId(), namespaceName, cause);
asyncResponse.resume(cause);
return null;
});
}

protected void internalRemoveProperty(String key, AsyncResponse asyncResponse) {
validatePoliciesReadOnlyAccess();

AtomicReference<String> oldVal = new AtomicReference<>(null);
updatePoliciesAsync(namespaceName, policies -> {
oldVal.set(policies.properties.remove(key));
return policies;
}).thenAccept(v -> {
asyncResponse.resume(oldVal.get());
log.info("[{}] Successfully remove property for key {} on namespace {}", clientAppId(), key,
namespaceName);
}).exceptionally(ex -> {
Throwable cause = ex.getCause();
log.error("[{}] Failed to remove property for key {} on namespace {}", clientAppId(), key,
namespaceName, cause);
asyncResponse.resume(cause);
return null;
});
validateAdminAccessForTenantAsync(namespaceName.getTenant())
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> {
oldVal.set(policies.properties.remove(key));
return policies;
})).thenAccept(v -> {
asyncResponse.resume(oldVal.get());
log.info("[{}] Successfully remove property for key {} on namespace {}", clientAppId(), key,
namespaceName);
}).exceptionally(ex -> {
Throwable cause = ex.getCause();
log.error("[{}] Failed to remove property for key {} on namespace {}", clientAppId(), key,
namespaceName, cause);
asyncResponse.resume(cause);
return null;
});
}

protected void internalClearProperties(AsyncResponse asyncResponse) {
validatePoliciesReadOnlyAccess();
AtomicReference<Integer> clearedCount = new AtomicReference<>(0);
updatePoliciesAsync(namespaceName, policies -> {
clearedCount.set(policies.properties.size());
policies.properties.clear();
return policies;
}).thenAccept(v -> {
asyncResponse.resume(Response.noContent().build());
log.info("[{}] Successfully clear {} properties on namespace {}", clientAppId(), clearedCount.get(),
namespaceName);
}).exceptionally(ex -> {
Throwable cause = ex.getCause();
log.error("[{}] Failed to clear {} properties on namespace {}", clientAppId(), clearedCount.get(),
namespaceName, cause);
asyncResponse.resume(cause);
return null;
});
validateAdminAccessForTenantAsync(namespaceName.getTenant())
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> {
clearedCount.set(policies.properties.size());
policies.properties.clear();
return policies;
}))
.thenAccept(v -> {
asyncResponse.resume(Response.noContent().build());
log.info("[{}] Successfully clear {} properties on namespace {}", clientAppId(), clearedCount.get(),
namespaceName);
}).exceptionally(ex -> {
Throwable cause = ex.getCause();
log.error("[{}] Failed to clear {} properties on namespace {}", clientAppId(), clearedCount.get(),
namespaceName, cause);
asyncResponse.resume(cause);
return null;
});
}

private CompletableFuture<Void> updatePoliciesAsync(NamespaceName ns, Function<Policies, Policies> updateFunction) {
Original file line number Diff line number Diff line change
@@ -501,7 +501,9 @@ protected CompletableFuture<Void> internalCreateNonPartitionedTopicAsync(boolean
protected void internalCreateMissedPartitions(AsyncResponse asyncResponse) {
getPartitionedTopicMetadataAsync(topicName, false, false).thenAccept(metadata -> {
if (metadata != null) {
tryCreatePartitionsAsync(metadata.partitions).thenAccept(v -> {
CompletableFuture<Void> future = validateNamespaceOperationAsync(topicName.getNamespaceObject(),
NamespaceOperation.CREATE_TOPIC);
future.thenCompose(__ -> tryCreatePartitionsAsync(metadata.partitions)).thenAccept(v -> {
asyncResponse.resume(Response.noContent().build());
}).exceptionally(e -> {
log.error("[{}] Failed to create partitions for topic {}", clientAppId(), topicName);
@@ -833,13 +835,13 @@ private CompletableFuture<Void> internalRemovePartitionsAuthenticationPoliciesAs

protected void internalUnloadTopic(AsyncResponse asyncResponse, boolean authoritative) {
log.info("[{}] Unloading topic {}", clientAppId(), topicName);
CompletableFuture<Void> future;
if (topicName.isGlobal()) {
future = validateGlobalNamespaceOwnershipAsync(namespaceName);
} else {
future = CompletableFuture.completedFuture(null);
}
future.thenAccept(__ -> {
CompletableFuture<Void> future = validateTopicOperationAsync(topicName, TopicOperation.UNLOAD);
future.thenCompose(__ -> {
if (topicName.isGlobal()) {
return validateGlobalNamespaceOwnershipAsync(namespaceName);
}
return CompletableFuture.completedFuture(null);
}).thenAccept(__ -> {
// If the topic name is a partition name, no need to get partition topic metadata again
if (topicName.isPartitioned()) {
if (isTransactionCoordinatorAssign(topicName)) {
@@ -1056,13 +1058,12 @@ protected CompletableFuture<Void> internalSetDeduplicationSnapshotInterval(Integ

private void internalUnloadNonPartitionedTopicAsync(AsyncResponse asyncResponse, boolean authoritative) {
validateTopicOwnershipAsync(topicName, authoritative)
.thenCompose(unused -> validateTopicOperationAsync(topicName, TopicOperation.UNLOAD)
.thenCompose(__ -> getTopicReferenceAsync(topicName))
.thenCompose(topic -> topic.close(false))
.thenRun(() -> {
log.info("[{}] Successfully unloaded topic {}", clientAppId(), topicName);
asyncResponse.resume(Response.noContent().build());
}))
})
.exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
if (!isNot307And404Exception(ex)) {
@@ -1075,16 +1076,14 @@ private void internalUnloadNonPartitionedTopicAsync(AsyncResponse asyncResponse,

private void internalUnloadTransactionCoordinatorAsync(AsyncResponse asyncResponse, boolean authoritative) {
validateTopicOwnershipAsync(topicName, authoritative)
.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.UNLOAD)
.thenCompose(v -> pulsar()
.getTransactionMetadataStoreService()
.removeTransactionMetadataStore(
TransactionCoordinatorID.get(topicName.getPartitionIndex())))
.thenRun(() -> {
log.info("[{}] Successfully unloaded tc {}", clientAppId(),
topicName.getPartitionIndex());
asyncResponse.resume(Response.noContent().build());
}))
.thenCompose(v -> pulsar()
.getTransactionMetadataStoreService()
.removeTransactionMetadataStore(
TransactionCoordinatorID.get(topicName.getPartitionIndex())))
.thenRun(() -> {
log.info("[{}] Successfully unloaded tc {}", clientAppId(), topicName.getPartitionIndex());
asyncResponse.resume(Response.noContent().build());
})
.exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
if (!isNot307And404Exception(ex)) {
@@ -1295,13 +1294,13 @@ protected CompletableFuture<PersistentTopicInternalStats> internalGetInternalSta
}

protected void internalGetManagedLedgerInfo(AsyncResponse asyncResponse, boolean authoritative) {
CompletableFuture<Void> future;
if (topicName.isGlobal()) {
future = validateGlobalNamespaceOwnershipAsync(namespaceName);
} else {
future = CompletableFuture.completedFuture(null);
}
future.thenAccept(__ -> {
CompletableFuture<Void> future = validateTopicOperationAsync(topicName, TopicOperation.GET_STATS);
future.thenCompose(__ -> {
if (topicName.isGlobal()) {
return validateGlobalNamespaceOwnershipAsync(namespaceName);
}
return CompletableFuture.completedFuture(null);
}).thenAccept(__ -> {
// If the topic name is a partition name, no need to get partition topic metadata again
if (topicName.isPartitioned()) {
internalGetManagedLedgerInfoForNonPartitionedTopic(asyncResponse);
@@ -1406,13 +1405,13 @@ public void getInfoFailed(ManagedLedgerException exception, Object ctx) {
protected void internalGetPartitionedStats(AsyncResponse asyncResponse, boolean authoritative, boolean perPartition,
boolean getPreciseBacklog, boolean subscriptionBacklogSize,
boolean getEarliestTimeInBacklog) {
CompletableFuture<Void> future;
if (topicName.isGlobal()) {
future = validateGlobalNamespaceOwnershipAsync(namespaceName);
} else {
future = CompletableFuture.completedFuture(null);
}
future.thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName,
CompletableFuture<Void> future = validateTopicOperationAsync(topicName, TopicOperation.GET_STATS);
future.thenCompose(__ -> {
if (topicName.isGlobal()) {
return validateGlobalNamespaceOwnershipAsync(namespaceName);
}
return CompletableFuture.completedFuture(null);
}).thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName,
authoritative, false)).thenAccept(partitionMetadata -> {
if (partitionMetadata.partitions == 0) {
asyncResponse.resume(new RestException(Status.NOT_FOUND,
@@ -1492,14 +1491,15 @@ protected void internalGetPartitionedStats(AsyncResponse asyncResponse, boolean
}

protected void internalGetPartitionedStatsInternal(AsyncResponse asyncResponse, boolean authoritative) {
CompletableFuture<Void> future;
if (topicName.isGlobal()) {
future = validateGlobalNamespaceOwnershipAsync(namespaceName);
} else {
future = CompletableFuture.completedFuture(null);
}
future.thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, authoritative, false))
.thenAccept(partitionMetadata -> {
CompletableFuture<Void> future = validateTopicOperationAsync(topicName, TopicOperation.GET_STATS);
future.thenCompose(__ -> {
if (topicName.isGlobal()) {
return validateGlobalNamespaceOwnershipAsync(namespaceName);
} else {
return CompletableFuture.completedFuture(null);
}
}).thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, authoritative, false))
.thenAccept(partitionMetadata -> {
if (partitionMetadata.partitions == 0) {
asyncResponse.resume(new RestException(Status.NOT_FOUND,
getPartitionedTopicNotFoundErrorMessage(topicName.toString())));
@@ -2252,13 +2252,14 @@ private CompletableFuture<Void> internalResetCursorForNonPartitionedTopic(String

protected void internalCreateSubscription(AsyncResponse asyncResponse, String subscriptionName,
MessageIdImpl messageId, boolean authoritative, boolean replicated, Map<String, String> properties) {
CompletableFuture<Void> ret;
if (topicName.isGlobal()) {
ret = validateGlobalNamespaceOwnershipAsync(namespaceName);
} else {
ret = CompletableFuture.completedFuture(null);
}
ret.thenAccept(__ -> {
CompletableFuture<Void> ret = validateTopicOperationAsync(topicName, TopicOperation.SUBSCRIBE,
subscriptionName);
ret.thenCompose(__ -> {
if (topicName.isGlobal()) {
return validateGlobalNamespaceOwnershipAsync(namespaceName);
}
return CompletableFuture.completedFuture(null);
}).thenAccept(__ -> {
final MessageIdImpl targetMessageId = messageId == null ? (MessageIdImpl) MessageId.latest : messageId;
log.info("[{}][{}] Creating subscription {} at message id {} with properties {}", clientAppId(),
topicName, subscriptionName, targetMessageId, properties);
@@ -2417,14 +2418,13 @@ private void internalCreateSubscriptionForNonPartitionedTopic(
protected void internalUpdateSubscriptionProperties(AsyncResponse asyncResponse, String subName,
Map<String, String> subscriptionProperties,
boolean authoritative) {
CompletableFuture<Void> future;
if (topicName.isGlobal()) {
future = validateGlobalNamespaceOwnershipAsync(namespaceName);
} else {
future = CompletableFuture.completedFuture(null);
}

future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)).thenAccept(__ -> {
CompletableFuture<Void> future = validateTopicOperationAsync(topicName, TopicOperation.SUBSCRIBE, subName);
future.thenCompose(__ -> {
if (topicName.isGlobal()) {
return validateGlobalNamespaceOwnershipAsync(namespaceName);
}
return CompletableFuture.completedFuture(null);
}).thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)).thenAccept(__ -> {
if (topicName.isPartitioned()) {
internalUpdateSubscriptionPropertiesForNonPartitionedTopic(asyncResponse, subName,
subscriptionProperties, authoritative);
@@ -2496,14 +2496,13 @@ protected void internalUpdateSubscriptionProperties(AsyncResponse asyncResponse,
protected void internalAnalyzeSubscriptionBacklog(AsyncResponse asyncResponse, String subName,
Optional<Position> position,
boolean authoritative) {
CompletableFuture<Void> future;
if (topicName.isGlobal()) {
future = validateGlobalNamespaceOwnershipAsync(namespaceName);
} else {
future = CompletableFuture.completedFuture(null);
}

future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
CompletableFuture<Void> future = validateTopicOperationAsync(topicName, TopicOperation.CONSUME, subName);
future.thenCompose(__ -> {
if (topicName.isGlobal()) {
return validateGlobalNamespaceOwnershipAsync(namespaceName);
}
return CompletableFuture.completedFuture(null);
}).thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
.thenCompose(__ -> {
if (topicName.isPartitioned()) {
return CompletableFuture.completedFuture(null);
@@ -2535,14 +2534,13 @@ protected void internalAnalyzeSubscriptionBacklog(AsyncResponse asyncResponse, S

protected void internalGetSubscriptionProperties(AsyncResponse asyncResponse, String subName,
boolean authoritative) {
CompletableFuture<Void> future;
if (topicName.isGlobal()) {
future = validateGlobalNamespaceOwnershipAsync(namespaceName);
} else {
future = CompletableFuture.completedFuture(null);
}

future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)).thenAccept(__ -> {
CompletableFuture<Void> future = validateTopicOperationAsync(topicName, TopicOperation.CONSUME, subName);
future.thenCompose(__ -> {
if (topicName.isGlobal()) {
return validateGlobalNamespaceOwnershipAsync(namespaceName);
}
return CompletableFuture.completedFuture(null);
}).thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)).thenAccept(__ -> {
if (topicName.isPartitioned()) {
internalGetSubscriptionPropertiesForNonPartitionedTopic(asyncResponse, subName,
authoritative);
@@ -4181,13 +4179,14 @@ private CompletableFuture<Void> internalExpireMessagesNonPartitionedTopicByPosit

protected void internalTriggerCompaction(AsyncResponse asyncResponse, boolean authoritative) {
log.info("[{}] Trigger compaction on topic {}", clientAppId(), topicName);
CompletableFuture<Void> future;
if (topicName.isGlobal()) {
future = validateGlobalNamespaceOwnershipAsync(namespaceName);
} else {
future = CompletableFuture.completedFuture(null);
}
future.thenAccept(__ -> {
CompletableFuture<Void> future = validateTopicOperationAsync(topicName, TopicOperation.COMPACT);
future.thenCompose(__ -> {
if (topicName.isGlobal()) {
return validateGlobalNamespaceOwnershipAsync(namespaceName);
} else {
return CompletableFuture.completedFuture(null);
}
}).thenAccept(__ -> {
// If the topic name is a partition name, no need to get partition topic metadata again
if (topicName.isPartitioned()) {
internalTriggerCompactionNonPartitionedTopic(asyncResponse, authoritative);
@@ -4667,11 +4666,12 @@ protected CompletableFuture<Void> internalTrimTopic(AsyncResponse asyncResponse,
"Trim on a non-persistent topic is not allowed"));
return null;
}
CompletableFuture<Void> future = validateTopicOperationAsync(topicName, TopicOperation.TRIM_TOPIC);
if (topicName.isPartitioned()) {
return validateTopicOperationAsync(topicName, TopicOperation.TRIM_TOPIC).thenCompose((x)
return future.thenCompose((x)
-> trimNonPartitionedTopic(asyncResponse, topicName, authoritative));
}
return validateTopicOperationAsync(topicName, TopicOperation.TRIM_TOPIC)
return future
.thenCompose(__ -> pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName))
.thenCompose(metadata -> {
if (metadata.partitions > 0) {
@@ -5353,12 +5353,12 @@ private void internalGetReplicatedSubscriptionStatusForNonPartitionedTopic(
}

protected CompletableFuture<SchemaCompatibilityStrategy> internalGetSchemaCompatibilityStrategy(boolean applied) {
CompletableFuture<Void> future = validateTopicPolicyOperationAsync(topicName,
PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.READ);
if (applied) {
return getSchemaCompatibilityStrategyAsync();
return future.thenCompose(__ -> getSchemaCompatibilityStrategyAsync());
}
return validateTopicPolicyOperationAsync(topicName,
PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
PolicyOperation.READ)
return future
.thenCompose(n -> getTopicPoliciesAsyncWithRetry(topicName).thenApply(op -> {
if (!op.isPresent()) {
return null;
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pulsar.broker.admin;

import io.jsonwebtoken.Jwts;
import lombok.Cleanup;
import lombok.SneakyThrows;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.security.MockedPulsarStandalone;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

@Test(groups = "broker-admin")
public class NamespaceAuthZTest extends MockedPulsarStandalone {

private PulsarAdmin superUserAdmin;

private PulsarAdmin tenantManagerAdmin;

private static final String TENANT_ADMIN_SUBJECT = UUID.randomUUID().toString();
private static final String TENANT_ADMIN_TOKEN = Jwts.builder()
.claim("sub", TENANT_ADMIN_SUBJECT).signWith(SECRET_KEY).compact();

@SneakyThrows
@BeforeClass
public void before() {
configureTokenAuthentication();
configureDefaultAuthorization();
start();
this.superUserAdmin =PulsarAdmin.builder()
.serviceHttpUrl(getPulsarService().getWebServiceAddress())
.authentication(new AuthenticationToken(SUPER_USER_TOKEN))
.build();
final TenantInfo tenantInfo = superUserAdmin.tenants().getTenantInfo("public");
tenantInfo.getAdminRoles().add(TENANT_ADMIN_SUBJECT);
superUserAdmin.tenants().updateTenant("public", tenantInfo);
this.tenantManagerAdmin = PulsarAdmin.builder()
.serviceHttpUrl(getPulsarService().getWebServiceAddress())
.authentication(new AuthenticationToken(TENANT_ADMIN_TOKEN))
.build();
}


@SneakyThrows
@AfterClass
public void after() {
if (superUserAdmin != null) {
superUserAdmin.close();
}
if (tenantManagerAdmin != null) {
tenantManagerAdmin.close();
}
close();
}


@SneakyThrows
@Test
public void testProperties() {
final String random = UUID.randomUUID().toString();
final String namespace = "public/default";
final String topic = "persistent://public/default/" + random;
final String subject = UUID.randomUUID().toString();
final String token = Jwts.builder()
.claim("sub", subject).signWith(SECRET_KEY).compact();
superUserAdmin.topics().createNonPartitionedTopic(topic);

@Cleanup
final PulsarAdmin subAdmin = PulsarAdmin.builder()
.serviceHttpUrl(getPulsarService().getWebServiceAddress())
.authentication(new AuthenticationToken(token))
.build();
// test superuser
Map<String, String> properties = new HashMap<>();
properties.put("key1", "value1");
superUserAdmin.namespaces().setProperties(namespace, properties);
superUserAdmin.namespaces().setProperty(namespace, "key2", "value2");
superUserAdmin.namespaces().getProperties(namespace);
superUserAdmin.namespaces().getProperty(namespace, "key2");
superUserAdmin.namespaces().removeProperty(namespace, "key2");
superUserAdmin.namespaces().clearProperties(namespace);

// test tenant manager
tenantManagerAdmin.namespaces().setProperties(namespace, properties);
tenantManagerAdmin.namespaces().setProperty(namespace, "key2", "value2");
tenantManagerAdmin.namespaces().getProperties(namespace);
tenantManagerAdmin.namespaces().getProperty(namespace, "key2");
tenantManagerAdmin.namespaces().removeProperty(namespace, "key2");
tenantManagerAdmin.namespaces().clearProperties(namespace);

// test nobody
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.namespaces().setProperties(namespace, properties));

Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.namespaces().setProperty(namespace, "key2", "value2"));

Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.namespaces().getProperties(namespace));

Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.namespaces().getProperty(namespace, "key2"));


Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.namespaces().removeProperty(namespace, "key2"));

Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.namespaces().clearProperties(namespace));

for (AuthAction action : AuthAction.values()) {
superUserAdmin.namespaces().grantPermissionOnNamespace(namespace, subject, Set.of(action));
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.namespaces().setProperties(namespace, properties));

Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.namespaces().setProperty(namespace, "key2", "value2"));

Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.namespaces().getProperties(namespace));

Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.namespaces().getProperty(namespace, "key2"));


Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.namespaces().removeProperty(namespace, "key2"));

Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.namespaces().clearProperties(namespace));

superUserAdmin.namespaces().revokePermissionsOnNamespace(namespace, subject);
}
superUserAdmin.topics().delete(topic, true);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,345 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pulsar.broker.admin;

import io.jsonwebtoken.Jwts;
import lombok.Cleanup;
import lombok.SneakyThrows;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.security.MockedPulsarStandalone;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

@Test(groups = "broker-admin")
public class TopicAuthZTest extends MockedPulsarStandalone {

private PulsarAdmin superUserAdmin;

private PulsarAdmin tenantManagerAdmin;

private static final String TENANT_ADMIN_SUBJECT = UUID.randomUUID().toString();
private static final String TENANT_ADMIN_TOKEN = Jwts.builder()
.claim("sub", TENANT_ADMIN_SUBJECT).signWith(SECRET_KEY).compact();

@SneakyThrows
@BeforeClass
public void before() {
configureTokenAuthentication();
configureDefaultAuthorization();
start();
this.superUserAdmin =PulsarAdmin.builder()
.serviceHttpUrl(getPulsarService().getWebServiceAddress())
.authentication(new AuthenticationToken(SUPER_USER_TOKEN))
.build();
final TenantInfo tenantInfo = superUserAdmin.tenants().getTenantInfo("public");
tenantInfo.getAdminRoles().add(TENANT_ADMIN_SUBJECT);
superUserAdmin.tenants().updateTenant("public", tenantInfo);
this.tenantManagerAdmin = PulsarAdmin.builder()
.serviceHttpUrl(getPulsarService().getWebServiceAddress())
.authentication(new AuthenticationToken(TENANT_ADMIN_TOKEN))
.build();
}


@SneakyThrows
@AfterClass
public void after() {
if (superUserAdmin != null) {
superUserAdmin.close();
}
if (tenantManagerAdmin != null) {
tenantManagerAdmin.close();
}
close();
}


@SneakyThrows
@Test
public void testUnloadAndCompactAndTrim() {
final String random = UUID.randomUUID().toString();
final String topic = "persistent://public/default/" + random;
final String subject = UUID.randomUUID().toString();
final String token = Jwts.builder()
.claim("sub", subject).signWith(SECRET_KEY).compact();
superUserAdmin.topics().createPartitionedTopic(topic, 2);

@Cleanup
final PulsarAdmin subAdmin = PulsarAdmin.builder()
.serviceHttpUrl(getPulsarService().getWebServiceAddress())
.authentication(new AuthenticationToken(token))
.build();
// test superuser
superUserAdmin.topics().unload(topic);
superUserAdmin.topics().triggerCompaction(topic);
superUserAdmin.topics().trimTopic(TopicName.get(topic).getPartition(0).getLocalName());
superUserAdmin.topicPolicies().getSchemaCompatibilityStrategy(topic, false);

// test tenant manager
tenantManagerAdmin.topics().unload(topic);
tenantManagerAdmin.topics().triggerCompaction(topic);
tenantManagerAdmin.topics().trimTopic(TopicName.get(topic).getPartition(0).getLocalName());
tenantManagerAdmin.topicPolicies().getSchemaCompatibilityStrategy(topic, false);

// test nobody
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.topics().unload(topic));

Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.topics().triggerCompaction(topic));

Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.topics().trimTopic(TopicName.get(topic).getPartition(0).getLocalName()));

Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.topicPolicies().getSchemaCompatibilityStrategy(topic, false));

// Test only super/admin can do the operation, other auth are not permitted.
for (AuthAction action : AuthAction.values()) {
superUserAdmin.topics().grantPermission(topic, subject, Set.of(action));

Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.topics().unload(topic));

Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.topics().triggerCompaction(topic));

Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.topics().trimTopic(topic));

Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.topicPolicies().getSchemaCompatibilityStrategy(topic, false));

superUserAdmin.topics().revokePermissions(topic, subject);
}
superUserAdmin.topics().deletePartitionedTopic(topic, true);
}

@Test
@SneakyThrows
public void testGetManagedLedgerInfo() {
final String random = UUID.randomUUID().toString();
final String topic = "persistent://public/default/" + random;
final String subject = UUID.randomUUID().toString();
final String token = Jwts.builder()
.claim("sub", subject).signWith(SECRET_KEY).compact();
superUserAdmin.topics().createPartitionedTopic(topic, 2);

@Cleanup
final PulsarAdmin subAdmin = PulsarAdmin.builder()
.serviceHttpUrl(getPulsarService().getWebServiceAddress())
.authentication(new AuthenticationToken(token))
.build();
// test superuser
superUserAdmin.topics().getInternalInfo(topic);

// test tenant manager
tenantManagerAdmin.topics().getInternalInfo(topic);

// test nobody
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.topics().getInternalInfo(topic));

for (AuthAction action : AuthAction.values()) {
superUserAdmin.topics().grantPermission(topic, subject, Set.of(action));
if (action == AuthAction.produce || action == AuthAction.consume) {
subAdmin.topics().getInternalInfo(topic);
} else {
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.topics().getInternalInfo(topic));
}
superUserAdmin.topics().revokePermissions(topic, subject);
}
superUserAdmin.topics().deletePartitionedTopic(topic, true);
}

@Test
@SneakyThrows
public void testGetPartitionedStatsAndInternalStats() {
final String random = UUID.randomUUID().toString();
final String topic = "persistent://public/default/" + random;
final String subject = UUID.randomUUID().toString();
final String token = Jwts.builder()
.claim("sub", subject).signWith(SECRET_KEY).compact();
superUserAdmin.topics().createPartitionedTopic(topic, 2);

@Cleanup
final PulsarAdmin subAdmin = PulsarAdmin.builder()
.serviceHttpUrl(getPulsarService().getWebServiceAddress())
.authentication(new AuthenticationToken(token))
.build();
// test superuser
superUserAdmin.topics().getPartitionedStats(topic, false);
superUserAdmin.topics().getPartitionedInternalStats(topic);

// test tenant manager
tenantManagerAdmin.topics().getPartitionedStats(topic, false);
tenantManagerAdmin.topics().getPartitionedInternalStats(topic);

// test nobody
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.topics().getPartitionedStats(topic, false));

Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.topics().getPartitionedInternalStats(topic));

for (AuthAction action : AuthAction.values()) {
superUserAdmin.topics().grantPermission(topic, subject, Set.of(action));
if (action == AuthAction.produce || action == AuthAction.consume) {
subAdmin.topics().getPartitionedStats(topic, false);
subAdmin.topics().getPartitionedInternalStats(topic);
} else {
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.topics().getPartitionedStats(topic, false));

Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.topics().getPartitionedInternalStats(topic));
}
superUserAdmin.topics().revokePermissions(topic, subject);
}
superUserAdmin.topics().deletePartitionedTopic(topic, true);
}

@Test
@SneakyThrows
public void testCreateSubscriptionAndUpdateSubscriptionPropertiesAndAnalyzeSubscriptionBacklog() {
final String random = UUID.randomUUID().toString();
final String topic = "persistent://public/default/" + random;
final String subject = UUID.randomUUID().toString();
final String token = Jwts.builder()
.claim("sub", subject).signWith(SECRET_KEY).compact();
superUserAdmin.topics().createPartitionedTopic(topic, 2);
AtomicInteger suffix = new AtomicInteger(1);
@Cleanup
final PulsarAdmin subAdmin = PulsarAdmin.builder()
.serviceHttpUrl(getPulsarService().getWebServiceAddress())
.authentication(new AuthenticationToken(token))
.build();
//
superUserAdmin.topics().createSubscription(topic, "test-sub" + suffix.incrementAndGet(), MessageId.earliest);

// test tenant manager
tenantManagerAdmin.topics().createSubscription(topic, "test-sub" + suffix.incrementAndGet(), MessageId.earliest);

// test nobody
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.topics().createSubscription(topic, "test-sub" + suffix.incrementAndGet(), MessageId.earliest));

for (AuthAction action : AuthAction.values()) {
superUserAdmin.topics().grantPermission(topic, subject, Set.of(action));
if (action == AuthAction.consume) {
subAdmin.topics().createSubscription(topic, "test-sub" + suffix.incrementAndGet(), MessageId.earliest);
} else {
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.topics().createSubscription(topic, "test-sub" + suffix.incrementAndGet(), MessageId.earliest));
}
superUserAdmin.topics().revokePermissions(topic, subject);
}
// test UpdateSubscriptionProperties
Map<String, String> properties = new HashMap<>();
superUserAdmin.topics().createSubscription(topic, "test-sub", MessageId.earliest);
// test superuser
superUserAdmin.topics().updateSubscriptionProperties(topic, "test-sub" , properties);
superUserAdmin.topics().getSubscriptionProperties(topic, "test-sub");
superUserAdmin.topics().analyzeSubscriptionBacklog(TopicName.get(topic).getPartition(0).getLocalName(), "test-sub", Optional.empty());

// test tenant manager
tenantManagerAdmin.topics().updateSubscriptionProperties(topic, "test-sub" , properties);
tenantManagerAdmin.topics().getSubscriptionProperties(topic, "test-sub");
tenantManagerAdmin.topics().analyzeSubscriptionBacklog(TopicName.get(topic).getPartition(0).getLocalName(), "test-sub", Optional.empty());

// test nobody
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.topics().updateSubscriptionProperties(topic, "test-sub", properties));

Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.topics().getSubscriptionProperties(topic, "test-sub"));

Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.topics().analyzeSubscriptionBacklog(TopicName.get(topic).getPartition(0).getLocalName(), "test-sub", Optional.empty()));

for (AuthAction action : AuthAction.values()) {
superUserAdmin.topics().grantPermission(topic, subject, Set.of(action));
if (action == AuthAction.consume) {
subAdmin.topics().updateSubscriptionProperties(topic, "test-sub", properties);
subAdmin.topics().getSubscriptionProperties(topic, "test-sub");
subAdmin.topics().analyzeSubscriptionBacklog(TopicName.get(topic).getPartition(0).getLocalName(), "test-sub", Optional.empty());
} else {
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.topics().updateSubscriptionProperties(topic, "test-sub", properties));

Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.topics().getSubscriptionProperties(topic, "test-sub"));

Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.topics().analyzeSubscriptionBacklog(TopicName.get(topic).getPartition(0).getLocalName(), "test-sub", Optional.empty()));
}
superUserAdmin.topics().revokePermissions(topic, subject);
}
superUserAdmin.topics().deletePartitionedTopic(topic, true);
}

@Test
@SneakyThrows
public void testCreateMissingPartition() {
final String random = UUID.randomUUID().toString();
final String topic = "persistent://public/default/" + random;
final String subject = UUID.randomUUID().toString();
final String token = Jwts.builder()
.claim("sub", subject).signWith(SECRET_KEY).compact();
superUserAdmin.topics().createPartitionedTopic(topic, 2);
AtomicInteger suffix = new AtomicInteger(1);
@Cleanup
final PulsarAdmin subAdmin = PulsarAdmin.builder()
.serviceHttpUrl(getPulsarService().getWebServiceAddress())
.authentication(new AuthenticationToken(token))
.build();
//
superUserAdmin.topics().createMissedPartitions(topic);

// test tenant manager
tenantManagerAdmin.topics().createMissedPartitions(topic);

Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.topics().createMissedPartitions(topic));

for (AuthAction action : AuthAction.values()) {
superUserAdmin.topics().grantPermission(topic, subject, Set.of(action));
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.topics().createMissedPartitions(topic));
superUserAdmin.topics().revokePermissions(topic, subject);
}
superUserAdmin.topics().deletePartitionedTopic(topic, true);
}
}

0 comments on commit 07609e6

Please sign in to comment.