Skip to content

Commit

Permalink
Instead of always using admin access for topic, use read/write/admin …
Browse files Browse the repository at this point in the history
…access for topic (apache#6504)

Co-authored-by: Sanjeev Kulkarni <sanjeevk@splunk.com>
  • Loading branch information
srkukarni and Sanjeev Kulkarni authored Mar 8, 2020
1 parent 47ca8e6 commit 36ea153
Showing 1 changed file with 51 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,44 @@ public void validateAdminOperationOnTopic(boolean authoritative) {
validateTopicOwnership(topicName, authoritative);
}

public void validateReadOperationOnTopic(boolean authoritative) {
validateTopicOwnership(topicName, authoritative);
try {
validateAdminAccessForTenant(topicName.getTenant());
} catch (Exception e) {
if (log.isDebugEnabled()) {
log.debug("[{}] failed to validate admin access for {}", topicName, clientAppId());
}
validateAdminAccessForSubscriber("");
}
}

public void validateWriteOperationOnTopic(boolean authoritative) {
validateTopicOwnership(topicName, authoritative);
try {
validateAdminAccessForTenant(topicName.getTenant());
} catch (Exception e) {
if (log.isDebugEnabled()) {
log.debug("[{}] failed to validate admin access for {}", topicName, clientAppId());
}
try {
if (!pulsar().getBrokerService().getAuthorizationService().canProduce(topicName, clientAppId(),
clientAuthData())) {
log.warn("[{}} Subscriber {} is not authorized to access api", topicName, clientAppId());
throw new RestException(Status.UNAUTHORIZED,
String.format("Subscriber %s is not authorized to access this operation", clientAppId()));
}
} catch (RestException re) {
throw re;
} catch (Exception ex) {
// unknown error marked as internal server error
log.warn("Unexpected error while authorizing request. topic={}, role={}. Error: {}", topicName,
clientAppId(), e.getMessage(), ex);
throw new RestException(ex);
}
}
}

protected void validateAdminAccessForSubscriber(String subscriptionName, boolean authoritative) {
validateTopicOwnership(topicName, authoritative);
try {
Expand Down Expand Up @@ -317,7 +355,7 @@ protected void internalGrantPermissionsOnTopic(String role, Set<AuthAction> acti
}

protected void internalDeleteTopicForcefully(boolean authoritative) {
validateAdminOperationOnTopic(authoritative);
validateWriteOperationOnTopic(authoritative);
Topic topic = getTopicReference(topicName);
try {
topic.deleteForcefully().get();
Expand Down Expand Up @@ -391,7 +429,7 @@ protected void internalRevokePermissionsOnTopic(String role) {
}

protected void internalCreateNonPartitionedTopic(boolean authoritative) {
validateAdminAccessForTenant(topicName.getTenant());
validateWriteOperationOnTopic(authoritative);
validateNonPartitionTopicName(topicName.getLocalName());
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
Expand Down Expand Up @@ -426,7 +464,7 @@ protected void internalCreateNonPartitionedTopic(boolean authoritative) {
* @param numPartitions
*/
protected void internalUpdatePartitionedTopic(int numPartitions, boolean updateLocalTopicOnly) {
validateAdminAccessForTenant(topicName.getTenant());
validateWriteOperationOnTopic(false);
// Only do the validation if it's the first hop.
if (!updateLocalTopicOnly) {
validatePartitionTopicUpdate(topicName.getLocalName(), numPartitions);
Expand Down Expand Up @@ -540,7 +578,7 @@ protected PartitionedTopicMetadata internalGetPartitionedMetadata(boolean author

protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse, boolean authoritative, boolean force) {
try {
validateAdminAccessForTenant(topicName.getTenant());
validateWriteOperationOnTopic(authoritative);
} catch (Exception e) {
log.error("[{}] Failed to delete partitioned topic {}", clientAppId(), topicName, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
Expand Down Expand Up @@ -738,7 +776,7 @@ protected void internalDeleteTopic(boolean authoritative, boolean force) {
}

protected void internalDeleteTopic(boolean authoritative) {
validateAdminOperationOnTopic(authoritative);
validateWriteOperationOnTopic(authoritative);
Topic topic = getTopicReference(topicName);

// v2 topics have a global name so check if the topic is replicated.
Expand Down Expand Up @@ -825,7 +863,7 @@ protected void internalGetSubscriptions(AsyncResponse asyncResponse, boolean aut

private void internalGetSubscriptionsForNonPartitionedTopic(AsyncResponse asyncResponse, boolean authoritative) {
try {
validateAdminOperationOnTopic(authoritative);
validateReadOperationOnTopic(authoritative);
Topic topic = getTopicReference(topicName);
final List<String> subscriptions = Lists.newArrayList();
topic.getSubscriptions().forEach((subName, sub) -> subscriptions.add(subName));
Expand Down Expand Up @@ -1279,7 +1317,7 @@ private void internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(Asy
// validate ownership and redirect if current broker is not owner
PersistentTopic topic;
try {
validateAdminOperationOnTopic(authoritative);
validateWriteOperationOnTopic(authoritative);

topic = (PersistentTopic) getTopicReference(topicName);
} catch (Exception e) {
Expand Down Expand Up @@ -1744,7 +1782,7 @@ protected MessageId internalTerminate(boolean authoritative) {
if (partitionMetadata.partitions > 0) {
throw new RestException(Status.METHOD_NOT_ALLOWED, "Termination of a partitioned topic is not allowed");
}
validateAdminOperationOnTopic(authoritative);
validateWriteOperationOnTopic(authoritative);
Topic topic = getTopicReference(topicName);
try {
return ((PersistentTopic) topic).terminate().get();
Expand Down Expand Up @@ -1867,7 +1905,7 @@ private void internalExpireMessagesForSinglePartition(String subName, int expire
}

protected void internalTriggerCompaction(boolean authoritative) {
validateAdminOperationOnTopic(authoritative);
validateWriteOperationOnTopic(authoritative);

PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
try {
Expand All @@ -1880,13 +1918,13 @@ protected void internalTriggerCompaction(boolean authoritative) {
}

protected LongRunningProcessStatus internalCompactionStatus(boolean authoritative) {
validateAdminOperationOnTopic(authoritative);
validateReadOperationOnTopic(authoritative);
PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
return topic.compactionStatus();
}

protected void internalTriggerOffload(boolean authoritative, MessageIdImpl messageId) {
validateAdminOperationOnTopic(authoritative);
validateWriteOperationOnTopic(authoritative);
PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
try {
topic.triggerOffload(messageId);
Expand All @@ -1899,7 +1937,7 @@ protected void internalTriggerOffload(boolean authoritative, MessageIdImpl messa
}

protected OffloadProcessStatus internalOffloadStatus(boolean authoritative) {
validateAdminOperationOnTopic(authoritative);
validateReadOperationOnTopic(authoritative);
PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
return topic.offloadStatus();
}
Expand Down Expand Up @@ -2237,7 +2275,7 @@ private void validateNonPartitionTopicName(String topicName) {
}

protected MessageId internalGetLastMessageId(boolean authoritative) {
validateAdminOperationOnTopic(authoritative);
validateReadOperationOnTopic(authoritative);

if (!(getTopicReference(topicName) instanceof PersistentTopic)) {
log.error("[{}] Not supported operation of non-persistent topic {}", clientAppId(), topicName);
Expand Down

0 comments on commit 36ea153

Please sign in to comment.