Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Creation of thing's policy is atomic with creation the of thing itself #1581

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,18 @@ protected CompletionStage<Object> modifyTargetActorCommandResponse(final Signal<
return CompletableFuture.completedStage(persistenceCommandResponse);
}


/**
* Hook for handling unexpected PersistenceActor exceptions before response is sent back to the SupervisorActor.
*
* @param error the error
* @param enforcedSignal the ditto headers from the initial command
*/
alstanchev marked this conversation as resolved.
Show resolved Hide resolved
protected CompletionStage<Object> handleTargetActorException(final Throwable error,
final Signal<?> enforcedSignal) {
return CompletableFuture.failedFuture(error);
}

/**
* Return a preferably static supervisor strategy for this actor. By default, child actor is stopped when killed
* or failing, triggering restart after exponential back-off.
Expand Down Expand Up @@ -968,7 +980,14 @@ private CompletionStage<EnforcedSignalAndTargetActorResponse> enforcerResponseTo
modifyTargetActorCommandResponse(enforcedSignal, response))
.thenApply(response ->
new EnforcedSignalAndTargetActorResponse(enforcedSignal, response)
);
).exceptionallyCompose(error -> {
log.withCorrelationId(enforcedSignal)
.error(error, "Unexpected target actor response error!");
return handleTargetActorException(error, enforcedSignal)
// Would never gon in apply as handleTargetActorException will always return failed future
// Added only for compilation
.thenApply(o -> new EnforcedSignalAndTargetActorResponse(null, null));
});
} else if (enforcerResponse instanceof DistributedPubWithMessage distributedPubWithMessage) {
return askTargetActor(distributedPubWithMessage,
distributedPubWithMessage.signal().getDittoHeaders().isResponseRequired(), sender
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright (c) 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/

package org.eclipse.ditto.things.service.enforcement;

import java.util.concurrent.CompletableFuture;

import org.eclipse.ditto.base.model.common.ResponseType;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.things.model.signals.commands.modify.CreateThing;
import org.eclipse.ditto.things.model.signals.commands.modify.CreateThingResponse;


public record RollbackCreatedPolicy(Signal<?> initialCommand, Object response, CompletableFuture<Object> responseFuture) {
alstanchev marked this conversation as resolved.
Show resolved Hide resolved

public static RollbackCreatedPolicy of(final Signal<?> command, final Object response,
alstanchev marked this conversation as resolved.
Show resolved Hide resolved
final CompletableFuture<Object> responseFuture) {
return new RollbackCreatedPolicy(command, response, responseFuture);
}

public static boolean shouldRollback(final Signal<?> command, final Object response) {
alstanchev marked this conversation as resolved.
Show resolved Hide resolved
return command instanceof CreateThing && response instanceof CreateThingResponse thingResponse &&
thingResponse.getResponseType() == ResponseType.ERROR;
alstanchev marked this conversation as resolved.
Show resolved Hide resolved
}

public void completeInitialResponse() {
alstanchev marked this conversation as resolved.
Show resolved Hide resolved
if (response instanceof Throwable t) {
responseFuture.completeExceptionally(t);
} else {
responseFuture.complete(response);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,9 @@ private Policy handleCreatePolicyResponse(final CreatePolicy createPolicy, final
final CreateThing createThing) {

if (policyResponse instanceof CreatePolicyResponse createPolicyResponse) {
getContext().getParent().tell(
ThingPolicyCreated.of(createThing.getEntityId(), createPolicyResponse.getEntityId(),
createPolicyResponse.getDittoHeaders()), getSelf());
return createPolicyResponse.getPolicyCreated().orElseThrow();
} else {
if (shouldReportInitialPolicyCreationFailure(policyResponse)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright (c) 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/

package org.eclipse.ditto.things.service.enforcement;


import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.policies.model.PolicyId;
import org.eclipse.ditto.things.model.ThingId;


public record ThingPolicyCreated(ThingId thingId, PolicyId policyId, DittoHeaders dittoHeaders) {
alstanchev marked this conversation as resolved.
Show resolved Hide resolved

public static ThingPolicyCreated of(final ThingId thingId, final PolicyId policyId, final DittoHeaders dittoHeaders) {
alstanchev marked this conversation as resolved.
Show resolved Hide resolved
return new ThingPolicyCreated(thingId, policyId, dittoHeaders);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,24 @@
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.acks.DittoAcknowledgementLabel;
import org.eclipse.ditto.base.model.auth.AuthorizationContext;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeExceptionBuilder;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.commands.Command;
import org.eclipse.ditto.base.model.signals.commands.CommandResponse;
import org.eclipse.ditto.base.model.signals.events.Event;
import org.eclipse.ditto.base.service.actors.ShutdownBehaviour;
import org.eclipse.ditto.base.service.config.supervision.ExponentialBackOffConfig;
import org.eclipse.ditto.internal.utils.cacheloaders.AskWithRetry;
import org.eclipse.ditto.internal.utils.cluster.ShardRegionProxyActorFactory;
import org.eclipse.ditto.internal.utils.cluster.StopShardedActor;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
Expand All @@ -42,17 +46,21 @@
import org.eclipse.ditto.policies.api.PoliciesMessagingConstants;
import org.eclipse.ditto.policies.enforcement.PolicyEnforcerProvider;
import org.eclipse.ditto.policies.enforcement.config.DefaultEnforcementConfig;
import org.eclipse.ditto.policies.model.signals.commands.modify.DeletePolicy;
import org.eclipse.ditto.things.api.ThingsMessagingConstants;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.things.model.signals.commands.ThingCommandResponse;
import org.eclipse.ditto.things.model.signals.commands.exceptions.ThingUnavailableException;
import org.eclipse.ditto.things.model.signals.commands.modify.CreateThing;
import org.eclipse.ditto.things.model.signals.commands.query.RetrieveThing;
import org.eclipse.ditto.things.model.signals.commands.query.RetrieveThingResponse;
import org.eclipse.ditto.things.model.signals.commands.query.ThingQueryCommand;
import org.eclipse.ditto.things.model.signals.events.ThingEvent;
import org.eclipse.ditto.things.service.common.config.DittoThingsConfig;
import org.eclipse.ditto.things.service.enforcement.ThingEnforcement;
import org.eclipse.ditto.things.service.enforcement.ThingEnforcerActor;
import org.eclipse.ditto.things.service.enforcement.ThingPolicyCreated;
import org.eclipse.ditto.things.service.enforcement.RollbackCreatedPolicy;
import org.eclipse.ditto.thingsearch.api.ThingsSearchConstants;

import akka.actor.ActorKilledException;
Expand Down Expand Up @@ -94,6 +102,8 @@ public final class ThingSupervisorActor extends AbstractPersistenceSupervisor<Th
private final ActorRef searchShardRegionProxy;

private final Duration shutdownTimeout;
@Nullable
private ThingPolicyCreated policyCreatedEvent;

@SuppressWarnings("unused")
private ThingSupervisorActor(final ActorRef pubSubMediator,
Expand Down Expand Up @@ -307,6 +317,10 @@ protected CompletionStage<Object> modifyTargetActorCommandResponse(final Signal<
pair.response() instanceof RetrieveThingResponse retrieveThingResponse) {
return inlinePolicyEnrichment.enrichPolicy(retrieveThing, retrieveThingResponse)
.map(Object.class::cast);
} else if (RollbackCreatedPolicy.shouldRollback(pair.command(), pair.response())) {
CompletableFuture<Object> responseF = new CompletableFuture<>();
alstanchev marked this conversation as resolved.
Show resolved Hide resolved
getSelf().tell(RollbackCreatedPolicy.of(pair.command(), pair.response(), responseF), getSelf());
return Source.fromCompletionStage(responseF);
alstanchev marked this conversation as resolved.
Show resolved Hide resolved
} else {
return Source.single(pair.response());
}
Expand All @@ -315,6 +329,36 @@ protected CompletionStage<Object> modifyTargetActorCommandResponse(final Signal<
.run(materializer);
}

@Override
protected CompletableFuture<Object> handleTargetActorException(final Throwable error, final Signal<?> enforcedSignal) {
if (enforcedSignal instanceof CreateThing createThing) {
CompletableFuture<Object> responseFuture = new CompletableFuture<>();
getSelf().tell(RollbackCreatedPolicy.of(createThing, error, responseFuture), getSelf());
return responseFuture;
}
return CompletableFuture.failedFuture(error);
}

private void handlerRollbackCreatedPolicy(final RollbackCreatedPolicy rollback) {
if (policyCreatedEvent != null) {
alstanchev marked this conversation as resolved.
Show resolved Hide resolved
DittoHeaders dittoHeaders = rollback.initialCommand().getDittoHeaders();
final DeletePolicy deletePolicy = DeletePolicy.of(policyCreatedEvent.policyId(), dittoHeaders.toBuilder()
alstanchev marked this conversation as resolved.
Show resolved Hide resolved
.putHeader(DittoHeaderDefinition.DITTO_SUDO.getKey(), "true").build());
AskWithRetry.askWithRetry(policiesShardRegion, deletePolicy,
enforcementConfig.getAskWithRetryConfig(),
getContext().system(), response -> {
log.withCorrelationId(dittoHeaders)
.info("Policy <{}> deleted after rolling back it's creation. " +
"Policies shard region response: <{}>", deletePolicy.getEntityId(), response);
rollback.completeInitialResponse();
return response;
});

} else {
rollback.completeInitialResponse();
}
}

@Override
protected ThingId getEntityId() throws Exception {
return ThingId.of(URLDecoder.decode(getSelf().path().name(), StandardCharsets.UTF_8));
Expand Down Expand Up @@ -377,6 +421,10 @@ protected Receive activeBehaviour(final Runnable matchProcessNextTwinMessageBeha
final FI.UnitApply<Object> matchAnyBehavior) {
return ReceiveBuilder.create()
.matchEquals(Control.SHUTDOWN_TIMEOUT, this::shutdownActor)
.match(ThingPolicyCreated.class, event -> {
this.policyCreatedEvent = event;
log.withCorrelationId(event.dittoHeaders()).info("Policy <{}> created", event.policyId());
}).match(RollbackCreatedPolicy.class, this::handlerRollbackCreatedPolicy)
.build()
.orElse(super.activeBehaviour(matchProcessNextTwinMessageBehavior, matchAnyBehavior));
}
Expand Down