diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index f8ca6a30d674..8bd1c1ae9992 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -193,6 +193,7 @@ func (woc *wfOperationCtx) operate() { } else { woc.log.Errorf("Failed to load artifact repository configMap: %+v", err) woc.markWorkflowError(err, true) + return } } @@ -1075,6 +1076,11 @@ func (woc *wfOperationCtx) createPVCs() error { } func (woc *wfOperationCtx) deletePVCs() error { + if woc.wf.Status.Phase != wfv1.NodeSucceeded { + // Skip deleting PVCs to reuse them for retried failed/error workflows. + // PVCs are automatically deleted when corresponding owner workflows get deleted. + return nil + } totalPVCs := len(woc.wf.Status.PersistentVolumeClaims) if totalPVCs == 0 { // PVC list already empty. nothing to do @@ -1286,6 +1292,16 @@ func (woc *wfOperationCtx) markWorkflowPhase(phase wfv1.NodePhase, markCompleted woc.wf.Status.Message = message[0] } + if phase == wfv1.NodeError { + entryNode, ok := woc.wf.Status.Nodes[woc.wf.ObjectMeta.Name] + if ok && entryNode.Phase == wfv1.NodeRunning { + entryNode.Phase = wfv1.NodeError + entryNode.Message = "Workflow operation error" + woc.wf.Status.Nodes[woc.wf.ObjectMeta.Name] = entryNode + woc.updated = true + } + } + switch phase { case wfv1.NodeSucceeded, wfv1.NodeFailed, wfv1.NodeError: // wait for all daemon nodes to get terminated before marking workflow completed