Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[JENKINS-67164] Call StepExecution.onResume in response to WorkflowRun.onLoad not FlowExecutionList.ItemListenerImpl #221

Merged
merged 5 commits into from
Jun 3, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -322,4 +322,12 @@ public Iterable<BlockStartNode> iterateEnclosingBlocks(@NonNull FlowNode node) {
protected void notifyShutdown() {
// Default is no-op
}

/**
* Called after a restart and any attempts at {@link StepExecution#onResume} have completed.
* This is a signal that it is safe to resume program execution.
* By default, does nothing.
*/
protected void afterStepExecutionsResumed() {}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import hudson.Extension;
import hudson.ExtensionList;
import hudson.XmlFile;
import hudson.init.InitMilestone;
import hudson.init.Terminator;
import hudson.model.listeners.ItemListener;
import hudson.remoting.SingleLaneExecutorService;
Expand All @@ -31,6 +32,7 @@
import java.util.logging.Logger;

import org.kohsuke.accmod.Restricted;
import org.kohsuke.accmod.restrictions.Beta;
import org.kohsuke.accmod.restrictions.DoNotUse;

/**
Expand All @@ -44,6 +46,8 @@ public class FlowExecutionList implements Iterable<FlowExecution> {
private final SingleLaneExecutorService executor = new SingleLaneExecutorService(Timer.get());
private XmlFile configFile;

private transient volatile boolean resumptionComplete;

public FlowExecutionList() {
load();
}
Expand Down Expand Up @@ -160,11 +164,17 @@ public static FlowExecutionList get() {
}

/**
* @deprecated Only exists for binary compatibility.
* Returns true if all executions that were present in this {@link FlowExecutionList} have been loaded.
*
* <p>This takes place slightly after {@link InitMilestone#COMPLETED} is reached during Jenkins startup.
*
* <p>Useful to avoid resuming Pipelines in contexts that may lead to deadlock.
*
* <p>It is <em>not</em> guaranteed that {@link FlowExecution#afterStepExecutionsResumed} has been called at this point.
*/
@Deprecated
@Restricted(Beta.class)
public boolean isResumptionComplete() {
return false;
return resumptionComplete;
}

/**
Expand All @@ -179,29 +189,8 @@ public void onLoaded() {
for (final FlowExecution e : list) {
// The call to FlowExecutionOwner.get in the implementation of iterator() is sufficent to load the Pipeline.
LOGGER.log(Level.FINE, "Eagerly loaded {0}", e);
Futures.addCallback(e.getCurrentExecutions(false), new FutureCallback<List<StepExecution>>() {
@Override
public void onSuccess(@NonNull List<StepExecution> result) {
LOGGER.log(Level.FINE, "Will resume {0}", result);
for (StepExecution se : result) {
try {
se.onResume();
} catch (Throwable x) {
se.getContext().onFailure(x);
}
}
}

@Override
public void onFailure(@NonNull Throwable t) {
if (t instanceof CancellationException) {
LOGGER.log(Level.FINE, "Cancelled load of " + e, t);
} else {
LOGGER.log(Level.WARNING, "Failed to load " + e, t);
}
}
}, MoreExecutors.directExecutor());
}
list.resumptionComplete = true;
}
}

Expand Down Expand Up @@ -256,4 +245,56 @@ public void onFailure(@NonNull Throwable t) {
executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES);
}

/**
* Whenever a Pipeline resumes, resume all incomplete steps in its {@link FlowExecution}.
*
* <p>Called by {@code WorkflowRun.onLoad}, so guaranteed to run if a Pipeline resumes
* regardless of its presence in {@link FlowExecutionList}.
*/
@Extension
public static class ResumeStepExecutionListener extends FlowExecutionListener {
@Override
public void onResumed(@NonNull FlowExecution e) {
Futures.addCallback(e.getCurrentExecutions(false), new FutureCallback<List<StepExecution>>() {
@Override
public void onSuccess(@NonNull List<StepExecution> result) {
try {
if (e.isComplete()) {
// WorkflowRun.onLoad will not fireResumed if the execution was already complete when loaded,
// and CpsFlowExecution should not then complete until afterStepExecutionsResumed, so this is defensive.
return;
}
FlowExecutionList list = FlowExecutionList.get();
FlowExecutionOwner owner = e.getOwner();
if (!list.runningTasks.contains(owner)) {
LOGGER.log(Level.WARNING, "Resuming {0}, which is missing from FlowExecutionList ({1}), so registering it now.", new Object[] {owner, list.runningTasks.getView()});
list.register(owner);
}
LOGGER.log(Level.FINE, "Will resume {0}", result);
for (StepExecution se : result) {
try {
se.onResume();
} catch (Throwable x) {
se.getContext().onFailure(x);
}
}
} finally {
e.afterStepExecutionsResumed();
}
}

@Override
public void onFailure(@NonNull Throwable t) {
if (t instanceof CancellationException) {
LOGGER.log(Level.FINE, "Cancelled load of " + e, t);
} else {
LOGGER.log(Level.WARNING, "Failed to load " + e, t);
}
e.afterStepExecutionsResumed();
}

}, Timer.get()); // We always hold RunMap and WorkflowRun locks here, so we resume steps on a different thread to avoid potential deadlocks. See JENKINS-67351.
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public void onCompleted(@NonNull FlowExecution execution) {
* Fires the {@link #onCreated(FlowExecution)} event.
*/
public static void fireCreated(@NonNull FlowExecution execution) {
// TODO Jenkins 2.325+ use Listeners.notify
for (FlowExecutionListener listener : ExtensionList.lookup(FlowExecutionListener.class)) {
listener.onCreated(execution);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,44 @@

package org.jenkinsci.plugins.workflow.flow;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasItem;
import static org.junit.Assert.assertNotNull;

import hudson.AbortException;
import hudson.model.ParametersAction;
import hudson.model.ParametersDefinitionProperty;
import hudson.model.Result;
import hudson.model.StringParameterDefinition;
import hudson.model.StringParameterValue;
import hudson.model.TaskListener;
import hudson.model.queue.QueueTaskFuture;
import java.io.Serializable;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.Set;
import java.util.function.Supplier;
import java.util.logging.Level;
import org.hamcrest.Matcher;
import org.jenkinsci.plugins.workflow.cps.CpsFlowDefinition;
import org.jenkinsci.plugins.workflow.job.WorkflowJob;
import org.jenkinsci.plugins.workflow.job.WorkflowRun;
import org.jenkinsci.plugins.workflow.steps.Step;
import org.jenkinsci.plugins.workflow.steps.StepContext;
import org.jenkinsci.plugins.workflow.steps.StepDescriptor;
import org.jenkinsci.plugins.workflow.steps.StepExecution;
import org.jenkinsci.plugins.workflow.test.steps.SemaphoreStep;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.Rule;
import org.jvnet.hudson.test.BuildWatcher;
import org.jvnet.hudson.test.Issue;
import org.jvnet.hudson.test.LoggerRule;
import org.jvnet.hudson.test.JenkinsSessionRule;
import org.jvnet.hudson.test.TestExtension;
import org.kohsuke.stapler.DataBoundConstructor;

public class FlowExecutionListTest {

Expand Down Expand Up @@ -79,4 +99,113 @@ public class FlowExecutionListTest {
});
}

@Test public void forceLoadRunningExecutionsAfterRestart() throws Throwable {
logging.capture(50);
sessions.then(r -> {
WorkflowJob p = r.jenkins.createProject(WorkflowJob.class, "p");
p.setDefinition(new CpsFlowDefinition("semaphore('wait')", true));
WorkflowRun b = p.scheduleBuild2(0).waitForStart();
SemaphoreStep.waitForStart("wait/1", b);
});
sessions.then(r -> {
/*
Make sure that the build gets loaded automatically by FlowExecutionList$ItemListenerImpl before we load it explictly.
Expected call stack for resuming a Pipelines and its steps:
at org.jenkinsci.plugins.workflow.flow.FlowExecutionList$ResumeStepExecutionListener$1.onSuccess(FlowExecutionList.java:250)
at org.jenkinsci.plugins.workflow.flow.FlowExecutionList$ResumeStepExecutionListener$1.onSuccess(FlowExecutionList.java:247)
at com.google.common.util.concurrent.Futures$6.run(Futures.java:975)
at org.jenkinsci.plugins.workflow.flow.DirectExecutor.execute(DirectExecutor.java:33)
... Guava Futures API internals ...
at com.google.common.util.concurrent.Futures.addCallback(Futures.java:985)
at org.jenkinsci.plugins.workflow.flow.FlowExecutionList$ResumeStepExecutionListener.onResumed(FlowExecutionList.java:247)
at org.jenkinsci.plugins.workflow.flow.FlowExecutionListener.fireResumed(FlowExecutionListener.java:84)
at org.jenkinsci.plugins.workflow.job.WorkflowRun.onLoad(WorkflowRun.java:528)
at hudson.model.RunMap.retrieve(RunMap.java:225)
... RunMap internals ...
at hudson.model.RunMap.getById(RunMap.java:205)
at org.jenkinsci.plugins.workflow.job.WorkflowRun$Owner.run(WorkflowRun.java:937)
at org.jenkinsci.plugins.workflow.job.WorkflowRun$Owner.get(WorkflowRun.java:948)
at org.jenkinsci.plugins.workflow.flow.FlowExecutionList$1.computeNext(FlowExecutionList.java:65)
at org.jenkinsci.plugins.workflow.flow.FlowExecutionList$1.computeNext(FlowExecutionList.java:57)
at com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:143)
at com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:138)
at org.jenkinsci.plugins.workflow.flow.FlowExecutionList$ItemListenerImpl.onLoaded(FlowExecutionList.java:175)
at jenkins.model.Jenkins.<init>(Jenkins.java:1019)
*/
waitFor(logging::getMessages, hasItem(containsString("Will resume [org.jenkinsci.plugins.workflow.test.steps.SemaphoreStep")));
WorkflowJob p = r.jenkins.getItemByFullName("p", WorkflowJob.class);
SemaphoreStep.success("wait/1", null);
WorkflowRun b = p.getBuildByNumber(1);
r.waitForCompletion(b);
r.assertBuildStatus(Result.SUCCESS, b);
});
}

@Issue("JENKINS-67164")
@Test public void resumeStepExecutions() throws Throwable {
sessions.then(r -> {
WorkflowJob p = r.jenkins.createProject(WorkflowJob.class, "p");
p.setDefinition(new CpsFlowDefinition("noResume()", true));
WorkflowRun b = p.scheduleBuild2(0).waitForStart();
r.waitForMessage("Starting non-resumable step", b);
// TODO: Unclear how this might happen in practice.
FlowExecutionList.get().unregister(b.asFlowExecutionOwner());
});
sessions.then(r -> {
WorkflowJob p = r.jenkins.getItemByFullName("p", WorkflowJob.class);
WorkflowRun b = p.getBuildByNumber(1);
r.waitForCompletion(b);
r.assertBuildStatus(Result.FAILURE, b);
r.assertLogContains("Unable to resume NonResumableStep", b);
});
}

public static class NonResumableStep extends Step implements Serializable {
public static final long serialVersionUID = 1L;
@DataBoundConstructor
public NonResumableStep() { }
@Override
public StepExecution start(StepContext sc) {
return new ExecutionImpl(sc);
}

private static class ExecutionImpl extends StepExecution implements Serializable {
public static final long serialVersionUID = 1L;
private ExecutionImpl(StepContext sc) {
super(sc);
}
@Override
public boolean start() throws Exception {
getContext().get(TaskListener.class).getLogger().println("Starting non-resumable step");
return false;
}
@Override
public void onResume() {
getContext().onFailure(new AbortException("Unable to resume NonResumableStep"));
}
}

@TestExtension public static class DescriptorImpl extends StepDescriptor {
@Override
public Set<? extends Class<?>> getRequiredContext() {
return Collections.singleton(TaskListener.class);
}
@Override
public String getFunctionName() {
return "noResume";
}
}
}

/**
* Wait up to 5 seconds for the given supplier to return a matching value.
*/
private static <T> void waitFor(Supplier<T> valueSupplier, Matcher<T> matcher) throws InterruptedException {
Instant end = Instant.now().plus(Duration.ofSeconds(5));
while (!matcher.matches(valueSupplier.get()) && Instant.now().isBefore(end)) {
Thread.sleep(100L);
}
assertThat("Matcher should have matched after 5s", valueSupplier.get(), matcher);
}

}