Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Creation of thing's policy is atomic with creation the of thing itself #1581

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,18 @@ protected CompletionStage<Object> modifyTargetActorCommandResponse(final Signal<
return CompletableFuture.completedStage(persistenceCommandResponse);
}


/**
* Hook for handling unexpected PersistenceActor exceptions before response is sent back to the SupervisorActor.
*
* @param enforcedCommand the enforced initial command
* @param throwable the throwable
* @return a new {@link java.util.concurrent.CompletionStage} failed with the initial throwable.
*/
alstanchev marked this conversation as resolved.
Show resolved Hide resolved
protected CompletionStage<Object> handleTargetActorException(final Object enforcedCommand, final Throwable throwable) {
return CompletableFuture.failedFuture(throwable);
}

/**
* Return a preferably static supervisor strategy for this actor. By default, child actor is stopped when killed
* or failing, triggering restart after exponential back-off.
Expand Down Expand Up @@ -816,8 +828,9 @@ private void handleSignalEnforcementResponse(@Nullable final Object response,
"forwarding to target actor, telling sender: {}", dre);
sender.tell(dre, getSelf());
} else if (response instanceof Status.Success success) {
log.debug("Ignoring Status.Success message as expected 'to be ignored' outcome: <{}>", success);
log.withCorrelationId(signal).debug("Ignoring Status.Success message as expected 'to be ignored' outcome: <{}>", success);
} else if (null != response) {
log.withCorrelationId(signal).debug("Sending response: <{}> back to sender: <{}>", response, sender.path());
sender.tell(response, getSelf());
} else {
log.withCorrelationId(signal)
Expand Down Expand Up @@ -891,6 +904,10 @@ protected CompletionStage<Object> enforceSignalAndForwardToTargetActor(final S s
dittoHeaders = tracedSignal.getDittoHeaders();
}
return enforcerResponseToTargetActor(dittoHeaders, enforcedCommand, sender)
.exceptionallyCompose(error -> handleTargetActorException(enforcedCommand, error)
.thenApply(o -> new EnforcedSignalAndTargetActorResponse(null, null)))
// HandleTargetActorException will always return failed future thenApply only
// for compilation
.whenComplete((result, error) -> {
startedSpan.mark("processed");
stopTimer(processingTimer).accept(result, error);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,8 @@ private Policy handleCreatePolicyResponse(final CreatePolicy createPolicy, final
final CreateThing createThing) {

if (policyResponse instanceof CreatePolicyResponse createPolicyResponse) {
getContext().getParent().tell(new ThingPolicyCreated(createThing.getEntityId(),
createPolicyResponse.getEntityId(), createPolicy.getDittoHeaders()), getSelf());
return createPolicyResponse.getPolicyCreated().orElseThrow();
} else {
if (shouldReportInitialPolicyCreationFailure(policyResponse)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright (c) 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/

package org.eclipse.ditto.things.service.enforcement;


import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.policies.model.PolicyId;
import org.eclipse.ditto.things.model.ThingId;

/**
* Used by the {@link org.eclipse.ditto.things.service.enforcement.ThingEnforcerActor} to notify the
* ThingSupervisorActor that a policy was created in result of ThingCreate enforcement.
* @param thingId thingId of the thing for which policy is created
* @param policyId the policyId of the created policy
* @param dittoHeaders dittoHeaders containing the correlationId of the initial command
*/
public record ThingPolicyCreated(ThingId thingId, PolicyId policyId, DittoHeaders dittoHeaders) {}
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,31 @@
*/
package org.eclipse.ditto.things.service.persistence.actors;


import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.acks.DittoAcknowledgementLabel;
import org.eclipse.ditto.base.model.auth.AuthorizationContext;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeExceptionBuilder;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.commands.Command;
import org.eclipse.ditto.base.model.signals.commands.CommandResponse;
import org.eclipse.ditto.base.model.signals.events.Event;
import org.eclipse.ditto.base.service.actors.ShutdownBehaviour;
import org.eclipse.ditto.base.service.config.supervision.ExponentialBackOffConfig;
import org.eclipse.ditto.internal.utils.cacheloaders.AskWithRetry;
import org.eclipse.ditto.internal.utils.cluster.ShardRegionProxyActorFactory;
import org.eclipse.ditto.internal.utils.cluster.StopShardedActor;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
Expand All @@ -42,17 +49,20 @@
import org.eclipse.ditto.policies.api.PoliciesMessagingConstants;
import org.eclipse.ditto.policies.enforcement.PolicyEnforcerProvider;
import org.eclipse.ditto.policies.enforcement.config.DefaultEnforcementConfig;
import org.eclipse.ditto.policies.model.signals.commands.modify.DeletePolicy;
import org.eclipse.ditto.things.api.ThingsMessagingConstants;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.things.model.signals.commands.ThingCommandResponse;
import org.eclipse.ditto.things.model.signals.commands.exceptions.ThingUnavailableException;
import org.eclipse.ditto.things.model.signals.commands.modify.CreateThing;
import org.eclipse.ditto.things.model.signals.commands.query.RetrieveThing;
import org.eclipse.ditto.things.model.signals.commands.query.RetrieveThingResponse;
import org.eclipse.ditto.things.model.signals.commands.query.ThingQueryCommand;
import org.eclipse.ditto.things.model.signals.events.ThingEvent;
import org.eclipse.ditto.things.service.common.config.DittoThingsConfig;
import org.eclipse.ditto.things.service.enforcement.ThingEnforcement;
import org.eclipse.ditto.things.service.enforcement.ThingEnforcerActor;
import org.eclipse.ditto.things.service.enforcement.ThingPolicyCreated;
import org.eclipse.ditto.thingsearch.api.ThingsSearchConstants;

import akka.actor.ActorKilledException;
Expand Down Expand Up @@ -94,6 +104,8 @@ public final class ThingSupervisorActor extends AbstractPersistenceSupervisor<Th
private final ActorRef searchShardRegionProxy;

private final Duration shutdownTimeout;
@Nullable
private ThingPolicyCreated policyCreatedEvent;

@SuppressWarnings("unused")
private ThingSupervisorActor(final ActorRef pubSubMediator,
Expand Down Expand Up @@ -307,6 +319,10 @@ protected CompletionStage<Object> modifyTargetActorCommandResponse(final Signal<
pair.response() instanceof RetrieveThingResponse retrieveThingResponse) {
return inlinePolicyEnrichment.enrichPolicy(retrieveThing, retrieveThingResponse)
.map(Object.class::cast);
} else if (RollbackCreatedPolicy.shouldRollback(pair.command(), pair.response())) {
final CompletableFuture<Object> responseF = new CompletableFuture<>();
getSelf().tell(RollbackCreatedPolicy.of(pair.command(), pair.response(), responseF), getSelf());
return Source.completionStage(responseF);
} else {
return Source.single(pair.response());
}
Expand All @@ -315,6 +331,46 @@ protected CompletionStage<Object> modifyTargetActorCommandResponse(final Signal<
.run(materializer);
}

@Override
protected CompletableFuture<Object> handleTargetActorException(final Object enforcedCommand, final Throwable throwable) {
if (RollbackCreatedPolicy.shouldRollback(enforcedCommand, throwable) &&
enforcedCommand instanceof Signal<?> signal) {
log.withCorrelationId(signal)
.warning("Target actor exception received. Sending RollbackCreatedPolicy msg to self.");
final CompletableFuture<Object> responseFuture = new CompletableFuture<>();
getSelf().tell(RollbackCreatedPolicy.of(signal, throwable, responseFuture), getSelf());
return responseFuture;
} else {
log.warning(throwable, "Target actor exception received.");
return CompletableFuture.failedFuture(throwable);
}
}

private void handleRollbackCreatedPolicy(final RollbackCreatedPolicy rollback) {
if (policyCreatedEvent != null) {
alstanchev marked this conversation as resolved.
Show resolved Hide resolved
final String correlationId = rollback.initialCommand().getDittoHeaders().getCorrelationId()
.orElse("unexpected:" + UUID.randomUUID());
final DittoHeaders dittoHeaders = DittoHeaders.newBuilder()
.correlationId(correlationId)
.putHeader(DittoHeaderDefinition.DITTO_SUDO.getKey(), "true")
.build();
final DeletePolicy deletePolicy = DeletePolicy.of(policyCreatedEvent.policyId(), dittoHeaders);
AskWithRetry.askWithRetry(policiesShardRegion, deletePolicy,
enforcementConfig.getAskWithRetryConfig(),
getContext().system(), response -> {
log.withCorrelationId(dittoHeaders)
.info("Policy <{}> deleted after rolling back it's creation. " +
"Policies shard region response: <{}>", deletePolicy.getEntityId(), response);
rollback.completeInitialResponse();
return response;
});

} else {
rollback.completeInitialResponse();
}
policyCreatedEvent = null;
}

@Override
protected ThingId getEntityId() throws Exception {
return ThingId.of(URLDecoder.decode(getSelf().path().name(), StandardCharsets.UTF_8));
Expand Down Expand Up @@ -377,6 +433,10 @@ protected Receive activeBehaviour(final Runnable matchProcessNextTwinMessageBeha
final FI.UnitApply<Object> matchAnyBehavior) {
return ReceiveBuilder.create()
.matchEquals(Control.SHUTDOWN_TIMEOUT, this::shutdownActor)
.match(ThingPolicyCreated.class, msg -> {
log.withCorrelationId(msg.dittoHeaders()).info("ThingPolicyCreated msg received: <{}>", msg.policyId());
this.policyCreatedEvent = msg;
}).match(RollbackCreatedPolicy.class, this::handleRollbackCreatedPolicy)
.build()
.orElse(super.activeBehaviour(matchProcessNextTwinMessageBehavior, matchAnyBehavior));
}
Expand All @@ -393,4 +453,66 @@ private enum Control {
SHUTDOWN_TIMEOUT
}

/**
* Used from the {@link org.eclipse.ditto.things.service.persistence.actors.ThingSupervisorActor} to signal itself
* to delete an already created policy because of a failure in creating a thing
* @param initialCommand the initial command that triggered the creation of a thing and policy
* @param response the response from the thing persistence actor
* @param responseFuture a future that when completed with the response from the thing persistence actor the response
* will be sent to the initial sender.
*/
private record RollbackCreatedPolicy(Signal<?> initialCommand, Object response, CompletableFuture<Object> responseFuture) {

/**
* Initialises an instance of {@link org.eclipse.ditto.things.service.persistence.actors.ThingSupervisorActor.RollbackCreatedPolicy}
* @param initialCommand the initial initialCommand that triggered the creation of a thing and policy
* @param response the response from the thing persistence actor
* @param responseFuture a future that when completed with the response from the thing persistence actor the response
* will be sent to the initial sender.
* @return an instance of {@link org.eclipse.ditto.things.service.persistence.actors.ThingSupervisorActor.RollbackCreatedPolicy}
*/
public static RollbackCreatedPolicy of(final Signal<?> initialCommand, final Object response,
final CompletableFuture<Object> responseFuture) {
return new RollbackCreatedPolicy(initialCommand, response, responseFuture);
}

/**
* Evaluates if a failure in the creation of a thing should lead to deleting of that thing's policy.
* @param command the initial command.
* @param response the response from the {@link org.eclipse.ditto.things.service.persistence.actors.ThingPersistenceActor}.
* @return if the thing's policy is to be deleted.
*/
public static boolean shouldRollback(final Signal<?> command, @Nullable final Object response) {
return shouldRollback(command, response, null);
}

/**
* Evaluates if a failure in the creation of a thing should lead to deleting of that thing's policy.
* @param command the initial command.
* @param throwable the throwable received from the Persistence Actor
* @return if the thing's policy is to be deleted.
*/
public static boolean shouldRollback(final Object command, @Nullable final Throwable throwable) {
if (command instanceof Signal<?> signal) {
return shouldRollback(signal, null, throwable);
} else {
return false;
}
}
private static boolean shouldRollback(final Signal<?> command, @Nullable final Object response, @Nullable final Throwable throwable) {
return command instanceof CreateThing && (response instanceof DittoRuntimeException || throwable != null);
}

/**
* Completes the responseFuture with the response which in turn should send the Persistence actor response to
* the initial sender.
*/
void completeInitialResponse() {
if (response instanceof Throwable t) {
responseFuture.completeExceptionally(t);
} else {
responseFuture.complete(response);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
import org.eclipse.ditto.policies.model.signals.commands.exceptions.PolicyNotAccessibleException;
import org.eclipse.ditto.policies.model.signals.commands.modify.CreatePolicy;
import org.eclipse.ditto.policies.model.signals.commands.modify.CreatePolicyResponse;
import org.eclipse.ditto.policies.model.signals.commands.modify.DeletePolicy;
import org.eclipse.ditto.policies.model.signals.commands.modify.DeletePolicyResponse;
import org.eclipse.ditto.policies.model.signals.commands.query.RetrievePolicy;
import org.eclipse.ditto.policies.model.signals.commands.query.RetrievePolicyResponse;
import org.eclipse.ditto.things.api.commands.sudo.SudoRetrieveThingResponse;
Expand Down Expand Up @@ -450,6 +452,8 @@ public void createThingWithExplicitPolicyNotAuthorizedBySelf() {

thingPersistenceActorProbe.expectMsgClass(CreateThing.class);
thingPersistenceActorProbe.reply(ThingNotModifiableException.newBuilder(thingId).build());
policiesShardRegionProbe.expectMsgClass(DeletePolicy.class);
policiesShardRegionProbe.reply(DeletePolicyResponse.of(policyId, DEFAULT_HEADERS));

// THEN: initial requester receives error
expectMsgClass(ThingNotModifiableException.class);
Expand Down
Loading