Skip to content

Commit

Permalink
[branch-2.9][fix][security] Add timeout of sync methods and avoid cal…
Browse files Browse the repository at this point in the history
…l sync method for AuthoriationService (#16083)
  • Loading branch information
mattisonchao authored Jun 17, 2022
1 parent 3e84452 commit 1fa9c2e
Show file tree
Hide file tree
Showing 4 changed files with 190 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

import static java.util.concurrent.TimeUnit.SECONDS;

Expand Down Expand Up @@ -396,11 +397,15 @@ public boolean allowTenantOperation(String tenantName,
AuthenticationDataSource authData) {
try {
return allowTenantOperationAsync(
tenantName, operation, originalRole, role, authData).get();
tenantName, operation, originalRole, role, authData).get(
conf.getZooKeeperOperationTimeoutSeconds(), SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RestException(e);
} catch (ExecutionException e) {
throw new RestException(e.getCause());
} catch (TimeoutException e) {
throw new RestException(e);
}
}

Expand Down Expand Up @@ -521,11 +526,15 @@ public boolean allowNamespacePolicyOperation(NamespaceName namespaceName,
AuthenticationDataSource authData) {
try {
return allowNamespacePolicyOperationAsync(
namespaceName, policy, operation, originalRole, role, authData).get();
namespaceName, policy, operation, originalRole, role, authData).get(
conf.getZooKeeperOperationTimeoutSeconds(), SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RestException(e);
} catch (ExecutionException e) {
throw new RestException(e.getCause());
} catch (TimeoutException e) {
throw new RestException(e);
}
}

Expand Down Expand Up @@ -585,11 +594,15 @@ public Boolean allowTopicPolicyOperation(TopicName topicName,
AuthenticationDataSource authData) {
try {
return allowTopicPolicyOperationAsync(
topicName, policy, operation, originalRole, role, authData).get();
topicName, policy, operation, originalRole, role, authData).get(
conf.getZooKeeperOperationTimeoutSeconds(), SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RestException(e);
} catch (ExecutionException e) {
throw new RestException(e.getCause());
} catch (TimeoutException e) {
throw new RestException(e);
}
}

Expand Down Expand Up @@ -667,9 +680,10 @@ public Boolean allowTopicOperation(TopicName topicName,
TopicOperation operation,
String originalRole,
String role,
AuthenticationDataSource authData) {
AuthenticationDataSource authData) throws Exception {
try {
return allowTopicOperationAsync(topicName, operation, originalRole, role, authData).get();
return allowTopicOperationAsync(topicName, operation, originalRole, role, authData).get(
conf.getZooKeeperOperationTimeoutSeconds(), SECONDS);
} catch (InterruptedException e) {
throw new RestException(e);
} catch (ExecutionException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ protected void validateAdminAndClientPermission() {
validateAdminAccessForTenant(topicName.getTenant());
} catch (Exception ve) {
try {
checkAuthorization(pulsar(), topicName, clientAppId(), clientAuthData());
checkAuthorizationAsync(pulsar(), topicName, clientAppId(), clientAuthData());
} catch (RestException re) {
throw re;
} catch (Exception e) {
Expand Down Expand Up @@ -3559,46 +3559,55 @@ public static CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMet
PulsarService pulsar, String clientAppId, String originalPrincipal,
AuthenticationDataSource authenticationData, TopicName topicName) {
CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<>();
try {
// (1) authorize client
try {
checkAuthorization(pulsar, topicName, clientAppId, authenticationData);
} catch (RestException e) {
try {
validateAdminAccessForTenant(pulsar,
clientAppId, originalPrincipal, topicName.getTenant(), authenticationData);
} catch (RestException authException) {
log.warn("Failed to authorize {} on cluster {}", clientAppId, topicName.toString());
throw new PulsarClientException(String.format("Authorization failed %s on topic %s with error %s",
clientAppId, topicName.toString(), authException.getMessage()));
}
} catch (Exception ex) {
// throw without wrapping to PulsarClientException that considers: unknown error marked as internal
// server error
log.warn("Failed to authorize {} on cluster {} with unexpected exception {}", clientAppId,
topicName.toString(), ex.getMessage(), ex);
throw ex;
}
CompletableFuture<Void> authorizationFuture = new CompletableFuture<>();
checkAuthorizationAsync(pulsar, topicName, clientAppId, authenticationData)
.thenRun(() -> authorizationFuture.complete(null))
.exceptionally(e -> {
Throwable throwable = FutureUtil.unwrapCompletionException(e);
if (throwable instanceof RestException) {
validateAdminAccessForTenantAsync(pulsar,
clientAppId, originalPrincipal, topicName.getTenant(), authenticationData)
.thenRun(() -> {
authorizationFuture.complete(null);
}).exceptionally(ex -> {
Throwable throwable2 = FutureUtil.unwrapCompletionException(ex);
if (throwable2 instanceof RestException) {
log.warn("Failed to authorize {} on topic {}", clientAppId, topicName);
authorizationFuture.completeExceptionally(new PulsarClientException(
String.format("Authorization failed %s on topic %s with error %s",
clientAppId, topicName, throwable2.getMessage())));
} else {
authorizationFuture.completeExceptionally(throwable2);
}
return null;
});
} else {
// throw without wrapping to PulsarClientException that considers: unknown error marked as
// internal server error
log.warn("Failed to authorize {} on topic {}", clientAppId, topicName, throwable);
authorizationFuture.completeExceptionally(throwable);
}
return null;
});

// validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can
// serve/redirect request else fail partitioned-metadata-request so, client fails while creating
// producer/consumer
checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject())
.thenCompose(res -> pulsar.getBrokerService()
.fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName))
.thenAccept(metadata -> {
if (log.isDebugEnabled()) {
log.debug("[{}] Total number of partitions for topic {} is {}", clientAppId, topicName,
metadata.partitions);
}
metadataFuture.complete(metadata);
}).exceptionally(ex -> {
metadataFuture.completeExceptionally(ex.getCause());
return null;
});
} catch (Exception ex) {
metadataFuture.completeExceptionally(ex);
}
// validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can
// serve/redirect request else fail partitioned-metadata-request so, client fails while creating
// producer/consumer
authorizationFuture.thenCompose(__ ->
checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject()))
.thenCompose(res ->
pulsar.getBrokerService().fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName))
.thenAccept(metadata -> {
if (log.isDebugEnabled()) {
log.debug("[{}] Total number of partitions for topic {} is {}", clientAppId, topicName,
metadata.partitions);
}
metadataFuture.complete(metadata);
})
.exceptionally(e -> {
metadataFuture.completeExceptionally(FutureUtil.unwrapCompletionException(e));
return null;
});
return metadataFuture;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -219,23 +220,14 @@ public static CompletableFuture<ByteBuf> lookupTopicAsync(PulsarService pulsarSe
cluster);
}
validationFuture.complete(newLookupResponse(differentClusterData.getBrokerServiceUrl(),
differentClusterData.getBrokerServiceUrlTls(), true, LookupType.Redirect, requestId, false));
differentClusterData.getBrokerServiceUrlTls(), true, LookupType.Redirect,
requestId, false));
} else {
// (2) authorize client
try {
checkAuthorization(pulsarService, topicName, clientAppId, authenticationData);
} catch (RestException authException) {
log.warn("Failed to authorized {} on cluster {}", clientAppId, topicName.toString());
validationFuture.complete(newLookupErrorResponse(ServerError.AuthorizationError,
authException.getMessage(), requestId));
return;
} catch (Exception e) {
log.warn("Unknown error while authorizing {} on cluster {}", clientAppId, topicName.toString());
validationFuture.completeExceptionally(e);
return;
}
// (3) validate global namespace
checkLocalOrGetPeerReplicationCluster(pulsarService, topicName.getNamespaceObject())
checkAuthorizationAsync(pulsarService, topicName, clientAppId, authenticationData).thenRun(() -> {
// (3) validate global namespace
checkLocalOrGetPeerReplicationCluster(pulsarService,
topicName.getNamespaceObject())
.thenAccept(peerClusterData -> {
if (peerClusterData == null) {
// (4) all validation passed: initiate lookup
Expand All @@ -247,21 +239,36 @@ public static CompletableFuture<ByteBuf> lookupTopicAsync(PulsarService pulsarSe
if (StringUtils.isBlank(peerClusterData.getBrokerServiceUrl())
&& StringUtils.isBlank(peerClusterData.getBrokerServiceUrlTls())) {
validationFuture.complete(newLookupErrorResponse(ServerError.MetadataError,
"Redirected cluster's brokerService url is not configured", requestId));
"Redirected cluster's brokerService url is not configured",
requestId));
return;
}
validationFuture.complete(newLookupResponse(peerClusterData.getBrokerServiceUrl(),
peerClusterData.getBrokerServiceUrlTls(), true, LookupType.Redirect, requestId,
peerClusterData.getBrokerServiceUrlTls(), true, LookupType.Redirect,
requestId,
false));

}).exceptionally(ex -> {
validationFuture.complete(
newLookupErrorResponse(ServerError.MetadataError, ex.getMessage(), requestId));
return null;
});
validationFuture.complete(
newLookupErrorResponse(ServerError.MetadataError,
FutureUtil.unwrapCompletionException(ex).getMessage(), requestId));
return null;
});
})
.exceptionally(e -> {
Throwable throwable = FutureUtil.unwrapCompletionException(e);
if (throwable instanceof RestException) {
log.warn("Failed to authorized {} on cluster {}", clientAppId, topicName);
validationFuture.complete(newLookupErrorResponse(ServerError.AuthorizationError,
throwable.getMessage(), requestId));
} else {
log.warn("Unknown error while authorizing {} on cluster {}", clientAppId, topicName);
validationFuture.completeExceptionally(throwable);
}
return null;
});
}
}).exceptionally(ex -> {
validationFuture.completeExceptionally(ex);
validationFuture.completeExceptionally(FutureUtil.unwrapCompletionException(ex));
return null;
});

Expand Down
Loading

0 comments on commit 1fa9c2e

Please sign in to comment.