diff --git a/pulsar-broker-common/src/main/java/com/yahoo/pulsar/broker/authorization/AuthorizationManager.java b/pulsar-broker-common/src/main/java/com/yahoo/pulsar/broker/authorization/AuthorizationManager.java index 67c155f086b60..96f9c7e04b79f 100644 --- a/pulsar-broker-common/src/main/java/com/yahoo/pulsar/broker/authorization/AuthorizationManager.java +++ b/pulsar-broker-common/src/main/java/com/yahoo/pulsar/broker/authorization/AuthorizationManager.java @@ -16,8 +16,8 @@ package com.yahoo.pulsar.broker.authorization; import java.util.Map; -import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletableFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,7 +26,6 @@ import com.yahoo.pulsar.broker.cache.ConfigurationCacheService; import com.yahoo.pulsar.common.naming.DestinationName; import com.yahoo.pulsar.common.policies.data.AuthAction; -import com.yahoo.pulsar.common.policies.data.Policies; /** */ @@ -50,9 +49,19 @@ public AuthorizationManager(ServiceConfiguration conf, ConfigurationCacheService * @param role * the app id used to send messages to the destination. */ - public boolean canProduce(DestinationName destination, String role) { + public CompletableFuture canProduceAsync(DestinationName destination, String role) { return checkAuthorization(destination, role, AuthAction.produce); } + + public boolean canProduce(DestinationName destination, String role) { + try { + return canProduceAsync(destination, role).get(); + } catch (Exception e) { + log.warn("Producer-client with Role - {} failed to get permissions for destination - {}", role, + destination, e); + return false; + } + } /** * Check if the specified role has permission to receive messages from the specified fully qualified destination @@ -63,9 +72,19 @@ public boolean canProduce(DestinationName destination, String role) { * @param role * the app id used to receive messages from the destination. */ - public boolean canConsume(DestinationName destination, String role) { + public CompletableFuture canConsumeAsync(DestinationName destination, String role) { return checkAuthorization(destination, role, AuthAction.consume); } + + public boolean canConsume(DestinationName destination, String role) { + try { + return canConsumeAsync(destination, role).get(); + } catch (Exception e) { + log.warn("Consumer-client with Role - {} failed to get permissions for destination - {}", role, + destination, e); + return false; + } + } /** * Check whether the specified role can perform a lookup for the specified destination. @@ -80,10 +99,14 @@ public boolean canLookup(DestinationName destination, String role) { return canProduce(destination, role) || canConsume(destination, role); } - private boolean checkAuthorization(DestinationName destination, String role, AuthAction action) { - if (isSuperUser(role)) - return true; - return checkPermission(destination, role, action) && checkCluster(destination); + private CompletableFuture checkAuthorization(DestinationName destination, String role, + AuthAction action) { + if (isSuperUser(role)) { + return CompletableFuture.completedFuture(true); + } else { + return checkPermission(destination, role, action) + .thenApply(isPermission -> isPermission && checkCluster(destination)); + } } private boolean checkCluster(DestinationName destination) { @@ -98,38 +121,49 @@ private boolean checkCluster(DestinationName destination) { } } - public boolean checkPermission(DestinationName destination, String role, AuthAction action) { + public CompletableFuture checkPermission(DestinationName destination, String role, + AuthAction action) { + CompletableFuture permissionFuture = new CompletableFuture<>(); try { - Optional policies = configCache.policiesCache().get(POLICY_ROOT + destination.getNamespace()); - if (!policies.isPresent()) { - if (log.isDebugEnabled()) { - log.debug("Policies node couldn't be found for destination : {}", destination); + configCache.policiesCache().getAsync(POLICY_ROOT + destination.getNamespace()).thenAccept(policies -> { + if (!policies.isPresent()) { + if (log.isDebugEnabled()) { + log.debug("Policies node couldn't be found for destination : {}", destination); + } + permissionFuture.complete(false); + } else { + Set namespaceActions = policies.get().auth_policies.namespace_auth.get(role); + if (namespaceActions != null && namespaceActions.contains(action)) { + // The role has namespace level permission + permissionFuture.complete(true); + } else { + Map> roles = policies.get().auth_policies.destination_auth + .get(destination.toString()); + if (roles == null) { + // Destination has no custom policy + permissionFuture.complete(false); + } else { + Set resourceActions = roles.get(role); + if (resourceActions != null && resourceActions.contains(action)) { + // The role has destination level permission + permissionFuture.complete(true); + } else { + permissionFuture.complete(false); + } + } + } } - return false; - } - - Set namespaceActions = policies.get().auth_policies.namespace_auth.get(role); - if (namespaceActions != null && namespaceActions.contains(action)) { - // The role has namespace level permission - return true; - } - - Map> roles = policies.get().auth_policies.destination_auth.get(destination.toString()); - if (roles == null) { - // Destination has no custom policy - return false; - } - - Set resourceActions = roles.get(role); - if (resourceActions != null && resourceActions.contains(action)) { - // The role has destination level permission - return true; - } - return false; + }).exceptionally(ex -> { + log.warn("Client with Role - {} failed to get permissions for destination - {}", role, destination, + ex); + permissionFuture.complete(false); + return null; + }); } catch (Exception e) { log.warn("Client with Role - {} failed to get permissions for destination - {}", role, destination, e); - return false; + permissionFuture.complete(false); } + return permissionFuture; } /** diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/ServerCnx.java index da185f03a2298..5c737c4ed6993 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/ServerCnx.java @@ -181,208 +181,223 @@ protected void handleConnect(CommandConnect connect) { @Override protected void handleSubscribe(final CommandSubscribe subscribe) { checkArgument(state == State.Connected); + CompletableFuture authorizationFuture; if (service.isAuthorizationEnabled()) { - if (service.getAuthorizationManager().canConsume(DestinationName.get(subscribe.getTopic()), authRole)) { + authorizationFuture = service.getAuthorizationManager() + .canConsumeAsync(DestinationName.get(subscribe.getTopic()), authRole); + } else { + authorizationFuture = CompletableFuture.completedFuture(true); + } + authorizationFuture.thenApply(isAuthorized -> { + if (isAuthorized) { if (log.isDebugEnabled()) { log.debug("[{}] Client is authorized to subscribe with role {}", remoteAddress, authRole); } + final String topicName = subscribe.getTopic(); + final String subscriptionName = subscribe.getSubscription(); + final long requestId = subscribe.getRequestId(); + final long consumerId = subscribe.getConsumerId(); + final SubType subType = subscribe.getSubType(); + final String consumerName = subscribe.getConsumerName(); + + log.info("[{}] Subscribing on topic {} / {}", remoteAddress, topicName, subscriptionName); + + CompletableFuture consumerFuture = new CompletableFuture<>(); + CompletableFuture existingConsumerFuture = consumers.putIfAbsent(consumerId, consumerFuture); + + if (existingConsumerFuture != null) { + if (existingConsumerFuture.isDone() && !existingConsumerFuture.isCompletedExceptionally()) { + Consumer consumer = existingConsumerFuture.getNow(null); + log.info("[{}] Consumer with the same id is already created: {}", remoteAddress, consumer); + ctx.writeAndFlush(Commands.newSuccess(requestId)); + return null; + } else { + // There was an early request to create a consumer with same consumerId. This can happen when + // client timeout is lower the broker timeouts. We need to wait until the previous consumer + // creation request either complete or fails. + log.warn("[{}][{}][{}] Consumer is already present on the connection", remoteAddress, topicName, + subscriptionName); + ctx.writeAndFlush(Commands.newError(requestId, ServerError.UnknownError, + "Consumer is already present on the connection")); + return null; + } + } + + service.getTopic(topicName).thenCompose( + topic -> topic.subscribe(ServerCnx.this, subscriptionName, consumerId, subType, consumerName)) + .thenAccept(consumer -> { + if (consumerFuture.complete(consumer)) { + log.info("[{}] Created subscription on topic {} / {}", remoteAddress, topicName, + subscriptionName); + ctx.writeAndFlush(Commands.newSuccess(requestId), ctx.voidPromise()); + } else { + // The consumer future was completed before by a close command + try { + consumer.close(); + log.info("[{}] Cleared consumer created after timeout on client side {}", + remoteAddress, consumer); + } catch (BrokerServiceException e) { + log.warn("[{}] Error closing consumer created after timeout on client side {}: {}", + remoteAddress, consumer, e.getMessage()); + } + consumers.remove(consumerId, consumerFuture); + } + + }) // + .exceptionally(exception -> { + log.warn("[{}][{}][{}] Failed to create consumer: {}", remoteAddress, topicName, + subscriptionName, exception.getCause().getMessage()); + + // If client timed out, the future would have been completed by subsequent close. Send error + // back to + // client, only if not completed already. + if (consumerFuture.completeExceptionally(exception)) { + ctx.writeAndFlush(Commands.newError(requestId, + BrokerServiceException.getClientErrorCode(exception.getCause()), + exception.getCause().getMessage())); + } + consumers.remove(consumerId, consumerFuture); + + return null; + + }); } else { String msg = "Client is not authorized to subscribe"; log.warn("[{}] {} with role {}", remoteAddress, msg, authRole); ctx.writeAndFlush(Commands.newError(subscribe.getRequestId(), ServerError.AuthorizationError, msg)); - return; - } - } - - final String topicName = subscribe.getTopic(); - final String subscriptionName = subscribe.getSubscription(); - final long requestId = subscribe.getRequestId(); - final long consumerId = subscribe.getConsumerId(); - final SubType subType = subscribe.getSubType(); - final String consumerName = subscribe.getConsumerName(); - - log.info("[{}] Subscribing on topic {} / {}", remoteAddress, topicName, subscriptionName); - - CompletableFuture consumerFuture = new CompletableFuture<>(); - CompletableFuture existingConsumerFuture = consumers.putIfAbsent(consumerId, consumerFuture); - - if (existingConsumerFuture != null) { - if (existingConsumerFuture.isDone() && !existingConsumerFuture.isCompletedExceptionally()) { - Consumer consumer = existingConsumerFuture.getNow(null); - log.info("[{}] Consumer with the same id is already created: {}", remoteAddress, consumer); - ctx.writeAndFlush(Commands.newSuccess(requestId)); - return; - } else { - // There was an early request to create a consumer with same consumerId. This can happen when client - // timeout - // is lower the broker timeouts. We need to wait until the previous consumer creation request either - // complete or fails. - log.warn("[{}][{}][{}] Consumer is already present on the connection", remoteAddress, topicName, - subscriptionName); - ctx.writeAndFlush(Commands.newError(requestId, ServerError.UnknownError, - "Consumer is already present on the connection")); - return; } - } - - service.getTopic(topicName) - .thenCompose( - topic -> topic.subscribe(ServerCnx.this, subscriptionName, consumerId, subType, consumerName)) - .thenAccept(consumer -> { - if (consumerFuture.complete(consumer)) { - log.info("[{}] Created subscription on topic {} / {}", remoteAddress, topicName, - subscriptionName); - ctx.writeAndFlush(Commands.newSuccess(requestId), ctx.voidPromise()); - } else { - // The consumer future was completed before by a close command - try { - consumer.close(); - log.info("[{}] Cleared consumer created after timeout on client side {}", remoteAddress, - consumer); - } catch (BrokerServiceException e) { - log.warn("[{}] Error closing consumer created after timeout on client side {}: {}", - remoteAddress, consumer, e.getMessage()); - } - consumers.remove(consumerId, consumerFuture); - } - - }) // - .exceptionally(exception -> { - log.warn("[{}][{}][{}] Failed to create consumer: {}", remoteAddress, topicName, subscriptionName, - exception.getCause().getMessage()); - - // If client timed out, the future would have been completed by subsequent close. Send error back to - // client, only if not completed already. - if (consumerFuture.completeExceptionally(exception)) { - ctx.writeAndFlush(Commands.newError(requestId, - BrokerServiceException.getClientErrorCode(exception.getCause()), - exception.getCause().getMessage())); - } - consumers.remove(consumerId, consumerFuture); - - return null; - - }); + return null; + }); } @Override protected void handleProducer(final CommandProducer cmdProducer) { checkArgument(state == State.Connected); + CompletableFuture authorizationFuture; if (service.isAuthorizationEnabled()) { - if (service.getAuthorizationManager().canProduce(DestinationName.get(cmdProducer.getTopic().toString()), - authRole)) { + authorizationFuture = service.getAuthorizationManager() + .canProduceAsync(DestinationName.get(cmdProducer.getTopic().toString()), authRole); + } else { + authorizationFuture = CompletableFuture.completedFuture(true); + } + authorizationFuture.thenApply(isAuthorized -> { + if (isAuthorized) { if (log.isDebugEnabled()) { log.debug("[{}] Client is authorized to Produce with role {}", remoteAddress, authRole); } - } else { - String msg = "Client is not authorized to Produce"; - log.warn("[{}] {} with role {}", remoteAddress, msg, authRole); - ctx.writeAndFlush(Commands.newError(cmdProducer.getRequestId(), ServerError.AuthorizationError, msg)); - return; - } - } - - final String producerName; - if (cmdProducer.hasProducerName()) { - // Use producer name provided by client - producerName = cmdProducer.getProducerName(); - } else { - // Need to generate a unique id - producerName = service.generateUniqueProducerName(); - } + final String producerName; + if (cmdProducer.hasProducerName()) { + // Use producer name provided by client + producerName = cmdProducer.getProducerName(); + } else { + // Need to generate a unique id + producerName = service.generateUniqueProducerName(); + } - final String topicName = cmdProducer.getTopic(); - final long producerId = cmdProducer.getProducerId(); - final long requestId = cmdProducer.getRequestId(); + final String topicName = cmdProducer.getTopic(); + final long producerId = cmdProducer.getProducerId(); + final long requestId = cmdProducer.getRequestId(); - CompletableFuture producerFuture = new CompletableFuture<>(); + CompletableFuture producerFuture = new CompletableFuture<>(); - CompletableFuture existingProducerFuture = producers.putIfAbsent(producerId, producerFuture); + CompletableFuture existingProducerFuture = producers.putIfAbsent(producerId, producerFuture); - if (existingProducerFuture != null) { - if (existingProducerFuture.isDone() && !existingProducerFuture.isCompletedExceptionally()) { - Producer producer = existingProducerFuture.getNow(null); - log.info("[{}] Producer with the same id is already created: {}", remoteAddress, producer); - ctx.writeAndFlush(Commands.newProducerSuccess(requestId, producer.getProducerName())); - return; - } else { - // There was an early request to create a producer with same producerId. This can happen when client - // timeout is lower the broker timeouts. We need to wait until the previous producer creation request - // either complete or fails. - log.warn("[{}][{}] Producer is already present on the connection", remoteAddress, topicName); - ctx.writeAndFlush(Commands.newError(requestId, ServerError.UnknownError, - "Producer is already present on the connection")); - return; - } - } - - log.info("[{}][{}] Creating producer. producerId={}", remoteAddress, topicName, producerId); - - service.getTopic(topicName).thenAccept((Topic topic) -> { - // Before creating producer, check if backlog quota exceeded on topic - if (topic.isBacklogQuotaExceeded(producerName)) { - IllegalStateException illegalStateException = new IllegalStateException( - "Cannot create producer on topic with backlog quota exceeded"); - BacklogQuota.RetentionPolicy retentionPolicy = topic.getBacklogQuota().getPolicy(); - if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_request_hold) { - ctx.writeAndFlush(Commands.newError(requestId, ServerError.ProducerBlockedQuotaExceededError, - illegalStateException.getMessage())); - } else if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception) { - ctx.writeAndFlush(Commands.newError(requestId, ServerError.ProducerBlockedQuotaExceededException, - illegalStateException.getMessage())); + if (existingProducerFuture != null) { + if (existingProducerFuture.isDone() && !existingProducerFuture.isCompletedExceptionally()) { + Producer producer = existingProducerFuture.getNow(null); + log.info("[{}] Producer with the same id is already created: {}", remoteAddress, producer); + ctx.writeAndFlush(Commands.newProducerSuccess(requestId, producer.getProducerName())); + return null; + } else { + // There was an early request to create a producer with same producerId. This can happen when + // client + // timeout is lower the broker timeouts. We need to wait until the previous producer creation + // request + // either complete or fails. + log.warn("[{}][{}] Producer is already present on the connection", remoteAddress, topicName); + ctx.writeAndFlush(Commands.newError(requestId, ServerError.UnknownError, + "Producer is already present on the connection")); + return null; + } } - producerFuture.completeExceptionally(illegalStateException); - producers.remove(producerId, producerFuture); - return; - } - - disableTcpNoDelayIfNeeded(topicName, producerName); - Producer producer = new Producer(topic, ServerCnx.this, producerId, producerName, authRole); + log.info("[{}][{}] Creating producer. producerId={}", remoteAddress, topicName, producerId); + + service.getTopic(topicName).thenAccept((Topic topic) -> { + // Before creating producer, check if backlog quota exceeded on topic + if (topic.isBacklogQuotaExceeded(producerName)) { + IllegalStateException illegalStateException = new IllegalStateException( + "Cannot create producer on topic with backlog quota exceeded"); + BacklogQuota.RetentionPolicy retentionPolicy = topic.getBacklogQuota().getPolicy(); + if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_request_hold) { + ctx.writeAndFlush(Commands.newError(requestId, + ServerError.ProducerBlockedQuotaExceededError, illegalStateException.getMessage())); + } else if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception) { + ctx.writeAndFlush( + Commands.newError(requestId, ServerError.ProducerBlockedQuotaExceededException, + illegalStateException.getMessage())); + } + producerFuture.completeExceptionally(illegalStateException); + producers.remove(producerId, producerFuture); + return; + } - try { - topic.addProducer(producer); + disableTcpNoDelayIfNeeded(topicName, producerName); + + Producer producer = new Producer(topic, ServerCnx.this, producerId, producerName, authRole); + + try { + topic.addProducer(producer); + + if (isActive()) { + if (producerFuture.complete(producer)) { + log.info("[{}] Created new producer: {}", remoteAddress, producer); + ctx.writeAndFlush(Commands.newProducerSuccess(requestId, producerName)); + return; + } else { + // The producer's future was completed before by a close command + producer.closeNow(); + log.info("[{}] Cleared producer created after timeout on client side {}", remoteAddress, + producer); + } + } else { + producer.closeNow(); + log.info("[{}] Cleared producer created after connection was closed: {}", remoteAddress, + producer); + producerFuture.completeExceptionally( + new IllegalStateException("Producer created after connection was closed")); + } + } catch (BrokerServiceException ise) { + log.error("[{}] Failed to add producer to topic {}: {}", remoteAddress, topicName, + ise.getMessage()); + ctx.writeAndFlush(Commands.newError(requestId, BrokerServiceException.getClientErrorCode(ise), + ise.getMessage())); + producerFuture.completeExceptionally(ise); + } - if (isActive()) { - if (producerFuture.complete(producer)) { - log.info("[{}] Created new producer: {}", remoteAddress, producer); - ctx.writeAndFlush(Commands.newProducerSuccess(requestId, producerName)); - return; - } else { - // The producer's future was completed before by a close command - producer.closeNow(); - log.info("[{}] Cleared producer created after timeout on client side {}", remoteAddress, - producer); + producers.remove(producerId, producerFuture); + }).exceptionally(exception -> { + Throwable cause = exception.getCause(); + if (!(cause instanceof ServiceUnitNotReadyException)) { + // Do not print stack traces for expected exceptions + log.error("[{}] Failed to create topic {}", remoteAddress, topicName, exception); } - } else { - producer.closeNow(); - log.info("[{}] Cleared producer created after connection was closed: {}", remoteAddress, producer); - producerFuture.completeExceptionally( - new IllegalStateException("Producer created after connection was closed")); - } - } catch (BrokerServiceException ise) { - log.error("[{}] Failed to add producer to topic {}: {}", remoteAddress, topicName, ise.getMessage()); - ctx.writeAndFlush( - Commands.newError(requestId, BrokerServiceException.getClientErrorCode(ise), ise.getMessage())); - producerFuture.completeExceptionally(ise); - } - producers.remove(producerId, producerFuture); - }).exceptionally(exception -> { - Throwable cause = exception.getCause(); - if (!(cause instanceof ServiceUnitNotReadyException)) { - // Do not print stack traces for expected exceptions - log.error("[{}] Failed to create topic {}", remoteAddress, topicName, exception); - } + // If client timed out, the future would have been completed by subsequent close. Send error back to + // client, only if not completed already. + if (producerFuture.completeExceptionally(exception)) { + ctx.writeAndFlush(Commands.newError(requestId, BrokerServiceException.getClientErrorCode(cause), + cause.getMessage())); + } + producers.remove(producerId, producerFuture); - // If client timed out, the future would have been completed by subsequent close. Send error back to - // client, only if not completed already. - if (producerFuture.completeExceptionally(exception)) { - ctx.writeAndFlush(Commands.newError(requestId, BrokerServiceException.getClientErrorCode(cause), - cause.getMessage())); + return null; + }); + } else { + String msg = "Client is not authorized to Produce"; + log.warn("[{}] {} with role {}", remoteAddress, msg, authRole); + ctx.writeAndFlush(Commands.newError(cmdProducer.getRequestId(), ServerError.AuthorizationError, msg)); } - producers.remove(producerId, producerFuture); - return null; }); } diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/ServerCnxTest.java index 188c0a16f7880..9644f19389338 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/ServerCnxTest.java @@ -381,7 +381,7 @@ public void testProducerOnNotOwnedTopic() throws Exception { @Test(timeOut = 30000) public void testProducerCommandWithAuthorizationPositive() throws Exception { AuthorizationManager authorizationManager = mock(AuthorizationManager.class); - doReturn(true).when(authorizationManager).canProduce(Mockito.any(), Mockito.any()); + doReturn(CompletableFuture.completedFuture(true)).when(authorizationManager).canProduceAsync(Mockito.any(), Mockito.any()); doReturn(authorizationManager).when(brokerService).getAuthorizationManager(); doReturn(true).when(brokerService).isAuthenticationEnabled(); resetChannel(); @@ -409,7 +409,7 @@ public void testNonExistentTopic() throws Exception { ConfigurationCacheService configCacheService = mock(ConfigurationCacheService.class); doReturn(configCacheService).when(pulsar).getConfigurationCache(); doReturn(zkDataCache).when(configCacheService).policiesCache(); - doThrow(new NoNodeException()).when(zkDataCache).get(matches(".*nonexistent.*")); + doReturn(CompletableFuture.completedFuture(Optional.empty())).when(zkDataCache).getAsync(matches(".*nonexistent.*")); AuthorizationManager authorizationManager = spy(new AuthorizationManager(svcConfig, configCacheService)); doReturn(authorizationManager).when(brokerService).getAuthorizationManager(); @@ -440,7 +440,7 @@ public void testClusterAccess() throws Exception { doReturn(authorizationManager).when(brokerService).getAuthorizationManager(); doReturn(true).when(brokerService).isAuthorizationEnabled(); doReturn(false).when(authorizationManager).isSuperUser(Mockito.anyString()); - doReturn(true).when(authorizationManager).checkPermission(any(DestinationName.class), Mockito.anyString(), + doReturn(CompletableFuture.completedFuture(true)).when(authorizationManager).checkPermission(any(DestinationName.class), Mockito.anyString(), any(AuthAction.class)); resetChannel(); @@ -493,7 +493,7 @@ public void testNonExistentTopicSuperUserAccess() throws Exception { public void testProducerCommandWithAuthorizationNegative() throws Exception { AuthorizationManager authorizationManager = mock(AuthorizationManager.class); - doReturn(false).when(authorizationManager).canProduce(Mockito.any(), Mockito.any()); + doReturn(CompletableFuture.completedFuture(false)).when(authorizationManager).canProduceAsync(Mockito.any(), Mockito.any()); doReturn(authorizationManager).when(brokerService).getAuthorizationManager(); doReturn(true).when(brokerService).isAuthenticationEnabled(); doReturn(true).when(brokerService).isAuthorizationEnabled(); @@ -1022,7 +1022,7 @@ public void testSubscribeCommand() throws Exception { @Test(timeOut = 30000) public void testSubscribeCommandWithAuthorizationPositive() throws Exception { AuthorizationManager authorizationManager = mock(AuthorizationManager.class); - doReturn(true).when(authorizationManager).canConsume(Mockito.any(), Mockito.any()); + doReturn(CompletableFuture.completedFuture(true)).when(authorizationManager).canConsumeAsync(Mockito.any(), Mockito.any()); doReturn(authorizationManager).when(brokerService).getAuthorizationManager(); doReturn(true).when(brokerService).isAuthenticationEnabled(); doReturn(true).when(brokerService).isAuthorizationEnabled(); @@ -1042,7 +1042,7 @@ public void testSubscribeCommandWithAuthorizationPositive() throws Exception { @Test(timeOut = 30000) public void testSubscribeCommandWithAuthorizationNegative() throws Exception { AuthorizationManager authorizationManager = mock(AuthorizationManager.class); - doReturn(false).when(authorizationManager).canConsume(Mockito.any(), Mockito.any()); + doReturn(CompletableFuture.completedFuture(false)).when(authorizationManager).canConsumeAsync(Mockito.any(), Mockito.any()); doReturn(authorizationManager).when(brokerService).getAuthorizationManager(); doReturn(true).when(brokerService).isAuthenticationEnabled(); doReturn(true).when(brokerService).isAuthorizationEnabled(); diff --git a/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/AbstractWebSocketHandler.java b/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/AbstractWebSocketHandler.java index 8cf47ccfdecfc..e34f20bad2211 100644 --- a/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/AbstractWebSocketHandler.java +++ b/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/AbstractWebSocketHandler.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; +import java.util.concurrent.CompletableFuture; import javax.naming.AuthenticationException; import javax.servlet.http.HttpServletRequest; @@ -73,11 +74,16 @@ public void onWebSocketConnect(Session session) { } } - if (service.isAuthorizationEnabled() && !isAuthorized(authRole)) { - log.warn("[{}] WebSocket Client [{}] is not authorized on topic {}", session.getRemoteAddress(), authRole, - topic); - close(WebSocketError.NotAuthorizedError); - return; + if (service.isAuthorizationEnabled()) { + final String role = authRole; + isAuthorized(authRole).thenApply(isAuthorized -> { + if(!isAuthorized) { + log.warn("[{}] WebSocket Client [{}] is not authorized on topic {}", session.getRemoteAddress(), role, + topic); + close(WebSocketError.NotAuthorizedError); + } + return null; + }); } } @@ -120,7 +126,7 @@ protected String checkAuthentication() { return null; } - protected abstract boolean isAuthorized(String authRole); + protected abstract CompletableFuture isAuthorized(String authRole); private String extractTopicName(HttpServletRequest request) { String uri = request.getRequestURI(); diff --git a/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/ConsumerHandler.java b/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/ConsumerHandler.java index a280ce36149c0..2cc7ed1e5fdff 100644 --- a/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/ConsumerHandler.java +++ b/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/ConsumerHandler.java @@ -24,6 +24,7 @@ import java.time.format.DateTimeFormatter; import java.util.Base64; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -177,8 +178,8 @@ private ConsumerConfiguration getConsumerConfiguration() { } @Override - protected boolean isAuthorized(String authRole) { - return service.getAuthorizationManager().canConsume(DestinationName.get(topic), authRole); + protected CompletableFuture isAuthorized(String authRole) { + return service.getAuthorizationManager().canConsumeAsync(DestinationName.get(topic), authRole); } private static String extractSubscription(HttpServletRequest request) { diff --git a/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/ProducerHandler.java b/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/ProducerHandler.java index ef90274e691da..311fc39b8cc17 100644 --- a/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/ProducerHandler.java +++ b/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/ProducerHandler.java @@ -17,6 +17,7 @@ import java.io.IOException; import java.util.Base64; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import javax.servlet.http.HttpServletRequest; @@ -118,8 +119,8 @@ public void onWebSocketText(String message) { }); } - protected boolean isAuthorized(String authRole) { - return service.getAuthorizationManager().canProduce(DestinationName.get(topic), authRole); + protected CompletableFuture isAuthorized(String authRole) { + return service.getAuthorizationManager().canProduceAsync(DestinationName.get(topic), authRole); } private ProducerConfiguration getProducerConfiguration() {