Skip to content

Commit

Permalink
[branch-2.8][fix][security] Add timeout of sync methods and avoid cal…
Browse files Browse the repository at this point in the history
…l sync method for AuthoriationService (#15694)

(cherry picked from commit 6af365e)

Besides resolving the basic conflicts, this PR
- migrate `validateAdminAccessForTenantAsync` from #14149
- migrate `TenantResources#getTenantAsync` from #11693
  • Loading branch information
codelipenghui authored and BewareMyPower committed Jul 28, 2022
1 parent c817873 commit 5bb38a8
Show file tree
Hide file tree
Showing 6 changed files with 205 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.pulsar.common.policies.data.PolicyName;
import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TenantOperation;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.util.FutureUtil;
Expand All @@ -42,6 +41,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

import static java.util.concurrent.TimeUnit.SECONDS;

Expand Down Expand Up @@ -394,11 +394,15 @@ public boolean allowTenantOperation(String tenantName,
AuthenticationDataSource authData) {
try {
return allowTenantOperationAsync(
tenantName, operation, originalRole, role, authData).get();
tenantName, operation, originalRole, role, authData).get(
conf.getZooKeeperOperationTimeoutSeconds(), SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RestException(e);
} catch (ExecutionException e) {
throw new RestException(e.getCause());
} catch (TimeoutException e) {
throw new RestException(e);
}
}

Expand Down Expand Up @@ -519,11 +523,15 @@ public boolean allowNamespacePolicyOperation(NamespaceName namespaceName,
AuthenticationDataSource authData) {
try {
return allowNamespacePolicyOperationAsync(
namespaceName, policy, operation, originalRole, role, authData).get();
namespaceName, policy, operation, originalRole, role, authData).get(
conf.getZooKeeperOperationTimeoutSeconds(), SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RestException(e);
} catch (ExecutionException e) {
throw new RestException(e.getCause());
} catch (TimeoutException e) {
throw new RestException(e);
}
}

Expand Down Expand Up @@ -583,11 +591,15 @@ public Boolean allowTopicPolicyOperation(TopicName topicName,
AuthenticationDataSource authData) {
try {
return allowTopicPolicyOperationAsync(
topicName, policy, operation, originalRole, role, authData).get();
topicName, policy, operation, originalRole, role, authData).get(
conf.getZooKeeperOperationTimeoutSeconds(), SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RestException(e);
} catch (ExecutionException e) {
throw new RestException(e.getCause());
} catch (TimeoutException e) {
throw new RestException(e);
}
}

Expand Down Expand Up @@ -665,13 +677,17 @@ public Boolean allowTopicOperation(TopicName topicName,
TopicOperation operation,
String originalRole,
String role,
AuthenticationDataSource authData) {
AuthenticationDataSource authData) throws Exception {
try {
return allowTopicOperationAsync(topicName, operation, originalRole, role, authData).get();
return allowTopicOperationAsync(topicName, operation, originalRole, role, authData).get(
conf.getZooKeeperOperationTimeoutSeconds(), SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RestException(e);
} catch (ExecutionException e) {
throw new RestException(e.getCause());
} catch (TimeoutException e) {
throw new RestException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.broker.resources;

import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Joiner;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand All @@ -38,6 +39,8 @@
*/
public class BaseResources<T> {

protected static final String BASE_POLICIES_PATH = "/admin/policies";

@Getter
private final MetadataStoreExtended store;
@Getter
Expand Down Expand Up @@ -164,4 +167,10 @@ public int getOperationTimeoutSec() {
public CompletableFuture<Boolean> existsAsync(String path) {
return cache.exists(path);
}
}

protected static String joinPath(String... parts) {
StringBuilder sb = new StringBuilder();
Joiner.on('/').appendTo(sb, parts);
return sb.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,17 @@
*/
package org.apache.pulsar.broker.resources;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;

public class TenantResources extends BaseResources<TenantInfo> {
public TenantResources(MetadataStoreExtended store, int operationTimeoutSec) {
super(store, TenantInfo.class, operationTimeoutSec);
}

public CompletableFuture<Optional<TenantInfo>> getTenantAsync(String tenantName) {
return getAsync(joinPath(BASE_POLICIES_PATH, tenantName));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -245,23 +245,6 @@ protected Map<String, Set<AuthAction>> internalGetPermissionsOnTopic() {
}
}

protected void validateAdminAndClientPermission() {
try {
validateAdminAccessForTenant(topicName.getTenant());
} catch (Exception ve) {
try {
checkAuthorization(pulsar(), topicName, clientAppId(), clientAuthData());
} catch (RestException re) {
throw re;
} catch (Exception e) {
// unknown error marked as internal server error
log.warn("Unexpected error while authorizing request. topic={}, role={}. Error: {}",
topicName, clientAppId(), e.getMessage(), e);
throw new RestException(e);
}
}
}

public void validateAdminOperationOnTopic(boolean authoritative) {
validateAdminAccessForTenant(topicName.getTenant());
validateTopicOwnership(topicName, authoritative);
Expand Down Expand Up @@ -3446,46 +3429,55 @@ public static CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMet
PulsarService pulsar, String clientAppId, String originalPrincipal,
AuthenticationDataSource authenticationData, TopicName topicName) {
CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<>();
try {
// (1) authorize client
try {
checkAuthorization(pulsar, topicName, clientAppId, authenticationData);
} catch (RestException e) {
try {
validateAdminAccessForTenant(pulsar,
clientAppId, originalPrincipal, topicName.getTenant(), authenticationData);
} catch (RestException authException) {
log.warn("Failed to authorize {} on cluster {}", clientAppId, topicName.toString());
throw new PulsarClientException(String.format("Authorization failed %s on topic %s with error %s",
clientAppId, topicName.toString(), authException.getMessage()));
}
} catch (Exception ex) {
// throw without wrapping to PulsarClientException that considers: unknown error marked as internal
// server error
log.warn("Failed to authorize {} on cluster {} with unexpected exception {}", clientAppId,
topicName.toString(), ex.getMessage(), ex);
throw ex;
}
CompletableFuture<Void> authorizationFuture = new CompletableFuture<>();
checkAuthorizationAsync(pulsar, topicName, clientAppId, authenticationData)
.thenRun(() -> authorizationFuture.complete(null))
.exceptionally(e -> {
Throwable throwable = FutureUtil.unwrapCompletionException(e);
if (throwable instanceof RestException) {
validateAdminAccessForTenantAsync(pulsar,
clientAppId, originalPrincipal, topicName.getTenant(), authenticationData)
.thenRun(() -> {
authorizationFuture.complete(null);
}).exceptionally(ex -> {
Throwable throwable2 = FutureUtil.unwrapCompletionException(ex);
if (throwable2 instanceof RestException) {
log.warn("Failed to authorize {} on topic {}", clientAppId, topicName);
authorizationFuture.completeExceptionally(new PulsarClientException(
String.format("Authorization failed %s on topic %s with error %s",
clientAppId, topicName, throwable2.getMessage())));
} else {
authorizationFuture.completeExceptionally(throwable2);
}
return null;
});
} else {
// throw without wrapping to PulsarClientException that considers: unknown error marked as
// internal server error
log.warn("Failed to authorize {} on topic {}", clientAppId, topicName, throwable);
authorizationFuture.completeExceptionally(throwable);
}
return null;
});

// validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can
// serve/redirect request else fail partitioned-metadata-request so, client fails while creating
// producer/consumer
checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject())
.thenCompose(res -> pulsar.getBrokerService()
.fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName))
.thenAccept(metadata -> {
if (log.isDebugEnabled()) {
log.debug("[{}] Total number of partitions for topic {} is {}", clientAppId, topicName,
metadata.partitions);
}
metadataFuture.complete(metadata);
}).exceptionally(ex -> {
metadataFuture.completeExceptionally(ex.getCause());
return null;
});
} catch (Exception ex) {
metadataFuture.completeExceptionally(ex);
}
// validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can
// serve/redirect request else fail partitioned-metadata-request so, client fails while creating
// producer/consumer
authorizationFuture.thenCompose(__ ->
checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject()))
.thenCompose(res ->
pulsar.getBrokerService().fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName))
.thenAccept(metadata -> {
if (log.isDebugEnabled()) {
log.debug("[{}] Total number of partitions for topic {} is {}", clientAppId, topicName,
metadata.partitions);
}
metadataFuture.complete(metadata);
})
.exceptionally(e -> {
metadataFuture.completeExceptionally(FutureUtil.unwrapCompletionException(e));
return null;
});
return metadataFuture;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -217,24 +218,14 @@ public static CompletableFuture<ByteBuf> lookupTopicAsync(PulsarService pulsarSe
cluster);
}
validationFuture.complete(newLookupResponse(differentClusterData.getBrokerServiceUrl(),
differentClusterData.getBrokerServiceUrlTls(), true, LookupType.Redirect, requestId, false));
differentClusterData.getBrokerServiceUrlTls(), true, LookupType.Redirect,
requestId, false));
} else {
// (2) authorize client
try {
checkAuthorization(pulsarService, topicName, clientAppId, authenticationData);
} catch (RestException authException) {
log.warn("Failed to authorized {} on cluster {}", clientAppId, topicName.toString());
validationFuture.complete(newLookupErrorResponse(ServerError.AuthorizationError,
authException.getMessage(), requestId));
return;
} catch (Exception e) {
log.warn("Unknown error while authorizing {} on cluster {}", clientAppId, topicName.toString());
validationFuture.completeExceptionally(e);
return;
}
// (3) validate global namespace
checkLocalOrGetPeerReplicationCluster(pulsarService, topicName.getNamespaceObject())
.thenAccept(peerClusterData -> {
checkAuthorizationAsync(pulsarService, topicName, clientAppId, authenticationData).thenRun(() -> {
// (3) validate global namespace
checkLocalOrGetPeerReplicationCluster(pulsarService,
topicName.getNamespaceObject()).thenAccept(peerClusterData -> {
if (peerClusterData == null) {
// (4) all validation passed: initiate lookup
validationFuture.complete(null);
Expand All @@ -245,21 +236,36 @@ public static CompletableFuture<ByteBuf> lookupTopicAsync(PulsarService pulsarSe
if (StringUtils.isBlank(peerClusterData.getBrokerServiceUrl())
&& StringUtils.isBlank(peerClusterData.getBrokerServiceUrlTls())) {
validationFuture.complete(newLookupErrorResponse(ServerError.MetadataError,
"Redirected cluster's brokerService url is not configured", requestId));
"Redirected cluster's brokerService url is not configured",
requestId));
return;
}
validationFuture.complete(newLookupResponse(peerClusterData.getBrokerServiceUrl(),
peerClusterData.getBrokerServiceUrlTls(), true, LookupType.Redirect, requestId,
peerClusterData.getBrokerServiceUrlTls(), true, LookupType.Redirect,
requestId,
false));

}).exceptionally(ex -> {
validationFuture.complete(
newLookupErrorResponse(ServerError.MetadataError, ex.getMessage(), requestId));
return null;
});
validationFuture.complete(
newLookupErrorResponse(ServerError.MetadataError,
FutureUtil.unwrapCompletionException(ex).getMessage(), requestId));
return null;
});
})
.exceptionally(e -> {
Throwable throwable = FutureUtil.unwrapCompletionException(e);
if (throwable instanceof RestException) {
log.warn("Failed to authorized {} on cluster {}", clientAppId, topicName);
validationFuture.complete(newLookupErrorResponse(ServerError.AuthorizationError,
throwable.getMessage(), requestId));
} else {
log.warn("Unknown error while authorizing {} on cluster {}", clientAppId, topicName);
validationFuture.completeExceptionally(throwable);
}
return null;
});
}
}).exceptionally(ex -> {
validationFuture.completeExceptionally(ex);
validationFuture.completeExceptionally(FutureUtil.unwrapCompletionException(ex));
return null;
});

Expand Down
Loading

0 comments on commit 5bb38a8

Please sign in to comment.