Skip to content

Commit

Permalink
Added unit tests and fixed things pointed out in the code review.
Browse files Browse the repository at this point in the history
Signed-off-by: Stanchev Aleksandar <aleksandar.stanchev@bosch.io>
  • Loading branch information
Stanchev Aleksandar committed Mar 7, 2023
1 parent 77b1c03 commit 01737fb
Show file tree
Hide file tree
Showing 7 changed files with 339 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -351,12 +351,12 @@ protected CompletionStage<Object> modifyTargetActorCommandResponse(final Signal<
/**
* Hook for handling unexpected PersistenceActor exceptions before response is sent back to the SupervisorActor.
*
* @param error the error
* @param enforcedSignal the ditto headers from the initial command
* @param enforcedCommand the enforced initial command
* @param throwable the throwable
* @return a new {@link java.util.concurrent.CompletionStage} failed with the initial throwable.
*/
protected CompletionStage<Object> handleTargetActorException(final Throwable error,
final Signal<?> enforcedSignal) {
return CompletableFuture.failedFuture(error);
protected CompletionStage<Object> handleTargetActorException(final Object enforcedCommand, final Throwable throwable) {
return CompletableFuture.failedFuture(throwable);
}

/**
Expand Down Expand Up @@ -828,8 +828,9 @@ private void handleSignalEnforcementResponse(@Nullable final Object response,
"forwarding to target actor, telling sender: {}", dre);
sender.tell(dre, getSelf());
} else if (response instanceof Status.Success success) {
log.debug("Ignoring Status.Success message as expected 'to be ignored' outcome: <{}>", success);
log.withCorrelationId(signal).debug("Ignoring Status.Success message as expected 'to be ignored' outcome: <{}>", success);
} else if (null != response) {
log.withCorrelationId(signal).debug("Sending response: <{}> back to sender: <{}>", response, sender.path());
sender.tell(response, getSelf());
} else {
log.withCorrelationId(signal)
Expand Down Expand Up @@ -903,6 +904,10 @@ protected CompletionStage<Object> enforceSignalAndForwardToTargetActor(final S s
dittoHeaders = tracedSignal.getDittoHeaders();
}
return enforcerResponseToTargetActor(dittoHeaders, enforcedCommand, sender)
.exceptionallyCompose(error -> handleTargetActorException(enforcedCommand, error)
.thenApply(o -> new EnforcedSignalAndTargetActorResponse(null, null)))
// HandleTargetActorException will always return failed future thenApply only
// for compilation
.whenComplete((result, error) -> {
startedSpan.mark("processed");
stopTimer(processingTimer).accept(result, error);
Expand Down Expand Up @@ -980,14 +985,7 @@ private CompletionStage<EnforcedSignalAndTargetActorResponse> enforcerResponseTo
modifyTargetActorCommandResponse(enforcedSignal, response))
.thenApply(response ->
new EnforcedSignalAndTargetActorResponse(enforcedSignal, response)
).exceptionallyCompose(error -> {
log.withCorrelationId(enforcedSignal)
.error(error, "Unexpected target actor response error!");
return handleTargetActorException(error, enforcedSignal)
// Would never gon in apply as handleTargetActorException will always return failed future
// Added only for compilation
.thenApply(o -> new EnforcedSignalAndTargetActorResponse(null, null));
});
);
} else if (enforcerResponse instanceof DistributedPubWithMessage distributedPubWithMessage) {
return askTargetActor(distributedPubWithMessage,
distributedPubWithMessage.signal().getDittoHeaders().isResponseRequired(), sender
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -361,9 +361,8 @@ private Policy handleCreatePolicyResponse(final CreatePolicy createPolicy, final
final CreateThing createThing) {

if (policyResponse instanceof CreatePolicyResponse createPolicyResponse) {
getContext().getParent().tell(
ThingPolicyCreated.of(createThing.getEntityId(), createPolicyResponse.getEntityId(),
createPolicyResponse.getDittoHeaders()), getSelf());
getContext().getParent().tell(new ThingPolicyCreated(createThing.getEntityId(),
createPolicyResponse.getEntityId(), createPolicy.getDittoHeaders()), getSelf());
return createPolicyResponse.getPolicyCreated().orElseThrow();
} else {
if (shouldReportInitialPolicyCreationFailure(policyResponse)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
import org.eclipse.ditto.policies.model.PolicyId;
import org.eclipse.ditto.things.model.ThingId;


public record ThingPolicyCreated(ThingId thingId, PolicyId policyId, DittoHeaders dittoHeaders) {

public static ThingPolicyCreated of(final ThingId thingId, final PolicyId policyId, final DittoHeaders dittoHeaders) {
return new ThingPolicyCreated(thingId, policyId, dittoHeaders);
}

}
/**
* Used by the {@link org.eclipse.ditto.things.service.enforcement.ThingEnforcerActor} to notify the
* ThingSupervisorActor that a policy was created in result of ThingCreate enforcement.
* @param thingId thingId of the thing for which policy is created
* @param policyId the policyId of the created policy
* @param dittoHeaders dittoHeaders containing the correlationId of the initial command
*/
public record ThingPolicyCreated(ThingId thingId, PolicyId policyId, DittoHeaders dittoHeaders) {}
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,20 @@
*/
package org.eclipse.ditto.things.service.persistence.actors;


import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.acks.DittoAcknowledgementLabel;
import org.eclipse.ditto.base.model.auth.AuthorizationContext;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeExceptionBuilder;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
Expand Down Expand Up @@ -60,7 +63,6 @@
import org.eclipse.ditto.things.service.enforcement.ThingEnforcement;
import org.eclipse.ditto.things.service.enforcement.ThingEnforcerActor;
import org.eclipse.ditto.things.service.enforcement.ThingPolicyCreated;
import org.eclipse.ditto.things.service.enforcement.RollbackCreatedPolicy;
import org.eclipse.ditto.thingsearch.api.ThingsSearchConstants;

import akka.actor.ActorKilledException;
Expand Down Expand Up @@ -318,9 +320,9 @@ protected CompletionStage<Object> modifyTargetActorCommandResponse(final Signal<
return inlinePolicyEnrichment.enrichPolicy(retrieveThing, retrieveThingResponse)
.map(Object.class::cast);
} else if (RollbackCreatedPolicy.shouldRollback(pair.command(), pair.response())) {
CompletableFuture<Object> responseF = new CompletableFuture<>();
final CompletableFuture<Object> responseF = new CompletableFuture<>();
getSelf().tell(RollbackCreatedPolicy.of(pair.command(), pair.response(), responseF), getSelf());
return Source.fromCompletionStage(responseF);
return Source.completionStage(responseF);
} else {
return Source.single(pair.response());
}
Expand All @@ -330,20 +332,29 @@ protected CompletionStage<Object> modifyTargetActorCommandResponse(final Signal<
}

@Override
protected CompletableFuture<Object> handleTargetActorException(final Throwable error, final Signal<?> enforcedSignal) {
if (enforcedSignal instanceof CreateThing createThing) {
CompletableFuture<Object> responseFuture = new CompletableFuture<>();
getSelf().tell(RollbackCreatedPolicy.of(createThing, error, responseFuture), getSelf());
protected CompletableFuture<Object> handleTargetActorException(final Object enforcedCommand, final Throwable throwable) {
if (RollbackCreatedPolicy.shouldRollback(enforcedCommand, throwable) &&
enforcedCommand instanceof Signal<?> signal) {
log.withCorrelationId(signal)
.warning("Target actor exception received. Sending RollbackCreatedPolicy msg to self.");
final CompletableFuture<Object> responseFuture = new CompletableFuture<>();
getSelf().tell(RollbackCreatedPolicy.of(signal, throwable, responseFuture), getSelf());
return responseFuture;
} else {
log.warning(throwable, "Target actor exception received.");
return CompletableFuture.failedFuture(throwable);
}
return CompletableFuture.failedFuture(error);
}

private void handlerRollbackCreatedPolicy(final RollbackCreatedPolicy rollback) {
private void handleRollbackCreatedPolicy(final RollbackCreatedPolicy rollback) {
if (policyCreatedEvent != null) {
DittoHeaders dittoHeaders = rollback.initialCommand().getDittoHeaders();
final DeletePolicy deletePolicy = DeletePolicy.of(policyCreatedEvent.policyId(), dittoHeaders.toBuilder()
.putHeader(DittoHeaderDefinition.DITTO_SUDO.getKey(), "true").build());
final String correlationId = rollback.initialCommand().getDittoHeaders().getCorrelationId()
.orElse("unexpected:" + UUID.randomUUID());
final DittoHeaders dittoHeaders = DittoHeaders.newBuilder()
.correlationId(correlationId)
.putHeader(DittoHeaderDefinition.DITTO_SUDO.getKey(), "true")
.build();
final DeletePolicy deletePolicy = DeletePolicy.of(policyCreatedEvent.policyId(), dittoHeaders);
AskWithRetry.askWithRetry(policiesShardRegion, deletePolicy,
enforcementConfig.getAskWithRetryConfig(),
getContext().system(), response -> {
Expand All @@ -357,6 +368,7 @@ private void handlerRollbackCreatedPolicy(final RollbackCreatedPolicy rollback)
} else {
rollback.completeInitialResponse();
}
policyCreatedEvent = null;
}

@Override
Expand Down Expand Up @@ -421,10 +433,10 @@ protected Receive activeBehaviour(final Runnable matchProcessNextTwinMessageBeha
final FI.UnitApply<Object> matchAnyBehavior) {
return ReceiveBuilder.create()
.matchEquals(Control.SHUTDOWN_TIMEOUT, this::shutdownActor)
.match(ThingPolicyCreated.class, event -> {
this.policyCreatedEvent = event;
log.withCorrelationId(event.dittoHeaders()).info("Policy <{}> created", event.policyId());
}).match(RollbackCreatedPolicy.class, this::handlerRollbackCreatedPolicy)
.match(ThingPolicyCreated.class, msg -> {
log.withCorrelationId(msg.dittoHeaders()).info("ThingPolicyCreated msg received: <{}>", msg.policyId());
this.policyCreatedEvent = msg;
}).match(RollbackCreatedPolicy.class, this::handleRollbackCreatedPolicy)
.build()
.orElse(super.activeBehaviour(matchProcessNextTwinMessageBehavior, matchAnyBehavior));
}
Expand All @@ -441,4 +453,58 @@ private enum Control {
SHUTDOWN_TIMEOUT
}

/**
* Used from the {@link org.eclipse.ditto.things.service.persistence.actors.ThingSupervisorActor} to signal itself
* to delete an already created policy because of a failure in creating a thing
* @param initialCommand the initial command that triggered the creation of a thing and policy
* @param response the response from the thing persistence actor
* @param responseFuture a future that when completed with the response from the thing persistence actor the response
* will be sent to the initial sender.
*/
private record RollbackCreatedPolicy(Signal<?> initialCommand, Object response, CompletableFuture<Object> responseFuture) {

public static RollbackCreatedPolicy of(final Signal<?> command, final Object response,
final CompletableFuture<Object> responseFuture) {
return new RollbackCreatedPolicy(command, response, responseFuture);
}

/**
* Evaluates if a failure in the creation of a thing should lead to deleting of that thing's policy.
* @param command the initial command.
* @param response the response from the {@link org.eclipse.ditto.things.service.persistence.actors.ThingPersistenceActor}.
* @return if the thing's policy is to be deleted.
*/
public static boolean shouldRollback(final Signal<?> command, @Nullable final Object response) {
return shouldRollback(command, response, null);
}

/**
* Evaluates if a failure in the creation of a thing should lead to deleting of that thing's policy.
* @param command the initial command.
* @param throwable the throwable received from the Persistence Actor
* @return if the thing's policy is to be deleted.
*/
public static boolean shouldRollback(final Object command, @Nullable final Throwable throwable) {
if (command instanceof Signal<?> signal) {
return shouldRollback(signal, null, throwable);
} else {
return false;
}
}
private static boolean shouldRollback(final Signal<?> command, @Nullable final Object response, @Nullable final Throwable throwable) {
return command instanceof CreateThing && (response instanceof DittoRuntimeException || throwable != null);
}

/**
* Completes the responseFuture with the response which in turn should send the Persistence actor response to
* the initial sender.
*/
void completeInitialResponse() {
if (response instanceof Throwable t) {
responseFuture.completeExceptionally(t);
} else {
responseFuture.complete(response);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
import org.eclipse.ditto.policies.model.signals.commands.exceptions.PolicyNotAccessibleException;
import org.eclipse.ditto.policies.model.signals.commands.modify.CreatePolicy;
import org.eclipse.ditto.policies.model.signals.commands.modify.CreatePolicyResponse;
import org.eclipse.ditto.policies.model.signals.commands.modify.DeletePolicy;
import org.eclipse.ditto.policies.model.signals.commands.modify.DeletePolicyResponse;
import org.eclipse.ditto.policies.model.signals.commands.query.RetrievePolicy;
import org.eclipse.ditto.policies.model.signals.commands.query.RetrievePolicyResponse;
import org.eclipse.ditto.things.api.commands.sudo.SudoRetrieveThingResponse;
Expand Down Expand Up @@ -450,6 +452,8 @@ public void createThingWithExplicitPolicyNotAuthorizedBySelf() {

thingPersistenceActorProbe.expectMsgClass(CreateThing.class);
thingPersistenceActorProbe.reply(ThingNotModifiableException.newBuilder(thingId).build());
policiesShardRegionProbe.expectMsgClass(DeletePolicy.class);
policiesShardRegionProbe.reply(DeletePolicyResponse.of(policyId, DEFAULT_HEADERS));

// THEN: initial requester receives error
expectMsgClass(ThingNotModifiableException.class);
Expand Down
Loading

0 comments on commit 01737fb

Please sign in to comment.