Skip to content

Commit

Permalink
Create of thing's policy is atomic with creation the of thing itself
Browse files Browse the repository at this point in the history
Signed-off-by: Stanchev Aleksandar <aleksandar.stanchev@bosch.io>

Signed-off-by: Stanchev Aleksandar <aleksandar.stanchev@bosch.io>
  • Loading branch information
Stanchev Aleksandar committed Feb 22, 2023
1 parent 7f2e58b commit 77b1c03
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,18 @@ protected CompletionStage<Object> modifyTargetActorCommandResponse(final Signal<
return CompletableFuture.completedStage(persistenceCommandResponse);
}


/**
* Hook for handling unexpected PersistenceActor exceptions before response is sent back to the SupervisorActor.
*
* @param error the error
* @param enforcedSignal the ditto headers from the initial command
*/
protected CompletionStage<Object> handleTargetActorException(final Throwable error,
final Signal<?> enforcedSignal) {
return CompletableFuture.failedFuture(error);
}

/**
* Return a preferably static supervisor strategy for this actor. By default, child actor is stopped when killed
* or failing, triggering restart after exponential back-off.
Expand Down Expand Up @@ -968,7 +980,14 @@ private CompletionStage<EnforcedSignalAndTargetActorResponse> enforcerResponseTo
modifyTargetActorCommandResponse(enforcedSignal, response))
.thenApply(response ->
new EnforcedSignalAndTargetActorResponse(enforcedSignal, response)
);
).exceptionallyCompose(error -> {
log.withCorrelationId(enforcedSignal)
.error(error, "Unexpected target actor response error!");
return handleTargetActorException(error, enforcedSignal)
// Would never gon in apply as handleTargetActorException will always return failed future
// Added only for compilation
.thenApply(o -> new EnforcedSignalAndTargetActorResponse(null, null));
});
} else if (enforcerResponse instanceof DistributedPubWithMessage distributedPubWithMessage) {
return askTargetActor(distributedPubWithMessage,
distributedPubWithMessage.signal().getDittoHeaders().isResponseRequired(), sender
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright (c) 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/

package org.eclipse.ditto.things.service.enforcement;

import java.util.concurrent.CompletableFuture;

import org.eclipse.ditto.base.model.common.ResponseType;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.things.model.signals.commands.modify.CreateThing;
import org.eclipse.ditto.things.model.signals.commands.modify.CreateThingResponse;


public record RollbackCreatedPolicy(Signal<?> initialCommand, Object response, CompletableFuture<Object> responseFuture) {

public static RollbackCreatedPolicy of(final Signal<?> command, final Object response,
final CompletableFuture<Object> responseFuture) {
return new RollbackCreatedPolicy(command, response, responseFuture);
}

public static boolean shouldRollback(final Signal<?> command, final Object response) {
return command instanceof CreateThing && response instanceof CreateThingResponse thingResponse &&
thingResponse.getResponseType() == ResponseType.ERROR;
}

public void completeInitialResponse() {
if (response instanceof Throwable t) {
responseFuture.completeExceptionally(t);
} else {
responseFuture.complete(response);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,9 @@ private Policy handleCreatePolicyResponse(final CreatePolicy createPolicy, final
final CreateThing createThing) {

if (policyResponse instanceof CreatePolicyResponse createPolicyResponse) {
getContext().getParent().tell(
ThingPolicyCreated.of(createThing.getEntityId(), createPolicyResponse.getEntityId(),
createPolicyResponse.getDittoHeaders()), getSelf());
return createPolicyResponse.getPolicyCreated().orElseThrow();
} else {
if (shouldReportInitialPolicyCreationFailure(policyResponse)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright (c) 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/

package org.eclipse.ditto.things.service.enforcement;


import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.policies.model.PolicyId;
import org.eclipse.ditto.things.model.ThingId;


public record ThingPolicyCreated(ThingId thingId, PolicyId policyId, DittoHeaders dittoHeaders) {

public static ThingPolicyCreated of(final ThingId thingId, final PolicyId policyId, final DittoHeaders dittoHeaders) {
return new ThingPolicyCreated(thingId, policyId, dittoHeaders);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,24 @@
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.acks.DittoAcknowledgementLabel;
import org.eclipse.ditto.base.model.auth.AuthorizationContext;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeExceptionBuilder;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.commands.Command;
import org.eclipse.ditto.base.model.signals.commands.CommandResponse;
import org.eclipse.ditto.base.model.signals.events.Event;
import org.eclipse.ditto.base.service.actors.ShutdownBehaviour;
import org.eclipse.ditto.base.service.config.supervision.ExponentialBackOffConfig;
import org.eclipse.ditto.internal.utils.cacheloaders.AskWithRetry;
import org.eclipse.ditto.internal.utils.cluster.ShardRegionProxyActorFactory;
import org.eclipse.ditto.internal.utils.cluster.StopShardedActor;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
Expand All @@ -42,17 +46,21 @@
import org.eclipse.ditto.policies.api.PoliciesMessagingConstants;
import org.eclipse.ditto.policies.enforcement.PolicyEnforcerProvider;
import org.eclipse.ditto.policies.enforcement.config.DefaultEnforcementConfig;
import org.eclipse.ditto.policies.model.signals.commands.modify.DeletePolicy;
import org.eclipse.ditto.things.api.ThingsMessagingConstants;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.things.model.signals.commands.ThingCommandResponse;
import org.eclipse.ditto.things.model.signals.commands.exceptions.ThingUnavailableException;
import org.eclipse.ditto.things.model.signals.commands.modify.CreateThing;
import org.eclipse.ditto.things.model.signals.commands.query.RetrieveThing;
import org.eclipse.ditto.things.model.signals.commands.query.RetrieveThingResponse;
import org.eclipse.ditto.things.model.signals.commands.query.ThingQueryCommand;
import org.eclipse.ditto.things.model.signals.events.ThingEvent;
import org.eclipse.ditto.things.service.common.config.DittoThingsConfig;
import org.eclipse.ditto.things.service.enforcement.ThingEnforcement;
import org.eclipse.ditto.things.service.enforcement.ThingEnforcerActor;
import org.eclipse.ditto.things.service.enforcement.ThingPolicyCreated;
import org.eclipse.ditto.things.service.enforcement.RollbackCreatedPolicy;
import org.eclipse.ditto.thingsearch.api.ThingsSearchConstants;

import akka.actor.ActorKilledException;
Expand Down Expand Up @@ -94,6 +102,8 @@ public final class ThingSupervisorActor extends AbstractPersistenceSupervisor<Th
private final ActorRef searchShardRegionProxy;

private final Duration shutdownTimeout;
@Nullable
private ThingPolicyCreated policyCreatedEvent;

@SuppressWarnings("unused")
private ThingSupervisorActor(final ActorRef pubSubMediator,
Expand Down Expand Up @@ -307,6 +317,10 @@ protected CompletionStage<Object> modifyTargetActorCommandResponse(final Signal<
pair.response() instanceof RetrieveThingResponse retrieveThingResponse) {
return inlinePolicyEnrichment.enrichPolicy(retrieveThing, retrieveThingResponse)
.map(Object.class::cast);
} else if (RollbackCreatedPolicy.shouldRollback(pair.command(), pair.response())) {
CompletableFuture<Object> responseF = new CompletableFuture<>();
getSelf().tell(RollbackCreatedPolicy.of(pair.command(), pair.response(), responseF), getSelf());
return Source.fromCompletionStage(responseF);
} else {
return Source.single(pair.response());
}
Expand All @@ -315,6 +329,36 @@ protected CompletionStage<Object> modifyTargetActorCommandResponse(final Signal<
.run(materializer);
}

@Override
protected CompletableFuture<Object> handleTargetActorException(final Throwable error, final Signal<?> enforcedSignal) {
if (enforcedSignal instanceof CreateThing createThing) {
CompletableFuture<Object> responseFuture = new CompletableFuture<>();
getSelf().tell(RollbackCreatedPolicy.of(createThing, error, responseFuture), getSelf());
return responseFuture;
}
return CompletableFuture.failedFuture(error);
}

private void handlerRollbackCreatedPolicy(final RollbackCreatedPolicy rollback) {
if (policyCreatedEvent != null) {
DittoHeaders dittoHeaders = rollback.initialCommand().getDittoHeaders();
final DeletePolicy deletePolicy = DeletePolicy.of(policyCreatedEvent.policyId(), dittoHeaders.toBuilder()
.putHeader(DittoHeaderDefinition.DITTO_SUDO.getKey(), "true").build());
AskWithRetry.askWithRetry(policiesShardRegion, deletePolicy,
enforcementConfig.getAskWithRetryConfig(),
getContext().system(), response -> {
log.withCorrelationId(dittoHeaders)
.info("Policy <{}> deleted after rolling back it's creation. " +
"Policies shard region response: <{}>", deletePolicy.getEntityId(), response);
rollback.completeInitialResponse();
return response;
});

} else {
rollback.completeInitialResponse();
}
}

@Override
protected ThingId getEntityId() throws Exception {
return ThingId.of(URLDecoder.decode(getSelf().path().name(), StandardCharsets.UTF_8));
Expand Down Expand Up @@ -377,6 +421,10 @@ protected Receive activeBehaviour(final Runnable matchProcessNextTwinMessageBeha
final FI.UnitApply<Object> matchAnyBehavior) {
return ReceiveBuilder.create()
.matchEquals(Control.SHUTDOWN_TIMEOUT, this::shutdownActor)
.match(ThingPolicyCreated.class, event -> {
this.policyCreatedEvent = event;
log.withCorrelationId(event.dittoHeaders()).info("Policy <{}> created", event.policyId());
}).match(RollbackCreatedPolicy.class, this::handlerRollbackCreatedPolicy)
.build()
.orElse(super.activeBehaviour(matchProcessNextTwinMessageBehavior, matchAnyBehavior));
}
Expand Down

0 comments on commit 77b1c03

Please sign in to comment.