Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[branch-2.9][fix][security] Add timeout of sync methods and avoid call sync method for AuthoriationService #16083

Merged
merged 2 commits into from
Jun 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

import static java.util.concurrent.TimeUnit.SECONDS;

Expand Down Expand Up @@ -396,11 +397,15 @@ public boolean allowTenantOperation(String tenantName,
AuthenticationDataSource authData) {
try {
return allowTenantOperationAsync(
tenantName, operation, originalRole, role, authData).get();
tenantName, operation, originalRole, role, authData).get(
conf.getZooKeeperOperationTimeoutSeconds(), SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RestException(e);
} catch (ExecutionException e) {
throw new RestException(e.getCause());
} catch (TimeoutException e) {
throw new RestException(e);
}
}

Expand Down Expand Up @@ -521,11 +526,15 @@ public boolean allowNamespacePolicyOperation(NamespaceName namespaceName,
AuthenticationDataSource authData) {
try {
return allowNamespacePolicyOperationAsync(
namespaceName, policy, operation, originalRole, role, authData).get();
namespaceName, policy, operation, originalRole, role, authData).get(
conf.getZooKeeperOperationTimeoutSeconds(), SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RestException(e);
} catch (ExecutionException e) {
throw new RestException(e.getCause());
} catch (TimeoutException e) {
throw new RestException(e);
}
}

Expand Down Expand Up @@ -585,11 +594,15 @@ public Boolean allowTopicPolicyOperation(TopicName topicName,
AuthenticationDataSource authData) {
try {
return allowTopicPolicyOperationAsync(
topicName, policy, operation, originalRole, role, authData).get();
topicName, policy, operation, originalRole, role, authData).get(
conf.getZooKeeperOperationTimeoutSeconds(), SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RestException(e);
} catch (ExecutionException e) {
throw new RestException(e.getCause());
} catch (TimeoutException e) {
throw new RestException(e);
}
}

Expand Down Expand Up @@ -667,9 +680,10 @@ public Boolean allowTopicOperation(TopicName topicName,
TopicOperation operation,
String originalRole,
String role,
AuthenticationDataSource authData) {
AuthenticationDataSource authData) throws Exception {
try {
return allowTopicOperationAsync(topicName, operation, originalRole, role, authData).get();
return allowTopicOperationAsync(topicName, operation, originalRole, role, authData).get(
conf.getZooKeeperOperationTimeoutSeconds(), SECONDS);
} catch (InterruptedException e) {
throw new RestException(e);
} catch (ExecutionException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ protected void validateAdminAndClientPermission() {
validateAdminAccessForTenant(topicName.getTenant());
} catch (Exception ve) {
try {
checkAuthorization(pulsar(), topicName, clientAppId(), clientAuthData());
checkAuthorizationAsync(pulsar(), topicName, clientAppId(), clientAuthData());
} catch (RestException re) {
throw re;
} catch (Exception e) {
Expand Down Expand Up @@ -3559,46 +3559,55 @@ public static CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMet
PulsarService pulsar, String clientAppId, String originalPrincipal,
AuthenticationDataSource authenticationData, TopicName topicName) {
CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<>();
try {
// (1) authorize client
try {
checkAuthorization(pulsar, topicName, clientAppId, authenticationData);
} catch (RestException e) {
try {
validateAdminAccessForTenant(pulsar,
clientAppId, originalPrincipal, topicName.getTenant(), authenticationData);
} catch (RestException authException) {
log.warn("Failed to authorize {} on cluster {}", clientAppId, topicName.toString());
throw new PulsarClientException(String.format("Authorization failed %s on topic %s with error %s",
clientAppId, topicName.toString(), authException.getMessage()));
}
} catch (Exception ex) {
// throw without wrapping to PulsarClientException that considers: unknown error marked as internal
// server error
log.warn("Failed to authorize {} on cluster {} with unexpected exception {}", clientAppId,
topicName.toString(), ex.getMessage(), ex);
throw ex;
}
CompletableFuture<Void> authorizationFuture = new CompletableFuture<>();
checkAuthorizationAsync(pulsar, topicName, clientAppId, authenticationData)
.thenRun(() -> authorizationFuture.complete(null))
.exceptionally(e -> {
Throwable throwable = FutureUtil.unwrapCompletionException(e);
if (throwable instanceof RestException) {
validateAdminAccessForTenantAsync(pulsar,
clientAppId, originalPrincipal, topicName.getTenant(), authenticationData)
.thenRun(() -> {
authorizationFuture.complete(null);
}).exceptionally(ex -> {
Throwable throwable2 = FutureUtil.unwrapCompletionException(ex);
if (throwable2 instanceof RestException) {
log.warn("Failed to authorize {} on topic {}", clientAppId, topicName);
authorizationFuture.completeExceptionally(new PulsarClientException(
String.format("Authorization failed %s on topic %s with error %s",
clientAppId, topicName, throwable2.getMessage())));
} else {
authorizationFuture.completeExceptionally(throwable2);
}
return null;
});
} else {
// throw without wrapping to PulsarClientException that considers: unknown error marked as
// internal server error
log.warn("Failed to authorize {} on topic {}", clientAppId, topicName, throwable);
authorizationFuture.completeExceptionally(throwable);
}
return null;
});

// validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can
// serve/redirect request else fail partitioned-metadata-request so, client fails while creating
// producer/consumer
checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject())
.thenCompose(res -> pulsar.getBrokerService()
.fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName))
.thenAccept(metadata -> {
if (log.isDebugEnabled()) {
log.debug("[{}] Total number of partitions for topic {} is {}", clientAppId, topicName,
metadata.partitions);
}
metadataFuture.complete(metadata);
}).exceptionally(ex -> {
metadataFuture.completeExceptionally(ex.getCause());
return null;
});
} catch (Exception ex) {
metadataFuture.completeExceptionally(ex);
}
// validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can
// serve/redirect request else fail partitioned-metadata-request so, client fails while creating
// producer/consumer
authorizationFuture.thenCompose(__ ->
checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject()))
.thenCompose(res ->
pulsar.getBrokerService().fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName))
.thenAccept(metadata -> {
if (log.isDebugEnabled()) {
log.debug("[{}] Total number of partitions for topic {} is {}", clientAppId, topicName,
metadata.partitions);
}
metadataFuture.complete(metadata);
})
.exceptionally(e -> {
metadataFuture.completeExceptionally(FutureUtil.unwrapCompletionException(e));
return null;
});
return metadataFuture;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -219,23 +220,14 @@ public static CompletableFuture<ByteBuf> lookupTopicAsync(PulsarService pulsarSe
cluster);
}
validationFuture.complete(newLookupResponse(differentClusterData.getBrokerServiceUrl(),
differentClusterData.getBrokerServiceUrlTls(), true, LookupType.Redirect, requestId, false));
differentClusterData.getBrokerServiceUrlTls(), true, LookupType.Redirect,
requestId, false));
} else {
// (2) authorize client
try {
checkAuthorization(pulsarService, topicName, clientAppId, authenticationData);
} catch (RestException authException) {
log.warn("Failed to authorized {} on cluster {}", clientAppId, topicName.toString());
validationFuture.complete(newLookupErrorResponse(ServerError.AuthorizationError,
authException.getMessage(), requestId));
return;
} catch (Exception e) {
log.warn("Unknown error while authorizing {} on cluster {}", clientAppId, topicName.toString());
validationFuture.completeExceptionally(e);
return;
}
// (3) validate global namespace
checkLocalOrGetPeerReplicationCluster(pulsarService, topicName.getNamespaceObject())
checkAuthorizationAsync(pulsarService, topicName, clientAppId, authenticationData).thenRun(() -> {
// (3) validate global namespace
checkLocalOrGetPeerReplicationCluster(pulsarService,
topicName.getNamespaceObject())
.thenAccept(peerClusterData -> {
if (peerClusterData == null) {
// (4) all validation passed: initiate lookup
Expand All @@ -247,21 +239,36 @@ public static CompletableFuture<ByteBuf> lookupTopicAsync(PulsarService pulsarSe
if (StringUtils.isBlank(peerClusterData.getBrokerServiceUrl())
&& StringUtils.isBlank(peerClusterData.getBrokerServiceUrlTls())) {
validationFuture.complete(newLookupErrorResponse(ServerError.MetadataError,
"Redirected cluster's brokerService url is not configured", requestId));
"Redirected cluster's brokerService url is not configured",
requestId));
return;
}
validationFuture.complete(newLookupResponse(peerClusterData.getBrokerServiceUrl(),
peerClusterData.getBrokerServiceUrlTls(), true, LookupType.Redirect, requestId,
peerClusterData.getBrokerServiceUrlTls(), true, LookupType.Redirect,
requestId,
false));

}).exceptionally(ex -> {
validationFuture.complete(
newLookupErrorResponse(ServerError.MetadataError, ex.getMessage(), requestId));
return null;
});
validationFuture.complete(
newLookupErrorResponse(ServerError.MetadataError,
FutureUtil.unwrapCompletionException(ex).getMessage(), requestId));
return null;
});
})
.exceptionally(e -> {
Throwable throwable = FutureUtil.unwrapCompletionException(e);
if (throwable instanceof RestException) {
log.warn("Failed to authorized {} on cluster {}", clientAppId, topicName);
validationFuture.complete(newLookupErrorResponse(ServerError.AuthorizationError,
throwable.getMessage(), requestId));
} else {
log.warn("Unknown error while authorizing {} on cluster {}", clientAppId, topicName);
validationFuture.completeExceptionally(throwable);
}
return null;
});
}
}).exceptionally(ex -> {
validationFuture.completeExceptionally(ex);
validationFuture.completeExceptionally(FutureUtil.unwrapCompletionException(ex));
return null;
});

Expand Down
Loading