From 01737fba4b27157151616d5931997e036e2d39e6 Mon Sep 17 00:00:00 2001 From: Stanchev Aleksandar Date: Mon, 6 Mar 2023 17:24:32 +0200 Subject: [PATCH] Added unit tests and fixed things pointed out in the code review. Signed-off-by: Stanchev Aleksandar --- .../AbstractPersistenceSupervisor.java | 26 +- .../enforcement/RollbackCreatedPolicy.java | 43 ---- .../enforcement/ThingEnforcerActor.java | 5 +- .../enforcement/ThingPolicyCreated.java | 16 +- .../actors/ThingSupervisorActor.java | 98 ++++++-- .../MultiStageCommandEnforcementTest.java | 4 + .../actors/ThingPersistenceActorTest.java | 231 ++++++++++++++++++ 7 files changed, 339 insertions(+), 84 deletions(-) delete mode 100644 things/service/src/main/java/org/eclipse/ditto/things/service/enforcement/RollbackCreatedPolicy.java diff --git a/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/AbstractPersistenceSupervisor.java b/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/AbstractPersistenceSupervisor.java index c82cf237a71..e330df7d4c3 100644 --- a/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/AbstractPersistenceSupervisor.java +++ b/internal/utils/persistent-actors/src/main/java/org/eclipse/ditto/internal/utils/persistentactors/AbstractPersistenceSupervisor.java @@ -351,12 +351,12 @@ protected CompletionStage modifyTargetActorCommandResponse(final Signal< /** * Hook for handling unexpected PersistenceActor exceptions before response is sent back to the SupervisorActor. * - * @param error the error - * @param enforcedSignal the ditto headers from the initial command + * @param enforcedCommand the enforced initial command + * @param throwable the throwable + * @return a new {@link java.util.concurrent.CompletionStage} failed with the initial throwable. */ - protected CompletionStage handleTargetActorException(final Throwable error, - final Signal enforcedSignal) { - return CompletableFuture.failedFuture(error); + protected CompletionStage handleTargetActorException(final Object enforcedCommand, final Throwable throwable) { + return CompletableFuture.failedFuture(throwable); } /** @@ -828,8 +828,9 @@ private void handleSignalEnforcementResponse(@Nullable final Object response, "forwarding to target actor, telling sender: {}", dre); sender.tell(dre, getSelf()); } else if (response instanceof Status.Success success) { - log.debug("Ignoring Status.Success message as expected 'to be ignored' outcome: <{}>", success); + log.withCorrelationId(signal).debug("Ignoring Status.Success message as expected 'to be ignored' outcome: <{}>", success); } else if (null != response) { + log.withCorrelationId(signal).debug("Sending response: <{}> back to sender: <{}>", response, sender.path()); sender.tell(response, getSelf()); } else { log.withCorrelationId(signal) @@ -903,6 +904,10 @@ protected CompletionStage enforceSignalAndForwardToTargetActor(final S s dittoHeaders = tracedSignal.getDittoHeaders(); } return enforcerResponseToTargetActor(dittoHeaders, enforcedCommand, sender) + .exceptionallyCompose(error -> handleTargetActorException(enforcedCommand, error) + .thenApply(o -> new EnforcedSignalAndTargetActorResponse(null, null))) + // HandleTargetActorException will always return failed future thenApply only + // for compilation .whenComplete((result, error) -> { startedSpan.mark("processed"); stopTimer(processingTimer).accept(result, error); @@ -980,14 +985,7 @@ private CompletionStage enforcerResponseTo modifyTargetActorCommandResponse(enforcedSignal, response)) .thenApply(response -> new EnforcedSignalAndTargetActorResponse(enforcedSignal, response) - ).exceptionallyCompose(error -> { - log.withCorrelationId(enforcedSignal) - .error(error, "Unexpected target actor response error!"); - return handleTargetActorException(error, enforcedSignal) - // Would never gon in apply as handleTargetActorException will always return failed future - // Added only for compilation - .thenApply(o -> new EnforcedSignalAndTargetActorResponse(null, null)); - }); + ); } else if (enforcerResponse instanceof DistributedPubWithMessage distributedPubWithMessage) { return askTargetActor(distributedPubWithMessage, distributedPubWithMessage.signal().getDittoHeaders().isResponseRequired(), sender diff --git a/things/service/src/main/java/org/eclipse/ditto/things/service/enforcement/RollbackCreatedPolicy.java b/things/service/src/main/java/org/eclipse/ditto/things/service/enforcement/RollbackCreatedPolicy.java deleted file mode 100644 index ace2e01ba88..00000000000 --- a/things/service/src/main/java/org/eclipse/ditto/things/service/enforcement/RollbackCreatedPolicy.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright (c) 2023 Contributors to the Eclipse Foundation - * - * See the NOTICE file(s) distributed with this work for additional - * information regarding copyright ownership. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License 2.0 which is available at - * http://www.eclipse.org/legal/epl-2.0 - * - * SPDX-License-Identifier: EPL-2.0 - */ - -package org.eclipse.ditto.things.service.enforcement; - -import java.util.concurrent.CompletableFuture; - -import org.eclipse.ditto.base.model.common.ResponseType; -import org.eclipse.ditto.base.model.signals.Signal; -import org.eclipse.ditto.things.model.signals.commands.modify.CreateThing; -import org.eclipse.ditto.things.model.signals.commands.modify.CreateThingResponse; - - -public record RollbackCreatedPolicy(Signal initialCommand, Object response, CompletableFuture responseFuture) { - - public static RollbackCreatedPolicy of(final Signal command, final Object response, - final CompletableFuture responseFuture) { - return new RollbackCreatedPolicy(command, response, responseFuture); - } - - public static boolean shouldRollback(final Signal command, final Object response) { - return command instanceof CreateThing && response instanceof CreateThingResponse thingResponse && - thingResponse.getResponseType() == ResponseType.ERROR; - } - - public void completeInitialResponse() { - if (response instanceof Throwable t) { - responseFuture.completeExceptionally(t); - } else { - responseFuture.complete(response); - } - } -} diff --git a/things/service/src/main/java/org/eclipse/ditto/things/service/enforcement/ThingEnforcerActor.java b/things/service/src/main/java/org/eclipse/ditto/things/service/enforcement/ThingEnforcerActor.java index 67beaaa4a47..230b1f7232d 100644 --- a/things/service/src/main/java/org/eclipse/ditto/things/service/enforcement/ThingEnforcerActor.java +++ b/things/service/src/main/java/org/eclipse/ditto/things/service/enforcement/ThingEnforcerActor.java @@ -361,9 +361,8 @@ private Policy handleCreatePolicyResponse(final CreatePolicy createPolicy, final final CreateThing createThing) { if (policyResponse instanceof CreatePolicyResponse createPolicyResponse) { - getContext().getParent().tell( - ThingPolicyCreated.of(createThing.getEntityId(), createPolicyResponse.getEntityId(), - createPolicyResponse.getDittoHeaders()), getSelf()); + getContext().getParent().tell(new ThingPolicyCreated(createThing.getEntityId(), + createPolicyResponse.getEntityId(), createPolicy.getDittoHeaders()), getSelf()); return createPolicyResponse.getPolicyCreated().orElseThrow(); } else { if (shouldReportInitialPolicyCreationFailure(policyResponse)) { diff --git a/things/service/src/main/java/org/eclipse/ditto/things/service/enforcement/ThingPolicyCreated.java b/things/service/src/main/java/org/eclipse/ditto/things/service/enforcement/ThingPolicyCreated.java index 035195c942a..b4aad9af16c 100644 --- a/things/service/src/main/java/org/eclipse/ditto/things/service/enforcement/ThingPolicyCreated.java +++ b/things/service/src/main/java/org/eclipse/ditto/things/service/enforcement/ThingPolicyCreated.java @@ -18,11 +18,11 @@ import org.eclipse.ditto.policies.model.PolicyId; import org.eclipse.ditto.things.model.ThingId; - -public record ThingPolicyCreated(ThingId thingId, PolicyId policyId, DittoHeaders dittoHeaders) { - - public static ThingPolicyCreated of(final ThingId thingId, final PolicyId policyId, final DittoHeaders dittoHeaders) { - return new ThingPolicyCreated(thingId, policyId, dittoHeaders); - } - -} +/** + * Used by the {@link org.eclipse.ditto.things.service.enforcement.ThingEnforcerActor} to notify the + * ThingSupervisorActor that a policy was created in result of ThingCreate enforcement. + * @param thingId thingId of the thing for which policy is created + * @param policyId the policyId of the created policy + * @param dittoHeaders dittoHeaders containing the correlationId of the initial command + */ + public record ThingPolicyCreated(ThingId thingId, PolicyId policyId, DittoHeaders dittoHeaders) {} diff --git a/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/ThingSupervisorActor.java b/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/ThingSupervisorActor.java index b12601c2064..d3d14f48afa 100755 --- a/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/ThingSupervisorActor.java +++ b/things/service/src/main/java/org/eclipse/ditto/things/service/persistence/actors/ThingSupervisorActor.java @@ -12,10 +12,12 @@ */ package org.eclipse.ditto.things.service.persistence.actors; + import java.net.URLDecoder; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.Objects; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @@ -23,6 +25,7 @@ import org.eclipse.ditto.base.model.acks.DittoAcknowledgementLabel; import org.eclipse.ditto.base.model.auth.AuthorizationContext; +import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException; import org.eclipse.ditto.base.model.exceptions.DittoRuntimeExceptionBuilder; import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition; import org.eclipse.ditto.base.model.headers.DittoHeaders; @@ -60,7 +63,6 @@ import org.eclipse.ditto.things.service.enforcement.ThingEnforcement; import org.eclipse.ditto.things.service.enforcement.ThingEnforcerActor; import org.eclipse.ditto.things.service.enforcement.ThingPolicyCreated; -import org.eclipse.ditto.things.service.enforcement.RollbackCreatedPolicy; import org.eclipse.ditto.thingsearch.api.ThingsSearchConstants; import akka.actor.ActorKilledException; @@ -318,9 +320,9 @@ protected CompletionStage modifyTargetActorCommandResponse(final Signal< return inlinePolicyEnrichment.enrichPolicy(retrieveThing, retrieveThingResponse) .map(Object.class::cast); } else if (RollbackCreatedPolicy.shouldRollback(pair.command(), pair.response())) { - CompletableFuture responseF = new CompletableFuture<>(); + final CompletableFuture responseF = new CompletableFuture<>(); getSelf().tell(RollbackCreatedPolicy.of(pair.command(), pair.response(), responseF), getSelf()); - return Source.fromCompletionStage(responseF); + return Source.completionStage(responseF); } else { return Source.single(pair.response()); } @@ -330,20 +332,29 @@ protected CompletionStage modifyTargetActorCommandResponse(final Signal< } @Override - protected CompletableFuture handleTargetActorException(final Throwable error, final Signal enforcedSignal) { - if (enforcedSignal instanceof CreateThing createThing) { - CompletableFuture responseFuture = new CompletableFuture<>(); - getSelf().tell(RollbackCreatedPolicy.of(createThing, error, responseFuture), getSelf()); + protected CompletableFuture handleTargetActorException(final Object enforcedCommand, final Throwable throwable) { + if (RollbackCreatedPolicy.shouldRollback(enforcedCommand, throwable) && + enforcedCommand instanceof Signal signal) { + log.withCorrelationId(signal) + .warning("Target actor exception received. Sending RollbackCreatedPolicy msg to self."); + final CompletableFuture responseFuture = new CompletableFuture<>(); + getSelf().tell(RollbackCreatedPolicy.of(signal, throwable, responseFuture), getSelf()); return responseFuture; + } else { + log.warning(throwable, "Target actor exception received."); + return CompletableFuture.failedFuture(throwable); } - return CompletableFuture.failedFuture(error); } - private void handlerRollbackCreatedPolicy(final RollbackCreatedPolicy rollback) { + private void handleRollbackCreatedPolicy(final RollbackCreatedPolicy rollback) { if (policyCreatedEvent != null) { - DittoHeaders dittoHeaders = rollback.initialCommand().getDittoHeaders(); - final DeletePolicy deletePolicy = DeletePolicy.of(policyCreatedEvent.policyId(), dittoHeaders.toBuilder() - .putHeader(DittoHeaderDefinition.DITTO_SUDO.getKey(), "true").build()); + final String correlationId = rollback.initialCommand().getDittoHeaders().getCorrelationId() + .orElse("unexpected:" + UUID.randomUUID()); + final DittoHeaders dittoHeaders = DittoHeaders.newBuilder() + .correlationId(correlationId) + .putHeader(DittoHeaderDefinition.DITTO_SUDO.getKey(), "true") + .build(); + final DeletePolicy deletePolicy = DeletePolicy.of(policyCreatedEvent.policyId(), dittoHeaders); AskWithRetry.askWithRetry(policiesShardRegion, deletePolicy, enforcementConfig.getAskWithRetryConfig(), getContext().system(), response -> { @@ -357,6 +368,7 @@ private void handlerRollbackCreatedPolicy(final RollbackCreatedPolicy rollback) } else { rollback.completeInitialResponse(); } + policyCreatedEvent = null; } @Override @@ -421,10 +433,10 @@ protected Receive activeBehaviour(final Runnable matchProcessNextTwinMessageBeha final FI.UnitApply matchAnyBehavior) { return ReceiveBuilder.create() .matchEquals(Control.SHUTDOWN_TIMEOUT, this::shutdownActor) - .match(ThingPolicyCreated.class, event -> { - this.policyCreatedEvent = event; - log.withCorrelationId(event.dittoHeaders()).info("Policy <{}> created", event.policyId()); - }).match(RollbackCreatedPolicy.class, this::handlerRollbackCreatedPolicy) + .match(ThingPolicyCreated.class, msg -> { + log.withCorrelationId(msg.dittoHeaders()).info("ThingPolicyCreated msg received: <{}>", msg.policyId()); + this.policyCreatedEvent = msg; + }).match(RollbackCreatedPolicy.class, this::handleRollbackCreatedPolicy) .build() .orElse(super.activeBehaviour(matchProcessNextTwinMessageBehavior, matchAnyBehavior)); } @@ -441,4 +453,58 @@ private enum Control { SHUTDOWN_TIMEOUT } + /** + * Used from the {@link org.eclipse.ditto.things.service.persistence.actors.ThingSupervisorActor} to signal itself + * to delete an already created policy because of a failure in creating a thing + * @param initialCommand the initial command that triggered the creation of a thing and policy + * @param response the response from the thing persistence actor + * @param responseFuture a future that when completed with the response from the thing persistence actor the response + * will be sent to the initial sender. + */ + private record RollbackCreatedPolicy(Signal initialCommand, Object response, CompletableFuture responseFuture) { + + public static RollbackCreatedPolicy of(final Signal command, final Object response, + final CompletableFuture responseFuture) { + return new RollbackCreatedPolicy(command, response, responseFuture); + } + + /** + * Evaluates if a failure in the creation of a thing should lead to deleting of that thing's policy. + * @param command the initial command. + * @param response the response from the {@link org.eclipse.ditto.things.service.persistence.actors.ThingPersistenceActor}. + * @return if the thing's policy is to be deleted. + */ + public static boolean shouldRollback(final Signal command, @Nullable final Object response) { + return shouldRollback(command, response, null); + } + + /** + * Evaluates if a failure in the creation of a thing should lead to deleting of that thing's policy. + * @param command the initial command. + * @param throwable the throwable received from the Persistence Actor + * @return if the thing's policy is to be deleted. + */ + public static boolean shouldRollback(final Object command, @Nullable final Throwable throwable) { + if (command instanceof Signal signal) { + return shouldRollback(signal, null, throwable); + } else { + return false; + } + } + private static boolean shouldRollback(final Signal command, @Nullable final Object response, @Nullable final Throwable throwable) { + return command instanceof CreateThing && (response instanceof DittoRuntimeException || throwable != null); + } + + /** + * Completes the responseFuture with the response which in turn should send the Persistence actor response to + * the initial sender. + */ + void completeInitialResponse() { + if (response instanceof Throwable t) { + responseFuture.completeExceptionally(t); + } else { + responseFuture.complete(response); + } + } + } } diff --git a/things/service/src/test/java/org/eclipse/ditto/things/service/enforcement/MultiStageCommandEnforcementTest.java b/things/service/src/test/java/org/eclipse/ditto/things/service/enforcement/MultiStageCommandEnforcementTest.java index 44f3a4686f9..df7facea575 100644 --- a/things/service/src/test/java/org/eclipse/ditto/things/service/enforcement/MultiStageCommandEnforcementTest.java +++ b/things/service/src/test/java/org/eclipse/ditto/things/service/enforcement/MultiStageCommandEnforcementTest.java @@ -47,6 +47,8 @@ import org.eclipse.ditto.policies.model.signals.commands.exceptions.PolicyNotAccessibleException; import org.eclipse.ditto.policies.model.signals.commands.modify.CreatePolicy; import org.eclipse.ditto.policies.model.signals.commands.modify.CreatePolicyResponse; +import org.eclipse.ditto.policies.model.signals.commands.modify.DeletePolicy; +import org.eclipse.ditto.policies.model.signals.commands.modify.DeletePolicyResponse; import org.eclipse.ditto.policies.model.signals.commands.query.RetrievePolicy; import org.eclipse.ditto.policies.model.signals.commands.query.RetrievePolicyResponse; import org.eclipse.ditto.things.api.commands.sudo.SudoRetrieveThingResponse; @@ -450,6 +452,8 @@ public void createThingWithExplicitPolicyNotAuthorizedBySelf() { thingPersistenceActorProbe.expectMsgClass(CreateThing.class); thingPersistenceActorProbe.reply(ThingNotModifiableException.newBuilder(thingId).build()); + policiesShardRegionProbe.expectMsgClass(DeletePolicy.class); + policiesShardRegionProbe.reply(DeletePolicyResponse.of(policyId, DEFAULT_HEADERS)); // THEN: initial requester receives error expectMsgClass(ThingNotModifiableException.class); diff --git a/things/service/src/test/java/org/eclipse/ditto/things/service/persistence/actors/ThingPersistenceActorTest.java b/things/service/src/test/java/org/eclipse/ditto/things/service/persistence/actors/ThingPersistenceActorTest.java index e87263621fb..d338ad604fa 100755 --- a/things/service/src/test/java/org/eclipse/ditto/things/service/persistence/actors/ThingPersistenceActorTest.java +++ b/things/service/src/test/java/org/eclipse/ditto/things/service/persistence/actors/ThingPersistenceActorTest.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.NoSuchElementException; import java.util.Optional; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -45,6 +46,9 @@ import org.eclipse.ditto.base.model.signals.events.Event; import org.eclipse.ditto.internal.utils.cluster.DistPubSubAccess; import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.MongoReadJournal; +import org.eclipse.ditto.internal.utils.pubsub.DistributedPub; +import org.eclipse.ditto.internal.utils.pubsub.extractors.AckExtractor; +import org.eclipse.ditto.internal.utils.pubsubthings.LiveSignalPub; import org.eclipse.ditto.internal.utils.test.Retry; import org.eclipse.ditto.internal.utils.tracing.DittoTracingInitResource; import org.eclipse.ditto.json.JsonFactory; @@ -63,6 +67,8 @@ import org.eclipse.ditto.policies.model.SubjectIssuer; import org.eclipse.ditto.policies.model.signals.commands.modify.CreatePolicy; import org.eclipse.ditto.policies.model.signals.commands.modify.CreatePolicyResponse; +import org.eclipse.ditto.policies.model.signals.commands.modify.DeletePolicy; +import org.eclipse.ditto.policies.model.signals.commands.modify.DeletePolicyResponse; import org.eclipse.ditto.things.api.Permission; import org.eclipse.ditto.things.model.Attributes; import org.eclipse.ditto.things.model.Feature; @@ -112,6 +118,7 @@ import org.eclipse.ditto.things.model.signals.events.ThingCreated; import org.eclipse.ditto.things.model.signals.events.ThingEvent; import org.eclipse.ditto.things.model.signals.events.ThingModified; +import org.eclipse.ditto.things.service.enforcement.TestSetup; import org.junit.Before; import org.junit.ClassRule; import org.junit.Rule; @@ -124,12 +131,15 @@ import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigValueFactory; +import akka.actor.AbstractActor; import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.PoisonPill; import akka.actor.Props; import akka.cluster.pubsub.DistributedPubSubMediator; +import akka.japi.pf.ReceiveBuilder; import akka.testkit.TestActorRef; +import akka.testkit.TestProbe; import akka.testkit.javadsl.TestKit; import scala.PartialFunction; import scala.concurrent.Await; @@ -2004,6 +2014,227 @@ public void testRemovalOfEmptyMetadataAfterDeletion() { }}; } + @Test + public void unavailableExpectedAndPolicyIsDeletedIfPersistenceActorFails() { + final DittoHeaders dittoHeaders = dittoHeadersV2.toBuilder() + .correlationId(UUID.randomUUID().toString()) + .build(); + final Policy inlinePolicy = PoliciesModelFactory.newPolicyBuilder(POLICY_ID) + .setRevision(1L) + .forLabel("authorize-self") + .setSubject(SubjectIssuer.newInstance("test"), AUTH_SUBJECT) + .setGrantedPermissions(PoliciesResourceType.thingResource(JsonPointer.empty()), + Permissions.newInstance(Permission.READ, Permission.WRITE)) + .setGrantedPermissions(PoliciesResourceType.policyResource(JsonPointer.empty()), + Permissions.newInstance(Permission.READ, Permission.WRITE)) + .setGrantedPermissions(PoliciesResourceType.messageResource(JsonPointer.empty()), + Permissions.newInstance(Permission.READ, Permission.WRITE)) + .build(); + final CreatePolicyResponse createPolicyResponse = CreatePolicyResponse.of(POLICY_ID, inlinePolicy, + DittoHeaders.empty()); + when(policyEnforcerProvider.getPolicyEnforcer(POLICY_ID)) + .thenReturn(CompletableFuture.completedStage(Optional.of(PolicyEnforcer.of(inlinePolicy)))); + + final DeletePolicyResponse deletePolicyResponse = DeletePolicyResponse.of(POLICY_ID, dittoHeaders); + + new TestKit(actorSystem) { + { + Thing thing = createThingV2WithRandomId().toBuilder().setPolicyId(null).build(); + ThingId thingId = getIdOrThrow(thing); + + ActorRef underTest = createSupervisorActorWithCustomPersistenceActor(thingId, + (thingId1, mongoReadJournal, distributedPub, searchShardRegionProxy) -> FailingInCtorActor.props()); + + CreateThing createThing = CreateThing.of(thing, null, dittoHeaders); + underTest.tell(createThing, getRef()); + policiesShardRegionTestProbe.expectMsgClass(CreatePolicy.class); + policiesShardRegionTestProbe.reply(createPolicyResponse); + policiesShardRegionTestProbe.expectMsgClass(DeletePolicy.class); + policiesShardRegionTestProbe.reply(deletePolicyResponse); + expectMsgClass(ThingUnavailableException.class); + expectNoMessage(); + + } + }; + } + + @Test + public void policyShouldNotBeDeletedOnThingRetrieveAndActorFail() { + final DittoHeaders dittoHeaders = dittoHeadersV2.toBuilder() + .correlationId(UUID.randomUUID().toString()) + .build(); + final Policy inlinePolicy = PoliciesModelFactory.newPolicyBuilder(POLICY_ID) + .setRevision(1L) + .forLabel("authorize-self") + .setSubject(SubjectIssuer.newInstance("test"), AUTH_SUBJECT) + .setGrantedPermissions(PoliciesResourceType.thingResource(JsonPointer.empty()), + Permissions.newInstance(Permission.READ, Permission.WRITE)) + .setGrantedPermissions(PoliciesResourceType.policyResource(JsonPointer.empty()), + Permissions.newInstance(Permission.READ, Permission.WRITE)) + .setGrantedPermissions(PoliciesResourceType.messageResource(JsonPointer.empty()), + Permissions.newInstance(Permission.READ, Permission.WRITE)) + .build(); + when(policyEnforcerProvider.getPolicyEnforcer(POLICY_ID)) + .thenReturn(CompletableFuture.completedStage(Optional.of(PolicyEnforcer.of(inlinePolicy)))); + + new TestKit(actorSystem) { + { + Thing thing = createThingV2WithRandomId().toBuilder().setPolicyId(null).build(); + ThingId thingId = getIdOrThrow(thing); + + ActorRef underTest = createSupervisorActorWithCustomPersistenceActor(thingId, + (thingId1, mongoReadJournal, distributedPub, searchShardRegionProxy) -> FailingInCtorActor.props()); + + RetrieveThing retrieveThing = RetrieveThing.of(thingId, dittoHeaders); + underTest.tell(retrieveThing, getRef()); + policiesShardRegionTestProbe.expectNoMessage(); + expectMsgClass(ThingUnavailableException.class); + expectNoMessage(); + + } + }; + } + + @Test + public void policyShouldBeDeletedOnThingCreateAndErrorResponse() { + final DittoHeaders dittoHeaders = dittoHeadersV2.toBuilder() + .correlationId(UUID.randomUUID().toString()) + .build(); + final Policy inlinePolicy = PoliciesModelFactory.newPolicyBuilder(POLICY_ID) + .setRevision(1L) + .forLabel("authorize-self") + .setSubject(SubjectIssuer.newInstance("test"), AUTH_SUBJECT) + .setGrantedPermissions(PoliciesResourceType.thingResource(JsonPointer.empty()), + Permissions.newInstance(Permission.READ, Permission.WRITE)) + .setGrantedPermissions(PoliciesResourceType.policyResource(JsonPointer.empty()), + Permissions.newInstance(Permission.READ, Permission.WRITE)) + .setGrantedPermissions(PoliciesResourceType.messageResource(JsonPointer.empty()), + Permissions.newInstance(Permission.READ, Permission.WRITE)) + .build(); + final CreatePolicyResponse createPolicyResponse = CreatePolicyResponse.of(POLICY_ID, inlinePolicy, + DittoHeaders.empty()); + when(policyEnforcerProvider.getPolicyEnforcer(POLICY_ID)) + .thenReturn(CompletableFuture.completedStage(Optional.of(PolicyEnforcer.of(inlinePolicy)))); + + final DeletePolicyResponse deletePolicyResponse = DeletePolicyResponse.of(POLICY_ID, dittoHeaders); + + new TestKit(actorSystem) { + { + Thing thing = createThingV2WithRandomId().toBuilder().setPolicyId(null).build(); + ThingId thingId = getIdOrThrow(thing); + + TestProbe testProbe = TestProbe.apply("mock-thingPersistenceActorProbe", actorSystem); + ActorRef underTest = createSupervisorActorWithCustomPersistenceActor(thingId, testProbe.ref()); + + CreateThing createThing = CreateThing.of(thing, null, dittoHeaders); + underTest.tell(createThing, getRef()); + + testProbe.expectNoMsg(); + + + policiesShardRegionTestProbe.expectMsgClass(CreatePolicy.class); + policiesShardRegionTestProbe.reply(createPolicyResponse); + + testProbe.expectMsgClass(CreateThing.class); + testProbe.reply(ThingUnavailableException.newBuilder(thingId) + .dittoHeaders(dittoHeaders) + .message("Error in target persistent actor").build()); + + policiesShardRegionTestProbe.expectMsgClass(DeletePolicy.class); + policiesShardRegionTestProbe.reply(deletePolicyResponse); + expectMsgClass(ThingUnavailableException.class); + expectNoMessage(); + + } + }; + } + + public static final class FailingInCtorActor extends AbstractActor { + + public FailingInCtorActor() { + super(); + throw new RuntimeException("Failed to create PersistenceActor"); + } + + @Override + public Receive createReceive() { + return ReceiveBuilder.create().build(); + } + + private static Props props() { + return Props.create(FailingInCtorActor.class); + } + } + + private ActorRef createSupervisorActorWithCustomPersistenceActor(final ThingId thingId, + final ThingPersistenceActorPropsFactory persistenceActorPropsFactory) { + final LiveSignalPub liveSignalPub = new TestSetup.DummyLiveSignalPub(pubSubMediator); + final Props props = + ThingSupervisorActor.props(pubSubMediator, + policiesShardRegion, + new DistributedPub<>() { + + @Override + public ActorRef getPublisher() { + return pubSubMediator; + } + + @Override + public Object wrapForPublication(final ThingEvent message, + final CharSequence groupIndexKey) { + return message; + } + + @Override + public > Object wrapForPublicationWithAcks(final S message, + final CharSequence groupIndexKey, final AckExtractor ackExtractor) { + return wrapForPublication(message, groupIndexKey); + } + }, + liveSignalPub, + persistenceActorPropsFactory, + null, + policyEnforcerProvider, + Mockito.mock(MongoReadJournal.class)); + + return actorSystem.actorOf(props, thingId.toString()); + } + + private ActorRef createSupervisorActorWithCustomPersistenceActor(final ThingId thingId, + final ActorRef persistenceActor) { + final LiveSignalPub liveSignalPub = new TestSetup.DummyLiveSignalPub(pubSubMediator); + final Props props = + ThingSupervisorActor.props(pubSubMediator, + policiesShardRegion, + new DistributedPub<>() { + + @Override + public ActorRef getPublisher() { + return pubSubMediator; + } + + @Override + public Object wrapForPublication(final ThingEvent message, + final CharSequence groupIndexKey) { + return message; + } + + @Override + public > Object wrapForPublicationWithAcks(final S message, + final CharSequence groupIndexKey, final AckExtractor ackExtractor) { + return wrapForPublication(message, groupIndexKey); + } + }, + liveSignalPub, + persistenceActor, + null, + policyEnforcerProvider, + Mockito.mock(MongoReadJournal.class)); + + return actorSystem.actorOf(props, thingId.toString()); + } + + private void assertPublishEvent(final ThingEvent event) { final ThingEvent msg = pubSubTestProbe.expectMsgClass(ThingEvent.class); Assertions.assertThat(msg.toJson())