Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async authorization check while creation of producer/consumer #98

Merged
merged 1 commit into from
Nov 9, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
package com.yahoo.pulsar.broker.authorization;

import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -26,7 +26,6 @@
import com.yahoo.pulsar.broker.cache.ConfigurationCacheService;
import com.yahoo.pulsar.common.naming.DestinationName;
import com.yahoo.pulsar.common.policies.data.AuthAction;
import com.yahoo.pulsar.common.policies.data.Policies;

/**
*/
Expand All @@ -50,9 +49,19 @@ public AuthorizationManager(ServiceConfiguration conf, ConfigurationCacheService
* @param role
* the app id used to send messages to the destination.
*/
public boolean canProduce(DestinationName destination, String role) {
public CompletableFuture<Boolean> canProduceAsync(DestinationName destination, String role) {
return checkAuthorization(destination, role, AuthAction.produce);
}

public boolean canProduce(DestinationName destination, String role) {
try {
return canProduceAsync(destination, role).get();
} catch (Exception e) {
log.warn("Producer-client with Role - {} failed to get permissions for destination - {}", role,
destination, e);
return false;
}
}

/**
* Check if the specified role has permission to receive messages from the specified fully qualified destination
Expand All @@ -63,9 +72,19 @@ public boolean canProduce(DestinationName destination, String role) {
* @param role
* the app id used to receive messages from the destination.
*/
public boolean canConsume(DestinationName destination, String role) {
public CompletableFuture<Boolean> canConsumeAsync(DestinationName destination, String role) {
return checkAuthorization(destination, role, AuthAction.consume);
}

public boolean canConsume(DestinationName destination, String role) {
try {
return canConsumeAsync(destination, role).get();
} catch (Exception e) {
log.warn("Consumer-client with Role - {} failed to get permissions for destination - {}", role,
destination, e);
return false;
}
}

/**
* Check whether the specified role can perform a lookup for the specified destination.
Expand All @@ -80,10 +99,14 @@ public boolean canLookup(DestinationName destination, String role) {
return canProduce(destination, role) || canConsume(destination, role);
}

private boolean checkAuthorization(DestinationName destination, String role, AuthAction action) {
if (isSuperUser(role))
return true;
return checkPermission(destination, role, action) && checkCluster(destination);
private CompletableFuture<Boolean> checkAuthorization(DestinationName destination, String role,
AuthAction action) {
if (isSuperUser(role)) {
return CompletableFuture.completedFuture(true);
} else {
return checkPermission(destination, role, action)
.thenApply(isPermission -> isPermission && checkCluster(destination));
}
}

private boolean checkCluster(DestinationName destination) {
Expand All @@ -98,38 +121,49 @@ private boolean checkCluster(DestinationName destination) {
}
}

public boolean checkPermission(DestinationName destination, String role, AuthAction action) {
public CompletableFuture<Boolean> checkPermission(DestinationName destination, String role,
AuthAction action) {
CompletableFuture<Boolean> permissionFuture = new CompletableFuture<>();
try {
Optional<Policies> policies = configCache.policiesCache().get(POLICY_ROOT + destination.getNamespace());
if (!policies.isPresent()) {
if (log.isDebugEnabled()) {
log.debug("Policies node couldn't be found for destination : {}", destination);
configCache.policiesCache().getAsync(POLICY_ROOT + destination.getNamespace()).thenAccept(policies -> {
if (!policies.isPresent()) {
if (log.isDebugEnabled()) {
log.debug("Policies node couldn't be found for destination : {}", destination);
}
permissionFuture.complete(false);
} else {
Set<AuthAction> namespaceActions = policies.get().auth_policies.namespace_auth.get(role);
if (namespaceActions != null && namespaceActions.contains(action)) {
// The role has namespace level permission
permissionFuture.complete(true);
} else {
Map<String, Set<AuthAction>> roles = policies.get().auth_policies.destination_auth
.get(destination.toString());
if (roles == null) {
// Destination has no custom policy
permissionFuture.complete(false);
} else {
Set<AuthAction> resourceActions = roles.get(role);
if (resourceActions != null && resourceActions.contains(action)) {
// The role has destination level permission
permissionFuture.complete(true);
} else {
permissionFuture.complete(false);
}
}
}
}
return false;
}

Set<AuthAction> namespaceActions = policies.get().auth_policies.namespace_auth.get(role);
if (namespaceActions != null && namespaceActions.contains(action)) {
// The role has namespace level permission
return true;
}

Map<String, Set<AuthAction>> roles = policies.get().auth_policies.destination_auth.get(destination.toString());
if (roles == null) {
// Destination has no custom policy
return false;
}

Set<AuthAction> resourceActions = roles.get(role);
if (resourceActions != null && resourceActions.contains(action)) {
// The role has destination level permission
return true;
}
return false;
}).exceptionally(ex -> {
log.warn("Client with Role - {} failed to get permissions for destination - {}", role, destination,
ex);
permissionFuture.complete(false);
return null;
});
} catch (Exception e) {
log.warn("Client with Role - {} failed to get permissions for destination - {}", role, destination, e);
return false;
permissionFuture.complete(false);
}
return permissionFuture;
}

/**
Expand Down
Loading