Skip to content

Commit

Permalink
Making Pulsar Proxy more secure (#1002)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jai Asher authored Jan 31, 2018
1 parent c86aeed commit dde5865
Show file tree
Hide file tree
Showing 23 changed files with 1,879 additions and 385 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.zookeeper.ZooKeeperCache.cacheTimeOutInSec;
Expand Down Expand Up @@ -66,8 +69,8 @@ public boolean canProduce(DestinationName destination, String role) throws Excep
log.warn("Time-out {} sec while checking authorization on {} ", cacheTimeOutInSec, destination);
throw e;
} catch (Exception e) {
log.warn("Producer-client with Role - {} failed to get permissions for destination - {}", role,
destination, e);
log.warn("Producer-client with Role - {} failed to get permissions for destination - {}. {}", role,
destination, e.getMessage());
throw e;
}
}
Expand Down Expand Up @@ -96,8 +99,9 @@ public CompletableFuture<Boolean> canConsumeAsync(DestinationName destination, S
switch (policies.get().subscription_auth_mode) {
case Prefix:
if (!subscription.startsWith(role)) {
PulsarServerException ex = new PulsarServerException(
String.format("Failed to create consumer - The subscription name needs to be prefixed by the authentication role, like %s-xxxx for destination: %s", role, destination));
PulsarServerException ex = new PulsarServerException(String.format(
"Failed to create consumer - The subscription name needs to be prefixed by the authentication role, like %s-xxxx for destination: %s",
role, destination));
permissionFuture.completeExceptionally(ex);
return;
}
Expand All @@ -111,13 +115,12 @@ public CompletableFuture<Boolean> canConsumeAsync(DestinationName destination, S
permissionFuture.complete(isAuthorized);
});
}).exceptionally(ex -> {
log.warn("Client with Role - {} failed to get permissions for destination - {}", role, destination,
ex);
log.warn("Client with Role - {} failed to get permissions for destination - {}. {}", role, destination, ex.getMessage());
permissionFuture.completeExceptionally(ex);
return null;
});
} catch (Exception e) {
log.warn("Client with Role - {} failed to get permissions for destination - {}", role, destination, e);
log.warn("Client with Role - {} failed to get permissions for destination - {}. {}", role, destination, e.getMessage());
permissionFuture.completeExceptionally(e);
}
return permissionFuture;
Expand All @@ -130,8 +133,8 @@ public boolean canConsume(DestinationName destination, String role, String subsc
log.warn("Time-out {} sec while checking authorization on {} ", cacheTimeOutInSec, destination);
throw e;
} catch (Exception e) {
log.warn("Consumer-client with Role - {} failed to get permissions for destination - {}", role,
destination, e);
log.warn("Consumer-client with Role - {} failed to get permissions for destination - {}. {}", role,
destination, e.getMessage());
throw e;
}
}
Expand All @@ -150,8 +153,46 @@ public boolean canLookup(DestinationName destination, String role) throws Except
return canProduce(destination, role) || canConsume(destination, role, null);
}

private CompletableFuture<Boolean> checkAuthorization(DestinationName destination, String role,
AuthAction action) {
/**
* Check whether the specified role can perform a lookup for the specified destination.
*
* For that the caller needs to have producer or consumer permission.
*
* @param destination
* @param role
* @return
* @throws Exception
*/
public CompletableFuture<Boolean> canLookupAsync(DestinationName destination, String role) {
CompletableFuture<Boolean> finalResult = new CompletableFuture<Boolean>();
canProduceAsync(destination, role).whenComplete((produceAuthorized, ex) -> {
if (ex == null) {
if (produceAuthorized) {
finalResult.complete(produceAuthorized);
return;
}
} else if (log.isDebugEnabled()) {
log.debug("Destination [{}] Role [{}] exception occured while trying to check Produce permissions. {}",
destination.toString(), role, ex.getMessage());
}
canConsumeAsync(destination, role, null).whenComplete((consumeAuthorized, e) -> {
if (e == null) {
if (consumeAuthorized) {
finalResult.complete(consumeAuthorized);
return;
}
} else if (log.isDebugEnabled()) {
log.debug(
"Destination [{}] Role [{}] exception occured while trying to check Consume permissions. {}",
destination.toString(), role, e.getMessage());
}
finalResult.complete(false);
});
});
return finalResult;
}

private CompletableFuture<Boolean> checkAuthorization(DestinationName destination, String role, AuthAction action) {
if (isSuperUser(role)) {
return CompletableFuture.completedFuture(true);
} else {
Expand Down Expand Up @@ -218,13 +259,13 @@ public CompletableFuture<Boolean> checkPermission(DestinationName destination, S
}
permissionFuture.complete(false);
}).exceptionally(ex -> {
log.warn("Client with Role - {} failed to get permissions for destination - {}", role, destination,
ex);
log.warn("Client with Role - {} failed to get permissions for destination - {}. {}", role, destination,
ex.getMessage());
permissionFuture.completeExceptionally(ex);
return null;
});
} catch (Exception e) {
log.warn("Client with Role - {} failed to get permissions for destination - {}", role, destination, e);
log.warn("Client with Role - {} failed to get permissions for destination - {}. {}", role, destination, e.getMessage());
permissionFuture.completeExceptionally(e);
}
return permissionFuture;
Expand All @@ -244,8 +285,7 @@ private boolean checkWildcardPermission(String checkedRole, AuthAction checkedAc
}

// Suffix match
if (permittedRole.charAt(0) == '*'
&& checkedRole.endsWith(permittedRole.substring(1))
if (permittedRole.charAt(0) == '*' && checkedRole.endsWith(permittedRole.substring(1))
&& permittedActions.contains(checkedAction)) {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1314,7 +1314,7 @@ public static CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMet
validateAdminAccessOnProperty(pulsar, clientAppId, dn.getProperty());
} catch (RestException authException) {
log.warn("Failed to authorize {} on cluster {}", clientAppId, dn.toString());
throw new PulsarClientException(String.format("Authorization failed %s on cluster %s with error %s",
throw new PulsarClientException(String.format("Authorization failed %s on topic %s with error %s",
clientAppId, dn.toString(), authException.getMessage()));
}
} catch (Exception ex) {
Expand Down
Loading

0 comments on commit dde5865

Please sign in to comment.