Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add subscription auth mode by prefix #899

Merged
merged 3 commits into from
Jan 15, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.zookeeper.ZooKeeperCache.cacheTimeOutInSec;

import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.common.naming.DestinationName;
Expand Down Expand Up @@ -78,14 +80,52 @@ public boolean canProduce(DestinationName destination, String role) throws Excep
* the fully qualified destination name associated with the destination.
* @param role
* the app id used to receive messages from the destination.
* @param subscription
* the subscription name defined by the client
*/
public CompletableFuture<Boolean> canConsumeAsync(DestinationName destination, String role) {
return checkAuthorization(destination, role, AuthAction.consume);
public CompletableFuture<Boolean> canConsumeAsync(DestinationName destination, String role, String subscription) {
CompletableFuture<Boolean> permissionFuture = new CompletableFuture<>();
try {
configCache.policiesCache().getAsync(POLICY_ROOT + destination.getNamespace()).thenAccept(policies -> {
if (!policies.isPresent()) {
if (log.isDebugEnabled()) {
log.debug("Policies node couldn't be found for destination : {}", destination);
}
} else {
if (isNotBlank(subscription)) {
switch (policies.get().subscription_auth_mode) {
case Prefix:
if (!subscription.startsWith(role)) {
PulsarServerException ex = new PulsarServerException(
String.format("Failed to create consumer - The subscription name needs to be prefixed by the authentication role, like %s-xxxx for destination: %s", role, destination));
permissionFuture.completeExceptionally(ex);
return;
}
break;
default:
break;
}
}
}
checkAuthorization(destination, role, AuthAction.consume).thenAccept(isAuthorized -> {
permissionFuture.complete(isAuthorized);
});
}).exceptionally(ex -> {
log.warn("Client with Role - {} failed to get permissions for destination - {}", role, destination,
ex);
permissionFuture.completeExceptionally(ex);
return null;
});
} catch (Exception e) {
log.warn("Client with Role - {} failed to get permissions for destination - {}", role, destination, e);
permissionFuture.completeExceptionally(e);
}
return permissionFuture;
}

public boolean canConsume(DestinationName destination, String role) throws Exception {
public boolean canConsume(DestinationName destination, String role, String subscription) throws Exception {
try {
return canConsumeAsync(destination, role).get(cacheTimeOutInSec, SECONDS);
return canConsumeAsync(destination, role, subscription).get(cacheTimeOutInSec, SECONDS);
} catch (InterruptedException e) {
log.warn("Time-out {} sec while checking authorization on {} ", cacheTimeOutInSec, destination);
throw e;
Expand All @@ -107,7 +147,7 @@ public boolean canConsume(DestinationName destination, String role) throws Excep
* @throws Exception
*/
public boolean canLookup(DestinationName destination, String role) throws Exception {
return canProduce(destination, role) || canConsume(destination, role);
return canProduce(destination, role) || canConsume(destination, role, null);
}

private CompletableFuture<Boolean> checkAuthorization(DestinationName destination, String role,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.data.Stat;
Expand Down Expand Up @@ -1443,6 +1444,49 @@ public void unsubscribeNamespaceBundle(@PathParam("property") String property, @
nsName.toString(), bundleRange);
}

@POST
@Path("/{property}/{cluster}/{namespace}/subscriptionAuthMode")
@ApiOperation(value = " Set a subscription auth mode for all the destinations on a namespace.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist"),
@ApiResponse(code = 409, message = "Concurrent modification") })
public void setSubscriptionAuthMode(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, SubscriptionAuthMode subscriptionAuthMode) {
validateAdminAccessOnProperty(property);
validatePoliciesReadOnlyAccess();

if (subscriptionAuthMode == null) {
subscriptionAuthMode = SubscriptionAuthMode.None;
}

try {
Stat nodeStat = new Stat();
final String path = path(POLICIES, property, cluster, namespace);
byte[] content = globalZk().getData(path, null, nodeStat);
Policies policies = jsonMapper().readValue(content, Policies.class);
policies.subscription_auth_mode = subscriptionAuthMode;
globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
policiesCache().invalidate(path(POLICIES, property, cluster, namespace));
log.info("[{}] Successfully updated subscription auth mode: namespace={}/{}/{}, map={}", clientAppId(), property,
cluster, namespace, jsonMapper().writeValueAsString(policies.backlog_quota_map));

} catch (KeeperException.NoNodeException e) {
log.warn("[{}] Failed to update subscription auth mode for namespace {}/{}/{}: does not exist", clientAppId(),
property, cluster, namespace);
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} catch (KeeperException.BadVersionException e) {
log.warn("[{}] Failed to update subscription auth mode for namespace {}/{}/{}: concurrent modification",
clientAppId(), property, cluster, namespace);
throw new RestException(Status.CONFLICT, "Concurrent modification");
} catch (RestException pfe) {
throw pfe;
} catch (Exception e) {
log.error("[{}] Failed to update subscription auth mode for namespace {}/{}/{}", clientAppId(), property,
cluster, namespace, e);
throw new RestException(e);
}
}

private void clearBacklog(NamespaceName nsName, String bundleRange, String subscription) {
try {
List<Topic> topicList = pulsar().getBrokerService()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ public void checkPermissions() {
DestinationName destination = DestinationName.get(subscription.getDestination());
if (cnx.getBrokerService().getAuthorizationManager() != null) {
try {
if (cnx.getBrokerService().getAuthorizationManager().canConsume(destination, appId)) {
if (cnx.getBrokerService().getAuthorizationManager().canConsume(destination, appId, subscription.getName())) {
return;
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.util.SafeRun;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
Expand Down Expand Up @@ -349,7 +350,8 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
if (service.isAuthorizationEnabled()) {
authorizationFuture = service.getAuthorizationManager().canConsumeAsync(
DestinationName.get(subscribe.getTopic()),
originalPrincipal != null ? originalPrincipal : authRole);
originalPrincipal != null ? originalPrincipal : authRole,
subscribe.getSubscription());
} else {
authorizationFuture = CompletableFuture.completedFuture(true);
}
Expand Down Expand Up @@ -460,6 +462,15 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg));
}
return null;
}).exceptionally(ex -> {
String msg = String.format("[%s] %s with role %s", remoteAddress, ex.getMessage(), authRole);
if (ex.getCause() instanceof PulsarServerException) {
log.info(msg);
} else {
log.warn(msg);
}
ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, ex.getMessage()));
return null;
});
}

Expand Down
Loading