From de6e500f08c49f90edfea7d407d52d49efadfd06 Mon Sep 17 00:00:00 2001 From: sanjeev-thatiparthi Date: Sat, 29 May 2021 12:48:08 +0530 Subject: [PATCH] feat(pipeline executions/orca) : Manual Judgement Navigation Enhancement Backend API Implementation. Enhanced ManualJudgmentStage.groovy to To check if there are any parent executions waiting for manual judgment stage. If yes, fetch all the parent executions and set the waiting manual judgment stage's(this) execution and application name as leafnodeExecutionId and leafnodeApplicationName in parent's execution context others attribute and save it to the underlying storage. Once the user enters the manual judgment input(Continue/Stop), it deletes the leafnodeExecutionId and leafnodeApplicationName from all the parent executions. If no, continue the execution as usual. Enhanced RedisExecutionRepository.java to Save the leafnode attributes to the redis storage. Enhanced SqlExecutionRepository.java to Save the leafnode attributes to the sql storage. --- .../api/pipeline/models/StageExecution.java | 5 ++ .../pipeline/model/StageExecutionImpl.java | 10 +++ .../persistence/DualExecutionRepository.kt | 8 ++ .../persistence/ExecutionRepository.java | 4 + .../InMemoryExecutionRepository.kt | 8 ++ .../echo/pipeline/ManualJudgmentStage.groovy | 85 ++++++++++++++++++- .../pipeline/ManualJudgmentStageSpec.groovy | 12 +-- .../jedis/RedisExecutionRepository.java | 43 ++++++++++ .../persistence/SqlExecutionRepository.kt | 10 +++ 9 files changed, 178 insertions(+), 7 deletions(-) diff --git a/orca-api/src/main/java/com/netflix/spinnaker/orca/api/pipeline/models/StageExecution.java b/orca-api/src/main/java/com/netflix/spinnaker/orca/api/pipeline/models/StageExecution.java index b67ffa6352..5919343bb4 100644 --- a/orca-api/src/main/java/com/netflix/spinnaker/orca/api/pipeline/models/StageExecution.java +++ b/orca-api/src/main/java/com/netflix/spinnaker/orca/api/pipeline/models/StageExecution.java @@ -86,6 +86,11 @@ public interface StageExecution { void setContext(@Nonnull Map context); + @Nonnull + Map getOthers(); + + void setOthers(@Nonnull Map others); + /** TODO(rz): getOutputs(Class)? */ @Nonnull Map getOutputs(); diff --git a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/model/StageExecutionImpl.java b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/model/StageExecutionImpl.java index 7a1c9d377f..19d528055c 100644 --- a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/model/StageExecutionImpl.java +++ b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/model/StageExecutionImpl.java @@ -248,6 +248,16 @@ public void setOutputs(@Nonnull Map outputs) { this.outputs = outputs; } + private Map others = new HashMap<>(); + + public @Nonnull Map getOthers() { + return others; + } + + public void setOthers(@Nonnull Map others) { + this.others = others; + } + /** * Returns the tasks that are associated with this stage. Tasks are the most granular unit of work * in a stage. Because tasks can be dynamically composed, this list is open updated during a diff --git a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/DualExecutionRepository.kt b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/DualExecutionRepository.kt index b6362e4929..6bbec8a221 100644 --- a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/DualExecutionRepository.kt +++ b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/DualExecutionRepository.kt @@ -119,6 +119,14 @@ class DualExecutionRepository( select(stage.execution).updateStageContext(stage) } + override fun updateStageOthers(stage: StageExecution) { + select(stage.execution).updateStageOthers(stage) + } + + override fun deleteStageOthers(stage: StageExecution) { + select(stage.execution).deleteStageOthers(stage) + } + override fun removeStage(execution: PipelineExecution, stageId: String) { select(execution).removeStage(execution, stageId) } diff --git a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java index e9e1c5976a..2968c12b6a 100644 --- a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java +++ b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java @@ -35,6 +35,10 @@ public interface ExecutionRepository { void updateStageContext(@Nonnull StageExecution stage); + void updateStageOthers(@Nonnull StageExecution stage); + + void deleteStageOthers(@Nonnull StageExecution stage); + void removeStage(@Nonnull PipelineExecution execution, @Nonnull String stageId); void addStage(@Nonnull StageExecution stage); diff --git a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/InMemoryExecutionRepository.kt b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/InMemoryExecutionRepository.kt index 4395c9f2bf..7799e34861 100644 --- a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/InMemoryExecutionRepository.kt +++ b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/InMemoryExecutionRepository.kt @@ -183,6 +183,14 @@ class InMemoryExecutionRepository : ExecutionRepository { // Do nothing } + override fun updateStageOthers(stage: StageExecution) { + // Do nothing + } + + override fun deleteStageOthers(stage: StageExecution) { + // Do nothing + } + override fun retrievePipelineForCorrelationId(correlationId: String): PipelineExecution { return retrieveByCorrelationId(PIPELINE, correlationId) } diff --git a/orca-echo/src/main/groovy/com/netflix/spinnaker/orca/echo/pipeline/ManualJudgmentStage.groovy b/orca-echo/src/main/groovy/com/netflix/spinnaker/orca/echo/pipeline/ManualJudgmentStage.groovy index 5f61847868..14f3b29190 100644 --- a/orca-echo/src/main/groovy/com/netflix/spinnaker/orca/echo/pipeline/ManualJudgmentStage.groovy +++ b/orca-echo/src/main/groovy/com/netflix/spinnaker/orca/echo/pipeline/ManualJudgmentStage.groovy @@ -25,7 +25,9 @@ import com.netflix.spinnaker.orca.api.pipeline.models.PipelineExecution import com.netflix.spinnaker.orca.api.pipeline.models.StageExecution import com.netflix.spinnaker.orca.api.pipeline.TaskResult import com.netflix.spinnaker.orca.echo.util.ManualJudgmentAuthorization -import com.netflix.spinnaker.orca.pipeline.model.PipelineExecutionImpl +import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionType +import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository +import org.springframework.beans.factory.annotation.Value import javax.annotation.Nonnull import java.util.concurrent.TimeUnit @@ -73,14 +75,20 @@ class ManualJudgmentStage implements StageDefinitionBuilder, AuthenticatedStage final long backoffPeriod = 15000 final long timeout = TimeUnit.DAYS.toMillis(3) + @Value('${spinnaker.manual-judgment-navigation:false}') + boolean manualJudgmentNavigation + private final EchoService echoService private final ManualJudgmentAuthorization manualJudgmentAuthorization + private final ExecutionRepository executionRepository @Autowired WaitForManualJudgmentTask(Optional echoService, - ManualJudgmentAuthorization manualJudgmentAuthorization) { + ManualJudgmentAuthorization manualJudgmentAuthorization, + ExecutionRepository executionRepository) { this.echoService = echoService.orElse(null) this.manualJudgmentAuthorization = manualJudgmentAuthorization + this.executionRepository = executionRepository } @Override @@ -89,14 +97,24 @@ class ManualJudgmentStage implements StageDefinitionBuilder, AuthenticatedStage String notificationState ExecutionStatus executionStatus + if (manualJudgmentNavigation) { + checkForAnyParentExecutions(stage) + } + switch (stageData.state) { case StageData.State.CONTINUE: notificationState = "manualJudgmentContinue" executionStatus = ExecutionStatus.SUCCEEDED + if (manualJudgmentNavigation) { + deleteLeafnodeAttributesFromTheParentExecutions(stage) + } break case StageData.State.STOP: notificationState = "manualJudgmentStop" executionStatus = ExecutionStatus.TERMINAL + if (manualJudgmentNavigation) { + deleteLeafnodeAttributesFromTheParentExecutions(stage) + } break default: notificationState = "manualJudgment" @@ -120,6 +138,69 @@ class ManualJudgmentStage implements StageDefinitionBuilder, AuthenticatedStage return TaskResult.builder(executionStatus).context(outputs).build() } + /** + * This method checks if this manual judgment stage is triggered by any other pipeline(parent execution). + * If yes, it fetches all the parent executions, which triggered this stage and sets the current + * running stage(manual judgment stage execution id and application name) to leafnode execution id and + * application name. + * + * p1 --> p2 --> p3 --> p4 (running manual judgment stage & waiting for judgment) + * + * p1 leafnodeExecutionId : p4 execution id + * p1 leafnodeApplicationName : p4 application name + * + * p2 leafnodeExecutionId : p4 execution id + * p2 leafnodeApplicationName : p4 application name + * + * p3 leafnodeExecutionId : p4 execution id + * p3 leafnodeApplicationName : p4 application name + * + * @param stage + */ + void checkForAnyParentExecutions(StageExecution stage) { + + def status = stage?.execution?.status + def trigger = stage?.execution?.trigger + def appName = stage?.execution?.application + def executionId = stage?.execution?.id + def stageId = stage?.execution?.id + while (ExecutionStatus.RUNNING.equals(status) && trigger && trigger.hasProperty("parentExecution")) { + PipelineExecution parentExecution = trigger?.parentExecution + parentExecution = executionRepository.retrieve(ExecutionType.PIPELINE, parentExecution.id) + parentExecution.getStages().each { + if (("pipeline").equals(it.getType()) && (ExecutionStatus.RUNNING.equals(it.getStatus()))) { + if (it.context && stageId.equals(it.context.executionId)) { + def others = [leafnodePipelineExecutionId: executionId, leafnodeApplicationName: appName] + it.setOthers(others) + stageId = it.execution.getId() + executionRepository.updateStageOthers(it) + } + } + } + trigger = parentExecution?.trigger + } + } + + /** + * This method deletes the leafnode attributes from all the parent stage executions. + * @param stage + */ + void deleteLeafnodeAttributesFromTheParentExecutions(StageExecution stage) { + + def status = stage?.execution?.status + def trigger = stage?.execution?.trigger + while (ExecutionStatus.RUNNING.equals(status) && trigger && trigger.hasProperty("parentExecution")) { + PipelineExecution parentExecution = trigger?.parentExecution + PipelineExecution execution = executionRepository.retrieve(ExecutionType.PIPELINE, parentExecution.id) + execution.getStages().each { + if (ExecutionStatus.RUNNING.equals(it.getStatus())) { + executionRepository.deleteStageOthers(it) + } + } + trigger = parentExecution?.trigger + } + } + Map processNotifications(StageExecution stage, StageData stageData, String notificationState) { if (echoService) { // sendNotifications will be true if using the new scheme for configuration notifications. diff --git a/orca-echo/src/test/groovy/com/netflix/spinnaker/orca/echo/pipeline/ManualJudgmentStageSpec.groovy b/orca-echo/src/test/groovy/com/netflix/spinnaker/orca/echo/pipeline/ManualJudgmentStageSpec.groovy index a64325789e..665240498f 100644 --- a/orca-echo/src/test/groovy/com/netflix/spinnaker/orca/echo/pipeline/ManualJudgmentStageSpec.groovy +++ b/orca-echo/src/test/groovy/com/netflix/spinnaker/orca/echo/pipeline/ManualJudgmentStageSpec.groovy @@ -26,6 +26,7 @@ import com.netflix.spinnaker.orca.echo.EchoService import com.netflix.spinnaker.orca.echo.util.ManualJudgmentAuthorization import com.netflix.spinnaker.orca.pipeline.model.PipelineExecutionImpl import com.netflix.spinnaker.orca.pipeline.model.StageExecutionImpl +import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository import spock.lang.Specification import spock.lang.Unroll import static com.netflix.spinnaker.orca.echo.pipeline.ManualJudgmentStage.Notification @@ -33,6 +34,7 @@ import static com.netflix.spinnaker.orca.echo.pipeline.ManualJudgmentStage.WaitF class ManualJudgmentStageSpec extends Specification { EchoService echoService = Mock(EchoService) + ExecutionRepository executionRepository = Mock(ExecutionRepository) FiatPermissionEvaluator fiatPermissionEvaluator = Mock(FiatPermissionEvaluator) @@ -48,7 +50,7 @@ class ManualJudgmentStageSpec extends Specification { @Unroll void "should return execution status based on judgmentStatus"() { given: - def task = new WaitForManualJudgmentTask(Optional.of(echoService), manualJudgmentAuthorization) + def task = new WaitForManualJudgmentTask(Optional.of(echoService), manualJudgmentAuthorization, executionRepository) when: def result = task.execute(new StageExecutionImpl(PipelineExecutionImpl.newPipeline("orca"), "", context)) @@ -73,7 +75,7 @@ class ManualJudgmentStageSpec extends Specification { new UserPermission().addResources([new Role('foo')]).setAdmin(isAdmin).view } - def task = new WaitForManualJudgmentTask(Optional.of(echoService), manualJudgmentAuthorization) + def task = new WaitForManualJudgmentTask(Optional.of(echoService), manualJudgmentAuthorization, executionRepository) when: def stage = new StageExecutionImpl(PipelineExecutionImpl.newPipeline("orca"), "", context) @@ -97,7 +99,7 @@ class ManualJudgmentStageSpec extends Specification { void "should only send notifications for supported types"() { given: - def task = new WaitForManualJudgmentTask(Optional.of(echoService), manualJudgmentAuthorization) + def task = new WaitForManualJudgmentTask(Optional.of(echoService), manualJudgmentAuthorization, executionRepository) when: def result = task.execute(new StageExecutionImpl(PipelineExecutionImpl.newPipeline("orca"), "", [notifications: [ @@ -118,7 +120,7 @@ class ManualJudgmentStageSpec extends Specification { @Unroll void "if deprecated notification configuration is in use, only send notifications for awaiting judgment state"() { given: - def task = new WaitForManualJudgmentTask(Optional.of(echoService), manualJudgmentAuthorization) + def task = new WaitForManualJudgmentTask(Optional.of(echoService), manualJudgmentAuthorization, executionRepository) when: def result = task.execute(new StageExecutionImpl(PipelineExecutionImpl.newPipeline("orca"), "", [ @@ -199,7 +201,7 @@ class ManualJudgmentStageSpec extends Specification { @Unroll void "should retain unknown fields in the notification context"() { given: - def task = new WaitForManualJudgmentTask(Optional.of(echoService), manualJudgmentAuthorization) + def task = new WaitForManualJudgmentTask(Optional.of(echoService), manualJudgmentAuthorization, executionRepository) def slackNotification = new Notification(type: "slack") slackNotification.setOther("customMessage", "hello slack") diff --git a/orca-redis/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/jedis/RedisExecutionRepository.java b/orca-redis/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/jedis/RedisExecutionRepository.java index 815f335e24..fe099db328 100644 --- a/orca-redis/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/jedis/RedisExecutionRepository.java +++ b/orca-redis/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/jedis/RedisExecutionRepository.java @@ -159,6 +159,44 @@ public void updateStageContext(@Nonnull StageExecution stage) { }); } + @Override + public void updateStageOthers(@Nonnull StageExecution stage) { + RedisClientDelegate delegate = getRedisDelegate(stage); + String key = executionKey(stage); + String contextKey = format("stage.%s.others", stage.getId()); + delegate.withCommandsClient( + c -> { + try { + c.hset(key, contextKey, mapper.writeValueAsString(stage.getOthers())); + } catch (JsonProcessingException e) { + throw new StageSerializationException( + format( + "Failed serializing stage, executionId: %s, stageId: %s", + stage.getExecution().getId(), stage.getId()), + e); + } + }); + } + + @Override + public void deleteStageOthers(@Nonnull StageExecution stage) { + RedisClientDelegate delegate = getRedisDelegate(stage); + String key = executionKey(stage); + String contextKey = format("stage.%s.others", stage.getId()); + delegate.withCommandsClient( + c -> { + try { + c.hdel(key, contextKey, mapper.writeValueAsString(stage.getOthers())); + } catch (JsonProcessingException e) { + throw new StageSerializationException( + format( + "Failed serializing stage, executionId: %s, stageId: %s", + stage.getExecution().getId(), stage.getId()), + e); + } + }); + } + @Override public void removeStage(@Nonnull PipelineExecution execution, @Nonnull String stageId) { RedisClientDelegate delegate = getRedisDelegate(execution); @@ -936,6 +974,11 @@ protected PipelineExecution buildExecution( } else { stage.setOutputs(emptyMap()); } + if (map.get(prefix + "others") != null) { + stage.setOthers(mapper.readValue(map.get(prefix + "others"), MAP_STRING_TO_OBJECT)); + } else { + stage.setOthers(emptyMap()); + } if (map.get(prefix + "tasks") != null) { stage.setTasks(mapper.readValue(map.get(prefix + "tasks"), LIST_OF_TASKS)); } else { diff --git a/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt b/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt index 07dc3e99a1..69a6c55ea5 100644 --- a/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt +++ b/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt @@ -146,6 +146,16 @@ class SqlExecutionRepository( storeStage(stage) } + override fun updateStageOthers(stage: StageExecution) { + storeStage(stage) + } + + override fun deleteStageOthers(stage: StageExecution) { + val others = mapOf() + stage.others = others; + storeStage(stage) + } + override fun removeStage(execution: PipelineExecution, stageId: String) { validateHandledPartitionOrThrow(execution)