Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature(Execution): Allow ignoring failed stages #4113

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,20 @@ public PipelineExecution restartStage(@Nonnull String executionId, @Nonnull Stri
return execution;
}

public PipelineExecution ignoreStageFailure(
@Nonnull String executionId, @Nonnull String stageId, String reason) {
PipelineExecution execution = repository.retrieve(ExecutionType.PIPELINE, executionId);
if (repository.handlesPartition(execution.getPartition())) {
runner.ignoreFailure(execution, stageId, reason);
} else {
log.info(
"Not pushing queue message action='ignoreFailure' for execution with foreign partition='{}'",
execution.getPartition());
repository.ignoreStageFailure(executionId, stageId, reason);
}
return execution;
}

private PipelineExecution doInternal(
Consumer<PipelineExecution> runnerAction,
Runnable repositoryAction,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ default void restart(@Nonnull PipelineExecution execution, @Nonnull String stage
throw new UnsupportedOperationException();
}

default void ignoreFailure(
@Nonnull PipelineExecution execution, @Nonnull String stageId, String reason) {
throw new UnsupportedOperationException();
}

default void reschedule(@Nonnull PipelineExecution execution) {
throw new UnsupportedOperationException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@ default boolean handlesPartition(@Nullable String partitionOfExecution) {
// foreign executions
default void restartStage(String executionId, String stageId) {}

default void ignoreStageFailure(String executionId, String stageId, String reason) {}

final class ExecutionCriteria {
private int pageSize = 3500;
private Collection<ExecutionStatus> statuses = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright 2020 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.spinnaker.orca.interlink.events;

import static com.netflix.spinnaker.orca.interlink.events.InterlinkEvent.EventType.IGNORE_FAILURE;

import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionType;
import com.netflix.spinnaker.orca.pipeline.CompoundExecutionOperator;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
* This event is published on the interlink as a result of a user IGNORING THE FAILURE of a stage on
* an orca instance that can't handle the partition for the given execution.
*
* <p>The event is then handled by an orca instance (listening on interlink) whose partition matches
* that of the execution. The resulting repository mutations of this event will then be peered by
* the PeeringAgent
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class IgnoreStageFailureInterlinkEvent implements InterlinkEvent {
final EventType eventType = IGNORE_FAILURE;
@Nullable String partition;
@Nonnull ExecutionType executionType;
@Nonnull String executionId;
@Nonnull String stageId;
String reason;

public IgnoreStageFailureInterlinkEvent(
@Nonnull ExecutionType executionType,
@Nonnull String executionId,
@Nonnull String stageId,
String reason) {
// for the moment, only ExecutionType.PIPELINE can have ignored stages
// but since we are defining the protocol on the wire here, let's be a bit future proof and
// accept potentially different execution types
this.executionType = executionType;
this.executionId = executionId;
this.stageId = stageId;
this.reason = reason;
}

@Override
public void applyTo(CompoundExecutionOperator executionOperator) {
executionOperator.ignoreStageFailure(executionId, stageId, reason);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@
@JsonSubTypes.Type(value = ResumeInterlinkEvent.class, name = "RESUME"),
@JsonSubTypes.Type(value = DeleteInterlinkEvent.class, name = "DELETE"),
@JsonSubTypes.Type(value = PatchStageInterlinkEvent.class, name = "PATCH"),
@JsonSubTypes.Type(value = RestartStageInterlinkEvent.class, name = "RESTART")
@JsonSubTypes.Type(value = RestartStageInterlinkEvent.class, name = "RESTART"),
@JsonSubTypes.Type(value = IgnoreStageFailureInterlinkEvent.class, name = "IGNORE_FAILURE")
})
public interface InterlinkEvent {
enum EventType {
Expand All @@ -53,7 +54,8 @@ enum EventType {
DELETE,
RESUME,
PATCH,
RESTART
RESTART,
IGNORE_FAILURE
}

@JsonIgnore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ class QueueExecutionRunner(
queue.push(RestartStage(execution, stageId, AuthenticatedRequest.getSpinnakerUser().orElse(null)))
}

override fun ignoreFailure(execution: PipelineExecution, stageId: String, reason: String?) {
queue.push(IgnoreStageFailure(execution, stageId, AuthenticatedRequest.getSpinnakerUser().orElse(null), reason))
}

override fun unpause(execution: PipelineExecution) {
queue.push(ResumeExecution(execution))
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright 2017 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License")
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.spinnaker.orca.q.handler

import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionStatus.*
import com.netflix.spinnaker.orca.api.pipeline.models.StageExecution
import com.netflix.spinnaker.orca.pipeline.StageDefinitionBuilderFactory
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository
import com.netflix.spinnaker.orca.q.IgnoreStageFailure
import com.netflix.spinnaker.orca.q.pending.PendingExecutionService
import com.netflix.spinnaker.q.Queue
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component
import java.time.Clock

@Component
class IgnoreStageFailureHandler(
override val queue: Queue,
override val repository: ExecutionRepository,
override val stageDefinitionBuilderFactory: StageDefinitionBuilderFactory,
private val pendingExecutionService: PendingExecutionService,
private val clock: Clock
) : OrcaMessageHandler<IgnoreStageFailure>, StageBuilderAware {

override val messageType = IgnoreStageFailure::class.java

private val log: Logger get() = LoggerFactory.getLogger(javaClass)

override fun handle(message: IgnoreStageFailure) {
message.withStage { stage ->

if (!stage.status.isHalt) {
log.warn("Attempting to ignore the failure of stage $stage which is not halted. Will ignore")
} else if (stage.execution.shouldQueue()) {
// this pipeline is already running and has limitConcurrent = true
stage.execution.pipelineConfigId?.let {
log.info("Queueing IgnoreStageFailure of {} {} {}", stage.execution.application, stage.execution.name, stage.execution.id)
pendingExecutionService.enqueue(it, message)
}
} else {
stage.status = FAILED_CONTINUE
stage.addIgnoreFailureDetails(message.user, message.reason)
repository.storeStage(stage)

val topLevelStage = stage.topLevelStage
if (topLevelStage != stage) {
topLevelStage.status = RUNNING
repository.storeStage(topLevelStage)
}

val execution = topLevelStage.execution
stage.execution.updateStatus(RUNNING)
repository.updateStatus(execution)

stage.startNext()
}
}
}

private fun StageExecution.addIgnoreFailureDetails(user: String?, reason: String?) {
context["ignoreFailureDetails"] = mapOf(
"by" to (user ?: "anonymous"),
"reason" to (reason ?: "unspecified"),
"time" to clock.millis(),
"previousException" to context.remove("exception")
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,22 @@ data class RestartStage(
this(stage.execution.type, stage.execution.id, stage.execution.application, stage.id, user)
}

@JsonTypeName("ignoreStageFailure")
data class IgnoreStageFailure(
override val executionType: ExecutionType,
override val executionId: String,
override val application: String,
override val stageId: String,
val user: String?,
val reason: String?
) : Message(), StageLevel {
constructor(source: PipelineExecution, stageId: String, user: String?, reason: String?) :
this(source.type, source.id, source.application, stageId, user, reason)

constructor(stage: StageExecution, user: String?, reason: String?) :
this(stage.execution.type, stage.execution.id, stage.execution.application, stage.id, user, reason)
}

@JsonTypeName("resumeStage")
data class ResumeStage(
override val executionType: ExecutionType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ internal val orcaToKeikoTypes = mapOf(
".PauseStage" to "pauseStage",
".RestartStage" to "restartStage",
".ResumeStage" to "resumeStage",
".IgnoreStageFailure" to "ignoreStageFailure",
".CancelStage" to "cancelStage",
".StartExecution" to "startExecution",
".RescheduleExecution" to "rescheduleExecution",
Expand Down
Loading