Skip to content

Commit

Permalink
feat(flows): add application flows
Browse files Browse the repository at this point in the history
  • Loading branch information
tcompiegne committed Jan 26, 2021
1 parent 911cf33 commit 6d4e8c4
Show file tree
Hide file tree
Showing 44 changed files with 1,408 additions and 147 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.gravitee.am.common.policy.ExtensionPoint;
import io.gravitee.am.gateway.policy.Policy;
import io.gravitee.am.model.oidc.Client;
import io.gravitee.common.service.Service;
import io.reactivex.Single;

Expand All @@ -28,5 +29,9 @@
*/
public interface FlowManager extends Service {

Single<List<Policy>> findByExtensionPoint(ExtensionPoint extensionPoint);
Single<List<Policy>> findByExtensionPoint(ExtensionPoint extensionPoint, Client client);

default Single<List<Policy>> findByExtensionPoint(ExtensionPoint extensionPoint) {
return findByExtensionPoint(extensionPoint, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.gravitee.am.model.flow.Flow;
import io.gravitee.am.model.flow.Step;
import io.gravitee.am.model.flow.Type;
import io.gravitee.am.model.oidc.Client;
import io.gravitee.am.plugins.policy.core.PolicyPluginManager;
import io.gravitee.am.service.FlowService;
import io.gravitee.common.event.Event;
Expand All @@ -41,6 +42,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* @author Titouan COMPIEGNE (titouan.compiegne at graviteesource.com)
Expand Down Expand Up @@ -72,7 +74,7 @@ public class FlowManagerImpl extends AbstractService implements FlowManager, Ini
private EventManager eventManager;

private ConcurrentMap<String, Flow> flows = new ConcurrentHashMap<>();
private ConcurrentMap<ExtensionPoint, List<Policy>> policies = new ConcurrentHashMap<>();
private ConcurrentMap<ExtensionPoint, Set<ExecutionFlow>> policies = new ConcurrentHashMap<>();

@Override
public void afterPropertiesSet() {
Expand Down Expand Up @@ -113,8 +115,34 @@ public void onEvent(Event<FlowEvent, Payload> event) {
}

@Override
public Single<List<Policy>> findByExtensionPoint(ExtensionPoint extensionPoint) {
return Single.just(policies.getOrDefault(extensionPoint, Collections.emptyList()));
public Single<List<Policy>> findByExtensionPoint(ExtensionPoint extensionPoint, Client client) {
Set<ExecutionFlow> executionFlows = policies.get(extensionPoint);
// if no flow, returns empty list
if (executionFlows == null) {
return Single.just(Collections.emptyList());
}

// get domain policies
List<Policy> domainExecutionPolicies = getExecutionPolicies(executionFlows, client, true);

// if client is null, executes only security domain flows
if (client == null) {
return Single.just(domainExecutionPolicies);
}

// get application policies
List<Policy> applicationExecutionPolicies = getExecutionPolicies(executionFlows, client, false);

// if client does not inherit domain flows, executes only application flows
if (!client.isFlowsInherited()) {
return Single.just(applicationExecutionPolicies);
}

return Single.just(
Stream.concat(
domainExecutionPolicies.stream(),
applicationExecutionPolicies.stream()
).collect(Collectors.toList()));
}

private void updateFlow(String flowId, FlowEvent flowEvent) {
Expand All @@ -123,8 +151,8 @@ private void updateFlow(String flowId, FlowEvent flowEvent) {
flowService.findById(flowId)
.subscribe(
flow -> {
flows.put(flow.getId(), flow);
loadFlow(flow);
flows.put(flow.getId(), flow);
logger.info("Flow {} has been deployed for domain {}", flowId, domain.getName());
},
error -> logger.error("Unable to deploy flow {} for domain {}", flowId, domain.getName(), error),
Expand All @@ -134,10 +162,9 @@ private void updateFlow(String flowId, FlowEvent flowEvent) {
private void removeFlow(String flowId) {
logger.info("Domain {} has received flow event, delete flow {}", domain.getName(), flowId);
Flow deletedFlow = flows.remove(flowId);
extensionPoints.get(deletedFlow.getType()).forEach(extensionPoint -> {
policies.remove(extensionPoint);
flows.remove(flowId);
});
if (deletedFlow != null) {
extensionPoints.get(deletedFlow.getType()).forEach(extensionPoint -> removeExecutionFlow(extensionPoint, deletedFlow.getId()));
}
}

private void loadFlows() {
Expand All @@ -160,7 +187,7 @@ private void loadFlows() {
private void loadFlow(Flow flow) {
if (!flow.isEnabled()) {
logger.debug("Flow {} is disabled, skip process", flow.getId());
extensionPoints.get(flow.getType()).forEach(extensionPoint -> policies.put(extensionPoint, Collections.emptyList()));
extensionPoints.get(flow.getType()).forEach(extensionPoint -> removeExecutionFlow(extensionPoint, flow.getId()));
return;
}

Expand All @@ -181,19 +208,19 @@ private void loadFlow(Flow flow) {
switch (flow.getType()) {
case ROOT:
// for root type, fetch only the pre step policies
policies.put(ExtensionPoint.ROOT, prePolicies);
addExecutionFlow(ExtensionPoint.ROOT, flow, prePolicies);
break;
case CONSENT:
policies.put(ExtensionPoint.PRE_CONSENT, prePolicies);
policies.put(ExtensionPoint.POST_CONSENT, postPolicies);
addExecutionFlow(ExtensionPoint.PRE_CONSENT, flow, prePolicies);
addExecutionFlow(ExtensionPoint.POST_CONSENT, flow, postPolicies);
break;
case LOGIN:
policies.put(ExtensionPoint.PRE_LOGIN, prePolicies);
policies.put(ExtensionPoint.POST_LOGIN, postPolicies);
addExecutionFlow(ExtensionPoint.PRE_LOGIN, flow, prePolicies);
addExecutionFlow(ExtensionPoint.POST_LOGIN, flow, postPolicies);
break;
case REGISTER:
policies.put(ExtensionPoint.PRE_REGISTER, prePolicies);
policies.put(ExtensionPoint.POST_REGISTER, postPolicies);
addExecutionFlow(ExtensionPoint.PRE_REGISTER, flow, prePolicies);
addExecutionFlow(ExtensionPoint.POST_REGISTER, flow, postPolicies);
break;
default:
throw new IllegalArgumentException("No suitable flow type found for : " + flow.getType());
Expand All @@ -210,5 +237,72 @@ private Policy createPolicy(Step step) {
return null;
}
}

private void addExecutionFlow(ExtensionPoint extensionPoint, Flow flow, List<Policy> executionPolicies) {
Set<ExecutionFlow> existingFlows = policies.get(extensionPoint);
if (existingFlows == null) {
existingFlows = new HashSet<>();
}
ExecutionFlow executionFlow = new ExecutionFlow(flow, executionPolicies);
existingFlows.remove(executionFlow);
existingFlows.add(executionFlow);
policies.put(extensionPoint, existingFlows);
}

private void removeExecutionFlow(ExtensionPoint extensionPoint, String flowId) {
Set<ExecutionFlow> existingFlows = policies.get(extensionPoint);
if (existingFlows == null || existingFlows.isEmpty()) {
return;
}
existingFlows.removeIf(executionFlow -> flowId.equals(executionFlow.getFlowId()));
}

private List<Policy> getExecutionPolicies(Set<ExecutionFlow> executionFlows,
Client client,
boolean excludeApps) {
return executionFlows.stream()
.filter(executionFlow -> (excludeApps) ? executionFlow.getApplication() == null : client.getId().equals(executionFlow.getApplication()))
.map(ExecutionFlow::getPolicies)
.filter(executionPolicies -> executionPolicies != null)
.flatMap(Collection::stream)
.collect(Collectors.toList());
}

private class ExecutionFlow {
private String flowId;
private List<Policy> policies;
private String application;

public ExecutionFlow(Flow flow, List<Policy> policies) {
this.flowId = flow.getId();
this.policies = policies;
this.application = flow.getApplication();
}

public String getFlowId() {
return flowId;
}

public List<Policy> getPolicies() {
return policies;
}

public String getApplication() {
return application;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ExecutionFlow that = (ExecutionFlow) o;
return Objects.equals(flowId, that.flowId);
}

@Override
public int hashCode() {
return Objects.hash(flowId);
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public void handle(RoutingContext context) {
}

// resolve policies
resolve(extensionPoint, handler -> {
resolve(context, handler -> {
if (handler.failed()) {
logger.error("An error occurs while resolving policies", handler.cause());
context.fail(handler.cause());
Expand Down Expand Up @@ -117,8 +117,8 @@ public void handle(RoutingContext context) {
});
}

private void resolve(ExtensionPoint extensionPoint, Handler<AsyncResult<List<Policy>>> handler) {
flowManager.findByExtensionPoint(extensionPoint)
private void resolve(RoutingContext routingContext, Handler<AsyncResult<List<Policy>>> handler) {
flowManager.findByExtensionPoint(extensionPoint, routingContext.get(ConstantKeys.CLIENT_CONTEXT_KEY))
.subscribe(
policies -> handler.handle(Future.succeededFuture(policies)),
error -> handler.handle(Future.failedFuture(error)));
Expand Down
Loading

0 comments on commit 6d4e8c4

Please sign in to comment.