From 40ecffd102fe7f057ec3e626d020f1b596ad88fb Mon Sep 17 00:00:00 2001 From: AbdulRahman AlHamali Date: Mon, 22 Mar 2021 16:07:51 -0400 Subject: [PATCH 1/9] initial commit --- .../pipeline/CompoundExecutionOperator.java | 14 + .../orca/pipeline/ExecutionRunner.java | 4 + .../persistence/ExecutionRepository.java | 2 + .../IgnoreStageFailureInterlinkEvent.java | 61 ++++ .../orca/interlink/events/InterlinkEvent.java | 6 +- .../spinnaker/orca/q/QueueExecutionRunner.kt | 4 + .../q/handler/IgnoreStageFailureHandler.kt | 83 +++++ .../com/netflix/spinnaker/orca/q/messages.kt | 15 + .../handler/IgnoreStageFailureHandlerTest.kt | 317 ++++++++++++++++++ .../persistence/SqlExecutionRepository.kt | 14 +- .../orca/controllers/TaskController.groovy | 13 + 11 files changed, 524 insertions(+), 9 deletions(-) create mode 100644 orca-interlink/src/main/java/com/netflix/spinnaker/orca/interlink/events/IgnoreStageFailureInterlinkEvent.java create mode 100644 orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/IgnoreStageFailureHandler.kt create mode 100644 orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/IgnoreStageFailureHandlerTest.kt diff --git a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/CompoundExecutionOperator.java b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/CompoundExecutionOperator.java index 335f9c4303..3f7ad2817f 100644 --- a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/CompoundExecutionOperator.java +++ b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/CompoundExecutionOperator.java @@ -119,6 +119,20 @@ public PipelineExecution restartStage(@Nonnull String executionId, @Nonnull Stri return execution; } + public PipelineExecution ignoreStageFailure( + @Nonnull String executionId, @Nonnull String stageId) { + PipelineExecution execution = repository.retrieve(ExecutionType.PIPELINE, executionId); + if (repository.handlesPartition(execution.getPartition())) { + runner.ignoreFailure(execution, stageId); + } else { + log.info( + "Not pushing queue message action='ignoreFailure' for execution with foreign partition='{}'", + execution.getPartition()); + repository.ignoreStageFailure(executionId, stageId); + } + return execution; + } + private PipelineExecution doInternal( Consumer runnerAction, Runnable repositoryAction, diff --git a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/ExecutionRunner.java b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/ExecutionRunner.java index b6b2a7a76c..9fbcedd82d 100644 --- a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/ExecutionRunner.java +++ b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/ExecutionRunner.java @@ -27,6 +27,10 @@ default void restart(@Nonnull PipelineExecution execution, @Nonnull String stage throw new UnsupportedOperationException(); } + default void ignoreFailure(@Nonnull PipelineExecution execution, @Nonnull String stageId) { + throw new UnsupportedOperationException(); + } + default void reschedule(@Nonnull PipelineExecution execution) { throw new UnsupportedOperationException(); } diff --git a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java index e9e1c5976a..976410d0be 100644 --- a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java +++ b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java @@ -186,6 +186,8 @@ default boolean handlesPartition(@Nullable String partitionOfExecution) { // foreign executions default void restartStage(String executionId, String stageId) {} + default void ignoreStageFailure(String executionId, String stageId) {} + final class ExecutionCriteria { private int pageSize = 3500; private Collection statuses = new ArrayList<>(); diff --git a/orca-interlink/src/main/java/com/netflix/spinnaker/orca/interlink/events/IgnoreStageFailureInterlinkEvent.java b/orca-interlink/src/main/java/com/netflix/spinnaker/orca/interlink/events/IgnoreStageFailureInterlinkEvent.java new file mode 100644 index 0000000000..117e96804a --- /dev/null +++ b/orca-interlink/src/main/java/com/netflix/spinnaker/orca/interlink/events/IgnoreStageFailureInterlinkEvent.java @@ -0,0 +1,61 @@ +/* + * Copyright 2020 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.netflix.spinnaker.orca.interlink.events; + +import static com.netflix.spinnaker.orca.interlink.events.InterlinkEvent.EventType.IGNORE_FAILURE; + +import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionType; +import com.netflix.spinnaker.orca.pipeline.CompoundExecutionOperator; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * This event is published on the interlink as a result of a user IGNORING THE FAILURE of a stage on + * an orca instance that can't handle the partition for the given execution. + * + *

The event is then handled by an orca instance (listening on interlink) whose partition matches + * that of the execution. The resulting repository mutations of this event will then be peered by + * the PeeringAgent + */ +@Data +@AllArgsConstructor +@NoArgsConstructor +public class IgnoreStageFailureInterlinkEvent implements InterlinkEvent { + final EventType eventType = IGNORE_FAILURE; + @Nullable String partition; + @Nonnull ExecutionType executionType; + @Nonnull String executionId; + @Nonnull String stageId; + + public IgnoreStageFailureInterlinkEvent( + @Nonnull ExecutionType executionType, @Nonnull String executionId, @Nonnull String stageId) { + // for the moment, only ExecutionType.PIPELINE can have ignored stages + // but since we are defining the protocol on the wire here, let's be a bit future proof and + // accept potentially different execution types + this.executionType = executionType; + this.executionId = executionId; + this.stageId = stageId; + } + + @Override + public void applyTo(CompoundExecutionOperator executionOperator) { + executionOperator.ignoreStageFailure(executionId, stageId); + } +} diff --git a/orca-interlink/src/main/java/com/netflix/spinnaker/orca/interlink/events/InterlinkEvent.java b/orca-interlink/src/main/java/com/netflix/spinnaker/orca/interlink/events/InterlinkEvent.java index 8469422002..0ce62a71e5 100644 --- a/orca-interlink/src/main/java/com/netflix/spinnaker/orca/interlink/events/InterlinkEvent.java +++ b/orca-interlink/src/main/java/com/netflix/spinnaker/orca/interlink/events/InterlinkEvent.java @@ -44,7 +44,8 @@ @JsonSubTypes.Type(value = ResumeInterlinkEvent.class, name = "RESUME"), @JsonSubTypes.Type(value = DeleteInterlinkEvent.class, name = "DELETE"), @JsonSubTypes.Type(value = PatchStageInterlinkEvent.class, name = "PATCH"), - @JsonSubTypes.Type(value = RestartStageInterlinkEvent.class, name = "RESTART") + @JsonSubTypes.Type(value = RestartStageInterlinkEvent.class, name = "RESTART"), + @JsonSubTypes.Type(value = IgnoreStageFailureInterlinkEvent.class, name = "IGNORE_FAILURE") }) public interface InterlinkEvent { enum EventType { @@ -53,7 +54,8 @@ enum EventType { DELETE, RESUME, PATCH, - RESTART + RESTART, + IGNORE_FAILURE } @JsonIgnore diff --git a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/QueueExecutionRunner.kt b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/QueueExecutionRunner.kt index c53c176f36..f451649e3f 100644 --- a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/QueueExecutionRunner.kt +++ b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/QueueExecutionRunner.kt @@ -38,6 +38,10 @@ class QueueExecutionRunner( queue.push(RestartStage(execution, stageId, AuthenticatedRequest.getSpinnakerUser().orElse(null))) } + override fun ignoreFailure(execution: PipelineExecution, stageId: String) { + queue.push(IgnoreStageFailure(execution, stageId, AuthenticatedRequest.getSpinnakerUser().orElse(null))) + } + override fun unpause(execution: PipelineExecution) { queue.push(ResumeExecution(execution)) } diff --git a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/IgnoreStageFailureHandler.kt b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/IgnoreStageFailureHandler.kt new file mode 100644 index 0000000000..f16d4d629d --- /dev/null +++ b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/IgnoreStageFailureHandler.kt @@ -0,0 +1,83 @@ +/* + * Copyright 2017 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License") + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.netflix.spinnaker.orca.q.handler + +import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionStatus.* +import com.netflix.spinnaker.orca.api.pipeline.models.StageExecution +import com.netflix.spinnaker.orca.pipeline.StageDefinitionBuilderFactory +import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository +import com.netflix.spinnaker.orca.q.IgnoreStageFailure +import com.netflix.spinnaker.orca.q.pending.PendingExecutionService +import com.netflix.spinnaker.q.Queue +import org.slf4j.Logger +import org.slf4j.LoggerFactory +import org.springframework.stereotype.Component +import java.time.Clock + +@Component +class IgnoreStageFailureHandler( + override val queue: Queue, + override val repository: ExecutionRepository, + override val stageDefinitionBuilderFactory: StageDefinitionBuilderFactory, + private val pendingExecutionService: PendingExecutionService, + private val clock: Clock +) : OrcaMessageHandler, StageBuilderAware { + + override val messageType = IgnoreStageFailure::class.java + + private val log: Logger get() = LoggerFactory.getLogger(javaClass) + + override fun handle(message: IgnoreStageFailure) { + message.withStage { stage -> + if (!stage.status.isHalt) { + log.warn("Attempting to ignore the failure of stage $stage which is not halted. Will ignore") + } else if (stage.execution.shouldQueue()) { + // this pipeline is already running and has limitConcurrent = true + stage.execution.pipelineConfigId?.let { + log.info("Queueing IgnoreStageFailure of {} {} {}", stage.execution.application, stage.execution.name, stage.execution.id) + pendingExecutionService.enqueue(it, message) + } + } else { + stage.status = FAILED_CONTINUE + stage.addIgnoreFailureDetails(message.user) + repository.storeStage(stage) + + val topLevelStage = stage.topLevelStage + if (topLevelStage != stage) { + topLevelStage.status = RUNNING + repository.storeStage(topLevelStage) + } + + val execution = topLevelStage.execution + if (execution.status.isHalt) { + stage.execution.updateStatus(RUNNING) + repository.updateStatus(execution) + } + + stage.startNext() + } + } + } + + private fun StageExecution.addIgnoreFailureDetails(user: String?) { + context["ignoreFailureDetails"] = mapOf( + "failureIgnoredBy" to (user ?: "anonymous"), + "failureIgnoreTime" to clock.millis(), + "previousException" to context.remove("exception") + ) + } +} diff --git a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/messages.kt b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/messages.kt index 3100a1ac76..96a1086018 100644 --- a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/messages.kt +++ b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/messages.kt @@ -250,6 +250,21 @@ data class RestartStage( this(stage.execution.type, stage.execution.id, stage.execution.application, stage.id, user) } +@JsonTypeName("ignoreStageFailure") +data class IgnoreStageFailure( + override val executionType: ExecutionType, + override val executionId: String, + override val application: String, + override val stageId: String, + val user: String? +) : Message(), StageLevel { + constructor(source: PipelineExecution, stageId: String, user: String?) : + this(source.type, source.id, source.application, stageId, user) + + constructor(stage: StageExecution, user: String?) : + this(stage.execution.type, stage.execution.id, stage.execution.application, stage.id, user) +} + @JsonTypeName("resumeStage") data class ResumeStage( override val executionType: ExecutionType, diff --git a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/IgnoreStageFailureHandlerTest.kt b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/IgnoreStageFailureHandlerTest.kt new file mode 100644 index 0000000000..a94f7b849d --- /dev/null +++ b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/IgnoreStageFailureHandlerTest.kt @@ -0,0 +1,317 @@ +/* + * Copyright 2021 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.netflix.spinnaker.orca.q.handler + +import com.netflix.spinnaker.orca.DefaultStageResolver +import com.netflix.spinnaker.orca.api.pipeline.graph.StageDefinitionBuilder +import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionStatus +import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionStatus.NOT_STARTED +import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionStatus.RUNNING +import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionStatus.TERMINAL +import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionStatus.FAILED_CONTINUE +import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionType.PIPELINE +import com.netflix.spinnaker.orca.api.pipeline.models.StageExecution +import com.netflix.spinnaker.orca.api.test.pipeline +import com.netflix.spinnaker.orca.api.test.stage +import com.netflix.spinnaker.orca.pipeline.DefaultStageDefinitionBuilderFactory +import com.netflix.spinnaker.orca.pipeline.model.StageExecutionImpl +import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository +import com.netflix.spinnaker.orca.q.* +import com.netflix.spinnaker.orca.q.pending.PendingExecutionService +import com.netflix.spinnaker.q.Queue +import com.netflix.spinnaker.time.fixedClock +import com.nhaarman.mockito_kotlin.any +import com.nhaarman.mockito_kotlin.argForWhich +import com.nhaarman.mockito_kotlin.argumentCaptor +import com.nhaarman.mockito_kotlin.atLeast +import com.nhaarman.mockito_kotlin.check +import com.nhaarman.mockito_kotlin.doReturn +import com.nhaarman.mockito_kotlin.eq +import com.nhaarman.mockito_kotlin.mock +import com.nhaarman.mockito_kotlin.never +import com.nhaarman.mockito_kotlin.reset +import com.nhaarman.mockito_kotlin.times +import com.nhaarman.mockito_kotlin.verify +import com.nhaarman.mockito_kotlin.whenever +import java.time.temporal.ChronoUnit.HOURS +import java.time.temporal.ChronoUnit.MINUTES +import kotlin.collections.contains +import kotlin.collections.filter +import kotlin.collections.first +import kotlin.collections.flatMap +import kotlin.collections.forEach +import kotlin.collections.listOf +import kotlin.collections.map +import kotlin.collections.mapOf +import kotlin.collections.set +import kotlin.collections.setOf +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.lifecycle.CachingMode.GROUP +import org.jetbrains.spek.subject.SubjectSpek + +object IgnoreStageFailureHandlerTest : SubjectSpek({ + val queue: Queue = mock() + val repository: ExecutionRepository = mock() + val pendingExecutionService: PendingExecutionService = mock() + val clock = fixedClock() + + subject(GROUP) { + IgnoreStageFailureHandler( + queue, + repository, + DefaultStageDefinitionBuilderFactory( + DefaultStageResolver( + StageDefinitionBuildersProvider( + listOf( + singleTaskStage, + stageWithSyntheticBefore, + stageWithNestedSynthetics + ) + ) + ) + ), + pendingExecutionService, + clock + ) + } + + fun resetMocks() = reset(queue, repository) + + ExecutionStatus + .values() + .filter { !it.isHalt } + .forEach { unhaltedStatus -> + describe("trying to ignore the failure of a $unhaltedStatus stage") { + val pipeline = pipeline { + application = "foo" + status = RUNNING + startTime = clock.instant().minus(1, HOURS).toEpochMilli() + stage { + refId = "1" + singleTaskStage.plan(this) + status = unhaltedStatus + startTime = clock.instant().minus(1, HOURS).toEpochMilli() + } + stage { + refId = "2" + requisiteStageRefIds = listOf("1") + singleTaskStage.plan(this) + status = NOT_STARTED + } + } + val message = IgnoreStageFailure(pipeline.type, pipeline.id, "foo", pipeline.stageByRef("1").id, "aalhamali@coveo.com") + + beforeGroup { + whenever(repository.retrieve(message.executionType, message.executionId)) doReturn pipeline + } + + afterGroup(::resetMocks) + + action("the handler receives a message") { + subject.handle(message) + } + + it("does not modify the stage status") { + verify(repository, never()).store(any()) + } + + it("does not run any stages") { + verify(queue, never()).push(any()) + } + } + } + + describe("ignoring the failure of a failed stage with no downstream stages") { + val pipeline = pipeline { + application = "foo" + id = "1234" + status = TERMINAL + startTime = clock.instant().minus(1, HOURS).toEpochMilli() + endTime = clock.instant().minus(30, MINUTES).toEpochMilli() + stage { + refId = "1" + singleTaskStage.plan(this) + status = TERMINAL + startTime = clock.instant().minus(1, HOURS).toEpochMilli() + endTime = clock.instant().minus(59, MINUTES).toEpochMilli() + } + } + val message = IgnoreStageFailure(pipeline.type, pipeline.id, "foo", pipeline.stageByRef("1").id, "aalhamali@coveo.com") + + beforeGroup { + whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + } + + afterGroup(::resetMocks) + + action("the handler receives a message") { + subject.handle(message) + } + + it("changes the stage's status to FAILED_CONTINUE") { + verify(repository).storeStage( + check { + assertThat(it.id).isEqualTo(message.stageId) + assertThat(it.status).isEqualTo(FAILED_CONTINUE) + } + ) + } + + it("recommences the pipeline") { + verify(repository).updateStatus( + check { + assertThat(it.id).isEqualTo(pipeline.id) + assertThat(it.status).isEqualTo(RUNNING) + } + ) + } + + it("attempts to start the next downstream stage, but finds none, so sends a CompleteExecution message") { + verify(queue).push(any()) + } + } + + describe("ignoring the failure of a failed stage with downstream stages") { + val pipeline = pipeline { + application = "foo" + id = "1234" + status = TERMINAL + startTime = clock.instant().minus(1, HOURS).toEpochMilli() + endTime = clock.instant().minus(30, MINUTES).toEpochMilli() + stage { + refId = "1" + singleTaskStage.plan(this) + status = TERMINAL + startTime = clock.instant().minus(1, HOURS).toEpochMilli() + endTime = clock.instant().minus(59, MINUTES).toEpochMilli() + } + stage { + refId = "2" + requisiteStageRefIds = listOf("1") + singleTaskStage.plan(this) + status = NOT_STARTED + } + stage { + refId = "3" + requisiteStageRefIds = listOf("1") + singleTaskStage.plan(this) + status = NOT_STARTED + } + stage { + refId = "4" + requisiteStageRefIds = listOf("2") + singleTaskStage.plan(this) + status = NOT_STARTED + } + } + val message = IgnoreStageFailure(pipeline.type, pipeline.id, "foo", pipeline.stageByRef("1").id, "aalhamali@coveo.com") + + beforeGroup { + whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + } + + afterGroup(::resetMocks) + + action("the handler receives a message") { + subject.handle(message) + } + + it("changes the stage's status to FAILED_CONTINUE") { + verify(repository).storeStage( + check { + assertThat(it.id).isEqualTo(message.stageId) + assertThat(it.status).isEqualTo(FAILED_CONTINUE) + } + ) + } + + it("recommences the pipeline") { + verify(repository).updateStatus( + check { + assertThat(it.id).isEqualTo(pipeline.id) + assertThat(it.status).isEqualTo(RUNNING) + } + ) + } + + it("sends a StartStage message for all downstream stages") { + argumentCaptor().apply { + verify(queue, times(2)).push(capture()) + assertThat(allValues.map { it.stageId }.toSet()).isEqualTo(pipeline.stages.filter{"1" in it.requisiteStageRefIds}.map { it.id }.toSet()) + } + } + } + + describe("ignoring the failure of a failed in a running pipeline") { + val pipeline = pipeline { + application = "foo" + id = "1234" + status = RUNNING + startTime = clock.instant().minus(1, HOURS).toEpochMilli() + endTime = clock.instant().minus(30, MINUTES).toEpochMilli() + stage { + refId = "1" + singleTaskStage.plan(this) + status = TERMINAL + startTime = clock.instant().minus(1, HOURS).toEpochMilli() + endTime = clock.instant().minus(59, MINUTES).toEpochMilli() + } + stage { + refId = "2" + requisiteStageRefIds = listOf("1") + singleTaskStage.plan(this) + status = NOT_STARTED + } + stage { + refId = "3" + singleTaskStage.plan(this) + status = RUNNING + startTime = clock.instant().minus(1, HOURS).toEpochMilli() + } + } + val message = IgnoreStageFailure(pipeline.type, pipeline.id, "foo", pipeline.stageByRef("1").id, "aalhamali@coveo.com") + + beforeGroup { + whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline + } + + afterGroup(::resetMocks) + + action("the handler receives a message") { + subject.handle(message) + } + + it("changes the stage's status to FAILED_CONTINUE") { + verify(repository).storeStage( + check { + assertThat(it.id).isEqualTo(message.stageId) + assertThat(it.status).isEqualTo(FAILED_CONTINUE) + } + ) + } + + it("Does not change the status of the pipeline because it's already running") { + verify(repository, never()).updateStatus() + } + + it("sends a StartStage message for all downstream stages") { + verify(queue).push() + } + } +}) diff --git a/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt b/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt index 07dc3e99a1..ff3c1b703a 100644 --- a/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt +++ b/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt @@ -33,13 +33,7 @@ import com.netflix.spinnaker.orca.api.pipeline.models.PipelineExecution import com.netflix.spinnaker.orca.api.pipeline.models.StageExecution import com.netflix.spinnaker.orca.api.pipeline.persistence.ExecutionRepositoryListener import com.netflix.spinnaker.orca.interlink.Interlink -import com.netflix.spinnaker.orca.interlink.events.CancelInterlinkEvent -import com.netflix.spinnaker.orca.interlink.events.DeleteInterlinkEvent -import com.netflix.spinnaker.orca.interlink.events.InterlinkEvent -import com.netflix.spinnaker.orca.interlink.events.PatchStageInterlinkEvent -import com.netflix.spinnaker.orca.interlink.events.PauseInterlinkEvent -import com.netflix.spinnaker.orca.interlink.events.RestartStageInterlinkEvent -import com.netflix.spinnaker.orca.interlink.events.ResumeInterlinkEvent +import com.netflix.spinnaker.orca.interlink.events.* import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionNotFoundException import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository.ExecutionComparator @@ -260,6 +254,12 @@ class SqlExecutionRepository( } } + override fun ignoreStageFailure(executionId: String, stageId: String) { + doForeignAware(IgnoreStageFailureInterlinkEvent(PIPELINE, executionId, stageId)) { + _, _ -> log.debug("ignoreStageFailure is a no-op for local executions") + } + } + override fun updateStatus(execution: PipelineExecution) { withPool(poolName) { jooq.transactional { diff --git a/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy b/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy index bf47e136df..200ca8b364 100644 --- a/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy +++ b/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy @@ -45,6 +45,7 @@ import org.springframework.web.bind.annotation.* import rx.schedulers.Schedulers import java.nio.charset.Charset +import java.nio.file.AccessDeniedException import java.time.Clock import java.time.ZoneOffset import java.util.concurrent.TimeUnit @@ -505,6 +506,18 @@ class TaskController { return executionOperator.restartStage(id, stageId) } + @PreAuthorize("hasPermission(this.getPipeline(#id)?.application, 'APPLICATION', 'EXECUTE')") + @RequestMapping(value = "/pipelines/{id}/stages/{stageId}/ignoreFailure", method = RequestMethod.PUT) + PipelineExecution ignoreFailureOfPipelineStage( + @PathVariable String id, @PathVariable String stageId) { + pipeline = executionRepository.retrieve(PIPELINE, id) + stage = pipeline.stageById(stageId) + if (!(boolean) stage.context.getCurrentOnly("allowIgnoreFailure", false)) { + throw AccessDeniedException("Stage does not allow ignoreFailure action") + } + return executionOperator.ignoreStageFailure(id, stageId) + } + @PreAuthorize("hasPermission(this.getPipeline(#id)?.application, 'APPLICATION', 'READ')") @RequestMapping(value = "/pipelines/{id}/evaluateExpression", method = RequestMethod.GET) Map evaluateExpressionForExecution(@PathVariable("id") String id, From a1870d05d66c94cf266423f709f332d30079687c Mon Sep 17 00:00:00 2001 From: AbdulRahman AlHamali Date: Mon, 22 Mar 2021 16:46:45 -0400 Subject: [PATCH 2/9] fix --- .../orca/q/migration/OrcaToKeikoSerializationMigrator.kt | 1 + 1 file changed, 1 insertion(+) diff --git a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/migration/OrcaToKeikoSerializationMigrator.kt b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/migration/OrcaToKeikoSerializationMigrator.kt index fd73fbb098..25c8248e46 100644 --- a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/migration/OrcaToKeikoSerializationMigrator.kt +++ b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/migration/OrcaToKeikoSerializationMigrator.kt @@ -33,6 +33,7 @@ internal val orcaToKeikoTypes = mapOf( ".PauseStage" to "pauseStage", ".RestartStage" to "restartStage", ".ResumeStage" to "resumeStage", + ".IgnoreStageFailure" to "ignoreStageFailure" ".CancelStage" to "cancelStage", ".StartExecution" to "startExecution", ".RescheduleExecution" to "rescheduleExecution", From 17ccf311776d96b09c121c715bb8369a775b490f Mon Sep 17 00:00:00 2001 From: AbdulRahman AlHamali Date: Mon, 22 Mar 2021 16:52:36 -0400 Subject: [PATCH 3/9] fix --- .../orca/q/migration/OrcaToKeikoSerializationMigrator.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/migration/OrcaToKeikoSerializationMigrator.kt b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/migration/OrcaToKeikoSerializationMigrator.kt index 25c8248e46..a54ba469f6 100644 --- a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/migration/OrcaToKeikoSerializationMigrator.kt +++ b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/migration/OrcaToKeikoSerializationMigrator.kt @@ -33,7 +33,7 @@ internal val orcaToKeikoTypes = mapOf( ".PauseStage" to "pauseStage", ".RestartStage" to "restartStage", ".ResumeStage" to "resumeStage", - ".IgnoreStageFailure" to "ignoreStageFailure" + ".IgnoreStageFailure" to "ignoreStageFailure", ".CancelStage" to "cancelStage", ".StartExecution" to "startExecution", ".RescheduleExecution" to "rescheduleExecution", From 2d0b6f69623e5f9c186ae2fb80a18ee1ccdbd1a1 Mon Sep 17 00:00:00 2001 From: AbdulRahman AlHamali Date: Wed, 24 Mar 2021 21:13:56 -0400 Subject: [PATCH 4/9] fix failed tests --- .../spinnaker/orca/q/handler/IgnoreStageFailureHandler.kt | 1 + .../spinnaker/orca/q/handler/IgnoreStageFailureHandlerTest.kt | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/IgnoreStageFailureHandler.kt b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/IgnoreStageFailureHandler.kt index f16d4d629d..458a92a49c 100644 --- a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/IgnoreStageFailureHandler.kt +++ b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/IgnoreStageFailureHandler.kt @@ -43,6 +43,7 @@ class IgnoreStageFailureHandler( override fun handle(message: IgnoreStageFailure) { message.withStage { stage -> + if (!stage.status.isHalt) { log.warn("Attempting to ignore the failure of stage $stage which is not halted. Will ignore") } else if (stage.execution.shouldQueue()) { diff --git a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/IgnoreStageFailureHandlerTest.kt b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/IgnoreStageFailureHandlerTest.kt index a94f7b849d..26a00155a6 100644 --- a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/IgnoreStageFailureHandlerTest.kt +++ b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/IgnoreStageFailureHandlerTest.kt @@ -307,11 +307,11 @@ object IgnoreStageFailureHandlerTest : SubjectSpek({ } it("Does not change the status of the pipeline because it's already running") { - verify(repository, never()).updateStatus() + verify(repository, never()).updateStatus(any()) } it("sends a StartStage message for all downstream stages") { - verify(queue).push() + verify(queue).push(any()) } } }) From 4dc138ebc6c2c160ba8683ee212af3d4665e9884 Mon Sep 17 00:00:00 2001 From: AbdulRahman AlHamali Date: Wed, 24 Mar 2021 23:48:08 -0400 Subject: [PATCH 5/9] fix exception --- .../netflix/spinnaker/orca/controllers/TaskController.groovy | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy b/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy index 200ca8b364..46a7e46d84 100644 --- a/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy +++ b/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy @@ -510,8 +510,8 @@ class TaskController { @RequestMapping(value = "/pipelines/{id}/stages/{stageId}/ignoreFailure", method = RequestMethod.PUT) PipelineExecution ignoreFailureOfPipelineStage( @PathVariable String id, @PathVariable String stageId) { - pipeline = executionRepository.retrieve(PIPELINE, id) - stage = pipeline.stageById(stageId) + def pipeline = executionRepository.retrieve(PIPELINE, id) + def stage = pipeline.stageById(stageId) if (!(boolean) stage.context.getCurrentOnly("allowIgnoreFailure", false)) { throw AccessDeniedException("Stage does not allow ignoreFailure action") } From 4b9d07b64df3aa25f0240cfff1e95fea7cfed888 Mon Sep 17 00:00:00 2001 From: AbdulRahman AlHamali Date: Thu, 25 Mar 2021 00:08:10 -0400 Subject: [PATCH 6/9] fix error handling --- .../netflix/spinnaker/orca/controllers/TaskController.groovy | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy b/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy index 46a7e46d84..7e67a044ea 100644 --- a/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy +++ b/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy @@ -45,7 +45,6 @@ import org.springframework.web.bind.annotation.* import rx.schedulers.Schedulers import java.nio.charset.Charset -import java.nio.file.AccessDeniedException import java.time.Clock import java.time.ZoneOffset import java.util.concurrent.TimeUnit @@ -513,7 +512,7 @@ class TaskController { def pipeline = executionRepository.retrieve(PIPELINE, id) def stage = pipeline.stageById(stageId) if (!(boolean) stage.context.getCurrentOnly("allowIgnoreFailure", false)) { - throw AccessDeniedException("Stage does not allow ignoreFailure action") + throw new CannotUpdateExecutionStage("Stage does not allow ignoreFailure action") } return executionOperator.ignoreStageFailure(id, stageId) } From 4d05b2b30b3740f62469f270e52530131fa5a8ce Mon Sep 17 00:00:00 2001 From: AbdulRahman AlHamali Date: Thu, 25 Mar 2021 16:09:30 -0400 Subject: [PATCH 7/9] working --- .../orca/pipeline/CompoundExecutionOperator.java | 6 +++--- .../spinnaker/orca/pipeline/ExecutionRunner.java | 3 ++- .../pipeline/persistence/ExecutionRepository.java | 2 +- .../events/IgnoreStageFailureInterlinkEvent.java | 9 +++++++-- .../netflix/spinnaker/orca/q/QueueExecutionRunner.kt | 4 ++-- .../orca/q/handler/IgnoreStageFailureHandler.kt | 9 +++++---- .../kotlin/com/netflix/spinnaker/orca/q/messages.kt | 11 ++++++----- .../orca/q/handler/IgnoreStageFailureHandlerTest.kt | 8 ++++---- .../pipeline/persistence/SqlExecutionRepository.kt | 4 ++-- .../spinnaker/orca/controllers/TaskController.groovy | 4 ++-- 10 files changed, 34 insertions(+), 26 deletions(-) diff --git a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/CompoundExecutionOperator.java b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/CompoundExecutionOperator.java index 3f7ad2817f..a3f2a53836 100644 --- a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/CompoundExecutionOperator.java +++ b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/CompoundExecutionOperator.java @@ -120,15 +120,15 @@ public PipelineExecution restartStage(@Nonnull String executionId, @Nonnull Stri } public PipelineExecution ignoreStageFailure( - @Nonnull String executionId, @Nonnull String stageId) { + @Nonnull String executionId, @Nonnull String stageId, String reason) { PipelineExecution execution = repository.retrieve(ExecutionType.PIPELINE, executionId); if (repository.handlesPartition(execution.getPartition())) { - runner.ignoreFailure(execution, stageId); + runner.ignoreFailure(execution, stageId, reason); } else { log.info( "Not pushing queue message action='ignoreFailure' for execution with foreign partition='{}'", execution.getPartition()); - repository.ignoreStageFailure(executionId, stageId); + repository.ignoreStageFailure(executionId, stageId, reason); } return execution; } diff --git a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/ExecutionRunner.java b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/ExecutionRunner.java index 9fbcedd82d..55be5178c0 100644 --- a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/ExecutionRunner.java +++ b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/ExecutionRunner.java @@ -27,7 +27,8 @@ default void restart(@Nonnull PipelineExecution execution, @Nonnull String stage throw new UnsupportedOperationException(); } - default void ignoreFailure(@Nonnull PipelineExecution execution, @Nonnull String stageId) { + default void ignoreFailure( + @Nonnull PipelineExecution execution, @Nonnull String stageId, String reason) { throw new UnsupportedOperationException(); } diff --git a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java index 976410d0be..12592de813 100644 --- a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java +++ b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java @@ -186,7 +186,7 @@ default boolean handlesPartition(@Nullable String partitionOfExecution) { // foreign executions default void restartStage(String executionId, String stageId) {} - default void ignoreStageFailure(String executionId, String stageId) {} + default void ignoreStageFailure(String executionId, String stageId, String reason) {} final class ExecutionCriteria { private int pageSize = 3500; diff --git a/orca-interlink/src/main/java/com/netflix/spinnaker/orca/interlink/events/IgnoreStageFailureInterlinkEvent.java b/orca-interlink/src/main/java/com/netflix/spinnaker/orca/interlink/events/IgnoreStageFailureInterlinkEvent.java index 117e96804a..b36bd5b365 100644 --- a/orca-interlink/src/main/java/com/netflix/spinnaker/orca/interlink/events/IgnoreStageFailureInterlinkEvent.java +++ b/orca-interlink/src/main/java/com/netflix/spinnaker/orca/interlink/events/IgnoreStageFailureInterlinkEvent.java @@ -43,19 +43,24 @@ public class IgnoreStageFailureInterlinkEvent implements InterlinkEvent { @Nonnull ExecutionType executionType; @Nonnull String executionId; @Nonnull String stageId; + String reason; public IgnoreStageFailureInterlinkEvent( - @Nonnull ExecutionType executionType, @Nonnull String executionId, @Nonnull String stageId) { + @Nonnull ExecutionType executionType, + @Nonnull String executionId, + @Nonnull String stageId, + String reason) { // for the moment, only ExecutionType.PIPELINE can have ignored stages // but since we are defining the protocol on the wire here, let's be a bit future proof and // accept potentially different execution types this.executionType = executionType; this.executionId = executionId; this.stageId = stageId; + this.reason = reason; } @Override public void applyTo(CompoundExecutionOperator executionOperator) { - executionOperator.ignoreStageFailure(executionId, stageId); + executionOperator.ignoreStageFailure(executionId, stageId, reason); } } diff --git a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/QueueExecutionRunner.kt b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/QueueExecutionRunner.kt index f451649e3f..e1a8e69a55 100644 --- a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/QueueExecutionRunner.kt +++ b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/QueueExecutionRunner.kt @@ -38,8 +38,8 @@ class QueueExecutionRunner( queue.push(RestartStage(execution, stageId, AuthenticatedRequest.getSpinnakerUser().orElse(null))) } - override fun ignoreFailure(execution: PipelineExecution, stageId: String) { - queue.push(IgnoreStageFailure(execution, stageId, AuthenticatedRequest.getSpinnakerUser().orElse(null))) + override fun ignoreFailure(execution: PipelineExecution, stageId: String, reason: String?) { + queue.push(IgnoreStageFailure(execution, stageId, AuthenticatedRequest.getSpinnakerUser().orElse(null), reason)) } override fun unpause(execution: PipelineExecution) { diff --git a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/IgnoreStageFailureHandler.kt b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/IgnoreStageFailureHandler.kt index 458a92a49c..867509ebf1 100644 --- a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/IgnoreStageFailureHandler.kt +++ b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/IgnoreStageFailureHandler.kt @@ -54,7 +54,7 @@ class IgnoreStageFailureHandler( } } else { stage.status = FAILED_CONTINUE - stage.addIgnoreFailureDetails(message.user) + stage.addIgnoreFailureDetails(message.user, message.reason) repository.storeStage(stage) val topLevelStage = stage.topLevelStage @@ -74,10 +74,11 @@ class IgnoreStageFailureHandler( } } - private fun StageExecution.addIgnoreFailureDetails(user: String?) { + private fun StageExecution.addIgnoreFailureDetails(user: String?, reason: String?) { context["ignoreFailureDetails"] = mapOf( - "failureIgnoredBy" to (user ?: "anonymous"), - "failureIgnoreTime" to clock.millis(), + "by" to (user ?: "anonymous"), + "reason" to (reason ?: "unspecified"), + "time" to clock.millis(), "previousException" to context.remove("exception") ) } diff --git a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/messages.kt b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/messages.kt index 96a1086018..ae71303fe6 100644 --- a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/messages.kt +++ b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/messages.kt @@ -256,13 +256,14 @@ data class IgnoreStageFailure( override val executionId: String, override val application: String, override val stageId: String, - val user: String? + val user: String?, + val reason: String? ) : Message(), StageLevel { - constructor(source: PipelineExecution, stageId: String, user: String?) : - this(source.type, source.id, source.application, stageId, user) + constructor(source: PipelineExecution, stageId: String, user: String?, reason: String?) : + this(source.type, source.id, source.application, stageId, user, reason) - constructor(stage: StageExecution, user: String?) : - this(stage.execution.type, stage.execution.id, stage.execution.application, stage.id, user) + constructor(stage: StageExecution, user: String?, reason: String?) : + this(stage.execution.type, stage.execution.id, stage.execution.application, stage.id, user, reason) } @JsonTypeName("resumeStage") diff --git a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/IgnoreStageFailureHandlerTest.kt b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/IgnoreStageFailureHandlerTest.kt index 26a00155a6..37108947ec 100644 --- a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/IgnoreStageFailureHandlerTest.kt +++ b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/IgnoreStageFailureHandlerTest.kt @@ -115,7 +115,7 @@ object IgnoreStageFailureHandlerTest : SubjectSpek({ status = NOT_STARTED } } - val message = IgnoreStageFailure(pipeline.type, pipeline.id, "foo", pipeline.stageByRef("1").id, "aalhamali@coveo.com") + val message = IgnoreStageFailure(pipeline.type, pipeline.id, "foo", pipeline.stageByRef("1").id, "aalhamali@coveo.com", null) beforeGroup { whenever(repository.retrieve(message.executionType, message.executionId)) doReturn pipeline @@ -152,7 +152,7 @@ object IgnoreStageFailureHandlerTest : SubjectSpek({ endTime = clock.instant().minus(59, MINUTES).toEpochMilli() } } - val message = IgnoreStageFailure(pipeline.type, pipeline.id, "foo", pipeline.stageByRef("1").id, "aalhamali@coveo.com") + val message = IgnoreStageFailure(pipeline.type, pipeline.id, "foo", pipeline.stageByRef("1").id, "aalhamali@coveo.com", null) beforeGroup { whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline @@ -220,7 +220,7 @@ object IgnoreStageFailureHandlerTest : SubjectSpek({ status = NOT_STARTED } } - val message = IgnoreStageFailure(pipeline.type, pipeline.id, "foo", pipeline.stageByRef("1").id, "aalhamali@coveo.com") + val message = IgnoreStageFailure(pipeline.type, pipeline.id, "foo", pipeline.stageByRef("1").id, "aalhamali@coveo.com", null) beforeGroup { whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline @@ -285,7 +285,7 @@ object IgnoreStageFailureHandlerTest : SubjectSpek({ startTime = clock.instant().minus(1, HOURS).toEpochMilli() } } - val message = IgnoreStageFailure(pipeline.type, pipeline.id, "foo", pipeline.stageByRef("1").id, "aalhamali@coveo.com") + val message = IgnoreStageFailure(pipeline.type, pipeline.id, "foo", pipeline.stageByRef("1").id, "aalhamali@coveo.com", null) beforeGroup { whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline diff --git a/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt b/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt index ff3c1b703a..36ab1380b2 100644 --- a/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt +++ b/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt @@ -254,8 +254,8 @@ class SqlExecutionRepository( } } - override fun ignoreStageFailure(executionId: String, stageId: String) { - doForeignAware(IgnoreStageFailureInterlinkEvent(PIPELINE, executionId, stageId)) { + override fun ignoreStageFailure(executionId: String, stageId: String, reason: String?) { + doForeignAware(IgnoreStageFailureInterlinkEvent(PIPELINE, executionId, stageId, reason)) { _, _ -> log.debug("ignoreStageFailure is a no-op for local executions") } } diff --git a/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy b/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy index 7e67a044ea..0bd712e0fa 100644 --- a/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy +++ b/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy @@ -508,13 +508,13 @@ class TaskController { @PreAuthorize("hasPermission(this.getPipeline(#id)?.application, 'APPLICATION', 'EXECUTE')") @RequestMapping(value = "/pipelines/{id}/stages/{stageId}/ignoreFailure", method = RequestMethod.PUT) PipelineExecution ignoreFailureOfPipelineStage( - @PathVariable String id, @PathVariable String stageId) { + @PathVariable String id, @PathVariable String stageId, @RequestBody Map details) { def pipeline = executionRepository.retrieve(PIPELINE, id) def stage = pipeline.stageById(stageId) if (!(boolean) stage.context.getCurrentOnly("allowIgnoreFailure", false)) { throw new CannotUpdateExecutionStage("Stage does not allow ignoreFailure action") } - return executionOperator.ignoreStageFailure(id, stageId) + return executionOperator.ignoreStageFailure(id, stageId, details["reason"]) } @PreAuthorize("hasPermission(this.getPipeline(#id)?.application, 'APPLICATION', 'READ')") From 6aaa495d2dbe74deb7eaa00c19281f1d9b58b8e5 Mon Sep 17 00:00:00 2001 From: AbdulRahman AlHamali Date: Thu, 25 Mar 2021 16:22:53 -0400 Subject: [PATCH 8/9] change all pipelines into running --- .../spinnaker/orca/q/handler/IgnoreStageFailureHandler.kt | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/IgnoreStageFailureHandler.kt b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/IgnoreStageFailureHandler.kt index 867509ebf1..a9823c183f 100644 --- a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/IgnoreStageFailureHandler.kt +++ b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/IgnoreStageFailureHandler.kt @@ -64,10 +64,8 @@ class IgnoreStageFailureHandler( } val execution = topLevelStage.execution - if (execution.status.isHalt) { - stage.execution.updateStatus(RUNNING) - repository.updateStatus(execution) - } + stage.execution.updateStatus(RUNNING) + repository.updateStatus(execution) stage.startNext() } From 00b65f23bbd25a6a103ba52796d359b72dafae29 Mon Sep 17 00:00:00 2001 From: AbdulRahman AlHamali Date: Thu, 25 Mar 2021 16:59:02 -0400 Subject: [PATCH 9/9] remove unneeded check --- .../spinnaker/orca/q/handler/IgnoreStageFailureHandlerTest.kt | 4 ---- 1 file changed, 4 deletions(-) diff --git a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/IgnoreStageFailureHandlerTest.kt b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/IgnoreStageFailureHandlerTest.kt index 37108947ec..2aff4e6e42 100644 --- a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/IgnoreStageFailureHandlerTest.kt +++ b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/IgnoreStageFailureHandlerTest.kt @@ -306,10 +306,6 @@ object IgnoreStageFailureHandlerTest : SubjectSpek({ ) } - it("Does not change the status of the pipeline because it's already running") { - verify(repository, never()).updateStatus(any()) - } - it("sends a StartStage message for all downstream stages") { verify(queue).push(any()) }