Skip to content

Commit

Permalink
fix(controller): Fix node status when daemon pod deleted but its children nodes are still running (#4683)
Browse files Browse the repository at this point in the history

Signed-off-by: lons <lonsdale8734@gmail.com>
  • Loading branch information
lonsdale8734 authored Jan 13, 2021
1 parent 955a4bb commit 2ff11cc
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 1 deletion.
5 changes: 5 additions & 0 deletions workflow/controller/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -692,6 +692,11 @@ type TaskResults struct {
// evaluateDependsLogic returns whether a node should execute and proceed. proceed means that all of its dependencies are
// completed and execute means that given the results of its dependencies, this node should execute.
func (d *dagContext) evaluateDependsLogic(taskName string) (bool, bool, error) {
node := d.getTaskNode(taskName)
if node != nil {
return true, true, nil
}

evalScope := make(map[string]TaskResults)

for _, taskName := range d.GetTaskDependencies(taskName) {
Expand Down
49 changes: 49 additions & 0 deletions workflow/controller/dag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,55 @@ func TestEvaluateAnyAllDependsLogic(t *testing.T) {

}

// TestEvaluateDependsLogicWhenDaemonFailed verifies that evaluateDependsLogic reports
// execute and proceed for task B both while its daemoned dependency A is running and
// after A has failed while B's own node is already running.
func TestEvaluateDependsLogicWhenDaemonFailed(t *testing.T) {
	const wfName = "test-wf"

	// DAG under test: A has no dependencies, B depends on A.
	dagTasks := []wfv1.DAGTask{
		{Name: "A"},
		{Name: "B", Depends: "A"},
	}

	// NOTE(review): taskNodeID is called on d below before the workflow carrying node
	// statuses is swapped in, so this initial named workflow is presumably required and
	// is not a dead store — confirm against taskNodeID before removing it.
	d := &dagContext{
		boundaryName: "test",
		tasks:        dagTasks,
		wf:           &wfv1.Workflow{ObjectMeta: metav1.ObjectMeta{Name: wfName}},
		dependencies: make(map[string][]string),
		dependsLogic: make(map[string]string),
	}

	// Shared check: evaluating B must succeed and report both proceed and execute.
	assertBProceedsAndExecutes := func() {
		execute, proceed, err := d.evaluateDependsLogic("B")
		assert.NoError(t, err)
		assert.True(t, proceed)
		assert.True(t, execute)
	}

	// Stage 1: A is a running daemoned node.
	isDaemon := true
	d.wf = &wfv1.Workflow{
		ObjectMeta: metav1.ObjectMeta{Name: wfName},
		Status: wfv1.WorkflowStatus{
			Nodes: map[string]wfv1.NodeStatus{
				d.taskNodeID("A"): {Phase: wfv1.NodeRunning, Daemoned: &isDaemon},
			},
		},
	}
	assertBProceedsAndExecutes()

	// Stage 2: B has started running, then A flips to Failed (no longer marked daemoned).
	d.wf.Status.Nodes[d.taskNodeID("B")] = wfv1.NodeStatus{Phase: wfv1.NodeRunning}
	d.wf.Status.Nodes[d.taskNodeID("A")] = wfv1.NodeStatus{Phase: wfv1.NodeFailed}

	// B's node already exists, so it must still be reported as proceed and execute.
	assertBProceedsAndExecutes()
}

func TestAllEvaluateDependsLogic(t *testing.T) {
statusMap := map[common.TaskResult]wfv1.NodePhase{
common.TaskResultSucceeded: wfv1.NodeSucceeded,
Expand Down
7 changes: 6 additions & 1 deletion workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -926,7 +926,7 @@ func (woc *wfOperationCtx) podReconciliation(ctx context.Context) error {
// It is now impossible to infer pod status. All we can do at this point is to mark the node with Error, or
// we can re-submit it.
for nodeID, node := range woc.wf.Status.Nodes {
if node.Type != wfv1.NodeTypePod || node.Fulfilled() || node.StartedAt.IsZero() {
if node.Type != wfv1.NodeTypePod || node.Phase.Fulfilled() || node.StartedAt.IsZero() {
// node is not a pod, it is already complete, or it can be re-run.
continue
}
Expand All @@ -950,6 +950,10 @@ func (woc *wfOperationCtx) podReconciliation(ctx context.Context) error {
continue
}

if node.Daemoned != nil && *node.Daemoned {
node.Daemoned = nil
woc.updated = true
}
woc.markNodePhase(node.Name, wfv1.NodeError, "pod deleted")
} else {
// At this point we are certain that the pod associated with our node is running or has been run;
Expand Down Expand Up @@ -1100,6 +1104,7 @@ func (woc *wfOperationCtx) assessNodeStatus(pod *apiv1.Pod, node *wfv1.NodeStatu
if pod.DeletionTimestamp != nil {
// pod is being terminated
newPhase = wfv1.NodeError
newDaemonStatus = pointer.BoolPtr(false)
message = "pod deleted during operation"
woc.log.WithField("displayName", node.DisplayName).WithField("templateName", node.TemplateName).
WithField("pod", pod.Name).Error(message)
Expand Down

0 comments on commit 2ff11cc

Please sign in to comment.