Skip to content

Commit

Permalink
Merge pull request #1581 from bosch-io/feature/atomic-thing-create
Browse files Browse the repository at this point in the history
Creation of thing's policy is atomic with creation the of thing itself
  • Loading branch information
thjaeckle authored Mar 7, 2023
2 parents 750b255 + 594ac98 commit 6269376
Show file tree
Hide file tree
Showing 6 changed files with 405 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,18 @@ protected CompletionStage<Object> modifyTargetActorCommandResponse(final Signal<
return CompletableFuture.completedStage(persistenceCommandResponse);
}


/**
* Hook for handling unexpected PersistenceActor exceptions before response is sent back to the SupervisorActor.
*
* @param enforcedCommand the enforced initial command
* @param throwable the throwable
* @return a new {@link java.util.concurrent.CompletionStage} failed with the initial throwable.
*/
protected CompletionStage<Object> handleTargetActorException(final Object enforcedCommand, final Throwable throwable) {
return CompletableFuture.failedFuture(throwable);
}

/**
* Return a preferably static supervisor strategy for this actor. By default, child actor is stopped when killed
* or failing, triggering restart after exponential back-off.
Expand Down Expand Up @@ -816,8 +828,9 @@ private void handleSignalEnforcementResponse(@Nullable final Object response,
"forwarding to target actor, telling sender: {}", dre);
sender.tell(dre, getSelf());
} else if (response instanceof Status.Success success) {
log.debug("Ignoring Status.Success message as expected 'to be ignored' outcome: <{}>", success);
log.withCorrelationId(signal).debug("Ignoring Status.Success message as expected 'to be ignored' outcome: <{}>", success);
} else if (null != response) {
log.withCorrelationId(signal).debug("Sending response: <{}> back to sender: <{}>", response, sender.path());
sender.tell(response, getSelf());
} else {
log.withCorrelationId(signal)
Expand Down Expand Up @@ -891,6 +904,10 @@ protected CompletionStage<Object> enforceSignalAndForwardToTargetActor(final S s
dittoHeaders = tracedSignal.getDittoHeaders();
}
return enforcerResponseToTargetActor(dittoHeaders, enforcedCommand, sender)
.exceptionallyCompose(error -> handleTargetActorException(enforcedCommand, error)
.thenApply(o -> new EnforcedSignalAndTargetActorResponse(null, null)))
// HandleTargetActorException will always return failed future thenApply only
// for compilation
.whenComplete((result, error) -> {
startedSpan.mark("processed");
stopTimer(processingTimer).accept(result, error);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,8 @@ private Policy handleCreatePolicyResponse(final CreatePolicy createPolicy, final
final CreateThing createThing) {

if (policyResponse instanceof CreatePolicyResponse createPolicyResponse) {
getContext().getParent().tell(new ThingPolicyCreated(createThing.getEntityId(),
createPolicyResponse.getEntityId(), createPolicy.getDittoHeaders()), getSelf());
return createPolicyResponse.getPolicyCreated().orElseThrow();
} else {
if (shouldReportInitialPolicyCreationFailure(policyResponse)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright (c) 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/

package org.eclipse.ditto.things.service.enforcement;


import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.policies.model.PolicyId;
import org.eclipse.ditto.things.model.ThingId;

/**
* Used by the {@link org.eclipse.ditto.things.service.enforcement.ThingEnforcerActor} to notify the
* ThingSupervisorActor that a policy was created in result of ThingCreate enforcement.
* @param thingId thingId of the thing for which policy is created
* @param policyId the policyId of the created policy
* @param dittoHeaders dittoHeaders containing the correlationId of the initial command
*/
public record ThingPolicyCreated(ThingId thingId, PolicyId policyId, DittoHeaders dittoHeaders) {}
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,31 @@
*/
package org.eclipse.ditto.things.service.persistence.actors;


import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.acks.DittoAcknowledgementLabel;
import org.eclipse.ditto.base.model.auth.AuthorizationContext;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeExceptionBuilder;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.commands.Command;
import org.eclipse.ditto.base.model.signals.commands.CommandResponse;
import org.eclipse.ditto.base.model.signals.events.Event;
import org.eclipse.ditto.base.service.actors.ShutdownBehaviour;
import org.eclipse.ditto.base.service.config.supervision.ExponentialBackOffConfig;
import org.eclipse.ditto.internal.utils.cacheloaders.AskWithRetry;
import org.eclipse.ditto.internal.utils.cluster.ShardRegionProxyActorFactory;
import org.eclipse.ditto.internal.utils.cluster.StopShardedActor;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
Expand All @@ -42,17 +49,20 @@
import org.eclipse.ditto.policies.api.PoliciesMessagingConstants;
import org.eclipse.ditto.policies.enforcement.PolicyEnforcerProvider;
import org.eclipse.ditto.policies.enforcement.config.DefaultEnforcementConfig;
import org.eclipse.ditto.policies.model.signals.commands.modify.DeletePolicy;
import org.eclipse.ditto.things.api.ThingsMessagingConstants;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.things.model.signals.commands.ThingCommandResponse;
import org.eclipse.ditto.things.model.signals.commands.exceptions.ThingUnavailableException;
import org.eclipse.ditto.things.model.signals.commands.modify.CreateThing;
import org.eclipse.ditto.things.model.signals.commands.query.RetrieveThing;
import org.eclipse.ditto.things.model.signals.commands.query.RetrieveThingResponse;
import org.eclipse.ditto.things.model.signals.commands.query.ThingQueryCommand;
import org.eclipse.ditto.things.model.signals.events.ThingEvent;
import org.eclipse.ditto.things.service.common.config.DittoThingsConfig;
import org.eclipse.ditto.things.service.enforcement.ThingEnforcement;
import org.eclipse.ditto.things.service.enforcement.ThingEnforcerActor;
import org.eclipse.ditto.things.service.enforcement.ThingPolicyCreated;
import org.eclipse.ditto.thingsearch.api.ThingsSearchConstants;

import akka.actor.ActorKilledException;
Expand Down Expand Up @@ -94,6 +104,8 @@ public final class ThingSupervisorActor extends AbstractPersistenceSupervisor<Th
private final ActorRef searchShardRegionProxy;

private final Duration shutdownTimeout;
@Nullable
private ThingPolicyCreated policyCreatedEvent;

@SuppressWarnings("unused")
private ThingSupervisorActor(final ActorRef pubSubMediator,
Expand Down Expand Up @@ -307,6 +319,10 @@ protected CompletionStage<Object> modifyTargetActorCommandResponse(final Signal<
pair.response() instanceof RetrieveThingResponse retrieveThingResponse) {
return inlinePolicyEnrichment.enrichPolicy(retrieveThing, retrieveThingResponse)
.map(Object.class::cast);
} else if (RollbackCreatedPolicy.shouldRollback(pair.command(), pair.response())) {
final CompletableFuture<Object> responseF = new CompletableFuture<>();
getSelf().tell(RollbackCreatedPolicy.of(pair.command(), pair.response(), responseF), getSelf());
return Source.completionStage(responseF);
} else {
return Source.single(pair.response());
}
Expand All @@ -315,6 +331,46 @@ protected CompletionStage<Object> modifyTargetActorCommandResponse(final Signal<
.run(materializer);
}

@Override
protected CompletableFuture<Object> handleTargetActorException(final Object enforcedCommand, final Throwable throwable) {
if (RollbackCreatedPolicy.shouldRollback(enforcedCommand, throwable) &&
enforcedCommand instanceof Signal<?> signal) {
log.withCorrelationId(signal)
.warning("Target actor exception received. Sending RollbackCreatedPolicy msg to self.");
final CompletableFuture<Object> responseFuture = new CompletableFuture<>();
getSelf().tell(RollbackCreatedPolicy.of(signal, throwable, responseFuture), getSelf());
return responseFuture;
} else {
log.warning(throwable, "Target actor exception received.");
return CompletableFuture.failedFuture(throwable);
}
}

private void handleRollbackCreatedPolicy(final RollbackCreatedPolicy rollback) {
if (policyCreatedEvent != null) {
final String correlationId = rollback.initialCommand().getDittoHeaders().getCorrelationId()
.orElse("unexpected:" + UUID.randomUUID());
final DittoHeaders dittoHeaders = DittoHeaders.newBuilder()
.correlationId(correlationId)
.putHeader(DittoHeaderDefinition.DITTO_SUDO.getKey(), "true")
.build();
final DeletePolicy deletePolicy = DeletePolicy.of(policyCreatedEvent.policyId(), dittoHeaders);
AskWithRetry.askWithRetry(policiesShardRegion, deletePolicy,
enforcementConfig.getAskWithRetryConfig(),
getContext().system(), response -> {
log.withCorrelationId(dittoHeaders)
.info("Policy <{}> deleted after rolling back it's creation. " +
"Policies shard region response: <{}>", deletePolicy.getEntityId(), response);
rollback.completeInitialResponse();
return response;
});

} else {
rollback.completeInitialResponse();
}
policyCreatedEvent = null;
}

@Override
protected ThingId getEntityId() throws Exception {
return ThingId.of(URLDecoder.decode(getSelf().path().name(), StandardCharsets.UTF_8));
Expand Down Expand Up @@ -377,6 +433,10 @@ protected Receive activeBehaviour(final Runnable matchProcessNextTwinMessageBeha
final FI.UnitApply<Object> matchAnyBehavior) {
return ReceiveBuilder.create()
.matchEquals(Control.SHUTDOWN_TIMEOUT, this::shutdownActor)
.match(ThingPolicyCreated.class, msg -> {
log.withCorrelationId(msg.dittoHeaders()).info("ThingPolicyCreated msg received: <{}>", msg.policyId());
this.policyCreatedEvent = msg;
}).match(RollbackCreatedPolicy.class, this::handleRollbackCreatedPolicy)
.build()
.orElse(super.activeBehaviour(matchProcessNextTwinMessageBehavior, matchAnyBehavior));
}
Expand All @@ -393,4 +453,66 @@ private enum Control {
SHUTDOWN_TIMEOUT
}

/**
* Used from the {@link org.eclipse.ditto.things.service.persistence.actors.ThingSupervisorActor} to signal itself
* to delete an already created policy because of a failure in creating a thing
* @param initialCommand the initial command that triggered the creation of a thing and policy
* @param response the response from the thing persistence actor
* @param responseFuture a future that when completed with the response from the thing persistence actor the response
* will be sent to the initial sender.
*/
private record RollbackCreatedPolicy(Signal<?> initialCommand, Object response, CompletableFuture<Object> responseFuture) {

/**
* Initialises an instance of {@link org.eclipse.ditto.things.service.persistence.actors.ThingSupervisorActor.RollbackCreatedPolicy}
* @param initialCommand the initial initialCommand that triggered the creation of a thing and policy
* @param response the response from the thing persistence actor
* @param responseFuture a future that when completed with the response from the thing persistence actor the response
* will be sent to the initial sender.
* @return an instance of {@link org.eclipse.ditto.things.service.persistence.actors.ThingSupervisorActor.RollbackCreatedPolicy}
*/
public static RollbackCreatedPolicy of(final Signal<?> initialCommand, final Object response,
final CompletableFuture<Object> responseFuture) {
return new RollbackCreatedPolicy(initialCommand, response, responseFuture);
}

/**
* Evaluates if a failure in the creation of a thing should lead to deleting of that thing's policy.
* @param command the initial command.
* @param response the response from the {@link org.eclipse.ditto.things.service.persistence.actors.ThingPersistenceActor}.
* @return if the thing's policy is to be deleted.
*/
public static boolean shouldRollback(final Signal<?> command, @Nullable final Object response) {
return shouldRollback(command, response, null);
}

/**
* Evaluates if a failure in the creation of a thing should lead to deleting of that thing's policy.
* @param command the initial command.
* @param throwable the throwable received from the Persistence Actor
* @return if the thing's policy is to be deleted.
*/
public static boolean shouldRollback(final Object command, @Nullable final Throwable throwable) {
if (command instanceof Signal<?> signal) {
return shouldRollback(signal, null, throwable);
} else {
return false;
}
}
private static boolean shouldRollback(final Signal<?> command, @Nullable final Object response, @Nullable final Throwable throwable) {
return command instanceof CreateThing && (response instanceof DittoRuntimeException || throwable != null);
}

/**
* Completes the responseFuture with the response which in turn should send the Persistence actor response to
* the initial sender.
*/
void completeInitialResponse() {
if (response instanceof Throwable t) {
responseFuture.completeExceptionally(t);
} else {
responseFuture.complete(response);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
import org.eclipse.ditto.policies.model.signals.commands.exceptions.PolicyNotAccessibleException;
import org.eclipse.ditto.policies.model.signals.commands.modify.CreatePolicy;
import org.eclipse.ditto.policies.model.signals.commands.modify.CreatePolicyResponse;
import org.eclipse.ditto.policies.model.signals.commands.modify.DeletePolicy;
import org.eclipse.ditto.policies.model.signals.commands.modify.DeletePolicyResponse;
import org.eclipse.ditto.policies.model.signals.commands.query.RetrievePolicy;
import org.eclipse.ditto.policies.model.signals.commands.query.RetrievePolicyResponse;
import org.eclipse.ditto.things.api.commands.sudo.SudoRetrieveThingResponse;
Expand Down Expand Up @@ -450,6 +452,8 @@ public void createThingWithExplicitPolicyNotAuthorizedBySelf() {

thingPersistenceActorProbe.expectMsgClass(CreateThing.class);
thingPersistenceActorProbe.reply(ThingNotModifiableException.newBuilder(thingId).build());
policiesShardRegionProbe.expectMsgClass(DeletePolicy.class);
policiesShardRegionProbe.reply(DeletePolicyResponse.of(policyId, DEFAULT_HEADERS));

// THEN: initial requester receives error
expectMsgClass(ThingNotModifiableException.class);
Expand Down
Loading

0 comments on commit 6269376

Please sign in to comment.