From 04c605f261eecf24397eb97aaf1e3ccd6f48235e Mon Sep 17 00:00:00 2001 From: Sanjeev Kulkarni Date: Sat, 7 Mar 2020 18:10:03 -0800 Subject: [PATCH] Instead of always using admin access for topic, use read/write/admin access for topic (#6504) Co-authored-by: Sanjeev Kulkarni (cherry picked from commit 36ea153c0ff4fc3e3f04de4a37b658daa9f116fa) --- .../admin/impl/PersistentTopicsBase.java | 64 +++++++++++++++---- 1 file changed, 51 insertions(+), 13 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index d642d22819a8e..6d22238a62182 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -234,6 +234,44 @@ public void validateAdminOperationOnTopic(boolean authoritative) { validateTopicOwnership(topicName, authoritative); } + public void validateReadOperationOnTopic(boolean authoritative) { + validateTopicOwnership(topicName, authoritative); + try { + validateAdminAccessForTenant(topicName.getTenant()); + } catch (Exception e) { + if (log.isDebugEnabled()) { + log.debug("[{}] failed to validate admin access for {}", topicName, clientAppId()); + } + validateAdminAccessForSubscriber(""); + } + } + + public void validateWriteOperationOnTopic(boolean authoritative) { + validateTopicOwnership(topicName, authoritative); + try { + validateAdminAccessForTenant(topicName.getTenant()); + } catch (Exception e) { + if (log.isDebugEnabled()) { + log.debug("[{}] failed to validate admin access for {}", topicName, clientAppId()); + } + try { + if (!pulsar().getBrokerService().getAuthorizationService().canProduce(topicName, clientAppId(), + clientAuthData())) { + log.warn("[{}} Subscriber {} is not authorized to access api", topicName, clientAppId()); + throw new RestException(Status.UNAUTHORIZED, + String.format("Subscriber %s is not authorized to access this operation", clientAppId())); + } + } catch (RestException re) { + throw re; + } catch (Exception ex) { + // unknown error marked as internal server error + log.warn("Unexpected error while authorizing request. topic={}, role={}. Error: {}", topicName, + clientAppId(), e.getMessage(), ex); + throw new RestException(ex); + } + } + } + protected void validateAdminAccessForSubscriber(String subscriptionName, boolean authoritative) { validateTopicOwnership(topicName, authoritative); try { @@ -317,7 +355,7 @@ protected void internalGrantPermissionsOnTopic(String role, Set acti } protected void internalDeleteTopicForcefully(boolean authoritative) { - validateAdminOperationOnTopic(authoritative); + validateWriteOperationOnTopic(authoritative); Topic topic = getTopicReference(topicName); try { topic.deleteForcefully().get(); @@ -391,7 +429,7 @@ protected void internalRevokePermissionsOnTopic(String role) { } protected void internalCreateNonPartitionedTopic(boolean authoritative) { - validateAdminAccessForTenant(topicName.getTenant()); + validateWriteOperationOnTopic(authoritative); validateNonPartitionTopicName(topicName.getLocalName()); if (topicName.isGlobal()) { validateGlobalNamespaceOwnership(namespaceName); @@ -426,7 +464,7 @@ protected void internalCreateNonPartitionedTopic(boolean authoritative) { * @param numPartitions */ protected void internalUpdatePartitionedTopic(int numPartitions, boolean updateLocalTopicOnly) { - validateAdminAccessForTenant(topicName.getTenant()); + validateWriteOperationOnTopic(false); // Only do the validation if it's the first hop. if (!updateLocalTopicOnly) { validatePartitionTopicUpdate(topicName.getLocalName(), numPartitions); @@ -540,7 +578,7 @@ protected PartitionedTopicMetadata internalGetPartitionedMetadata(boolean author protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse, boolean authoritative, boolean force) { try { - validateAdminAccessForTenant(topicName.getTenant()); + validateWriteOperationOnTopic(authoritative); } catch (Exception e) { log.error("[{}] Failed to delete partitioned topic {}", clientAppId(), topicName, e); resumeAsyncResponseExceptionally(asyncResponse, e); @@ -738,7 +776,7 @@ protected void internalDeleteTopic(boolean authoritative, boolean force) { } protected void internalDeleteTopic(boolean authoritative) { - validateAdminOperationOnTopic(authoritative); + validateWriteOperationOnTopic(authoritative); Topic topic = getTopicReference(topicName); // v2 topics have a global name so check if the topic is replicated. @@ -825,7 +863,7 @@ protected void internalGetSubscriptions(AsyncResponse asyncResponse, boolean aut private void internalGetSubscriptionsForNonPartitionedTopic(AsyncResponse asyncResponse, boolean authoritative) { try { - validateAdminOperationOnTopic(authoritative); + validateReadOperationOnTopic(authoritative); Topic topic = getTopicReference(topicName); final List subscriptions = Lists.newArrayList(); topic.getSubscriptions().forEach((subName, sub) -> subscriptions.add(subName)); @@ -1279,7 +1317,7 @@ private void internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(Asy // validate ownership and redirect if current broker is not owner PersistentTopic topic; try { - validateAdminOperationOnTopic(authoritative); + validateWriteOperationOnTopic(authoritative); topic = (PersistentTopic) getTopicReference(topicName); } catch (Exception e) { @@ -1744,7 +1782,7 @@ protected MessageId internalTerminate(boolean authoritative) { if (partitionMetadata.partitions > 0) { throw new RestException(Status.METHOD_NOT_ALLOWED, "Termination of a partitioned topic is not allowed"); } - validateAdminOperationOnTopic(authoritative); + validateWriteOperationOnTopic(authoritative); Topic topic = getTopicReference(topicName); try { return ((PersistentTopic) topic).terminate().get(); @@ -1867,7 +1905,7 @@ private void internalExpireMessagesForSinglePartition(String subName, int expire } protected void internalTriggerCompaction(boolean authoritative) { - validateAdminOperationOnTopic(authoritative); + validateWriteOperationOnTopic(authoritative); PersistentTopic topic = (PersistentTopic) getTopicReference(topicName); try { @@ -1880,13 +1918,13 @@ protected void internalTriggerCompaction(boolean authoritative) { } protected LongRunningProcessStatus internalCompactionStatus(boolean authoritative) { - validateAdminOperationOnTopic(authoritative); + validateReadOperationOnTopic(authoritative); PersistentTopic topic = (PersistentTopic) getTopicReference(topicName); return topic.compactionStatus(); } protected void internalTriggerOffload(boolean authoritative, MessageIdImpl messageId) { - validateAdminOperationOnTopic(authoritative); + validateWriteOperationOnTopic(authoritative); PersistentTopic topic = (PersistentTopic) getTopicReference(topicName); try { topic.triggerOffload(messageId); @@ -1899,7 +1937,7 @@ protected void internalTriggerOffload(boolean authoritative, MessageIdImpl messa } protected OffloadProcessStatus internalOffloadStatus(boolean authoritative) { - validateAdminOperationOnTopic(authoritative); + validateReadOperationOnTopic(authoritative); PersistentTopic topic = (PersistentTopic) getTopicReference(topicName); return topic.offloadStatus(); } @@ -2237,7 +2275,7 @@ private void validateNonPartitionTopicName(String topicName) { } protected MessageId internalGetLastMessageId(boolean authoritative) { - validateAdminOperationOnTopic(authoritative); + validateReadOperationOnTopic(authoritative); if (!(getTopicReference(topicName) instanceof PersistentTopic)) { log.error("[{}] Not supported operation of non-persistent topic {}", clientAppId(), topicName);