Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Revert node creation logic #1818

Merged
merged 6 commits into from
Jan 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions workflow/controller/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,11 @@ func (d *dagContext) hasMoreRetries(node *wfv1.NodeStatus) bool {
return true
}

func (woc *wfOperationCtx) executeDAG(nodeName string, tmplCtx *templateresolution.Context, tmpl *wfv1.Template, boundaryID string) error {
node := woc.markNodePhase(nodeName, wfv1.NodeRunning)
func (woc *wfOperationCtx) executeDAG(nodeName string, tmplCtx *templateresolution.Context, templateScope string, tmpl *wfv1.Template, orgTmpl wfv1.TemplateHolder, boundaryID string) (*wfv1.NodeStatus, error) {
node := woc.getNodeByName(nodeName)
if node == nil {
node = woc.initializeExecutableNode(nodeName, wfv1.NodeTypeSteps, templateScope, tmpl, orgTmpl, boundaryID, wfv1.NodeRunning)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The node type is not correct here: a DAG template should initialize its node as wfv1.NodeTypeDAG, not wfv1.NodeTypeSteps.
I fixed it in this commit.
396faa0

}

defer func() {
if woc.wf.Status.Nodes[node.ID].Completed() {
Expand Down Expand Up @@ -238,11 +241,11 @@ func (woc *wfOperationCtx) executeDAG(nodeName string, tmplCtx *templateresoluti
dagPhase := dagCtx.assessDAGPhase(targetTasks, woc.wf.Status.Nodes)
switch dagPhase {
case wfv1.NodeRunning:
return nil
return node, nil
case wfv1.NodeError, wfv1.NodeFailed:
woc.updateOutboundNodesForTargetTasks(dagCtx, targetTasks, nodeName)
_ = woc.markNodePhase(nodeName, dagPhase)
return nil
return node, nil
}

// set outputs from tasks in order for DAG templates to support outputs
Expand All @@ -260,7 +263,7 @@ func (woc *wfOperationCtx) executeDAG(nodeName string, tmplCtx *templateresoluti
}
outputs, err := getTemplateOutputsFromScope(tmpl, &scope)
if err != nil {
return err
return node, err
}
if outputs != nil {
node = woc.getNodeByName(nodeName)
Expand All @@ -271,7 +274,7 @@ func (woc *wfOperationCtx) executeDAG(nodeName string, tmplCtx *templateresoluti
woc.updateOutboundNodesForTargetTasks(dagCtx, targetTasks, nodeName)

_ = woc.markNodePhase(nodeName, wfv1.NodeSucceeded)
return nil
return node, nil
}

func (woc *wfOperationCtx) updateOutboundNodesForTargetTasks(dagCtx *dagContext, targetTasks []string, nodeName string) {
Expand Down
86 changes: 42 additions & 44 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1281,40 +1281,22 @@ func (woc *wfOperationCtx) executeTemplate(nodeName string, orgTmpl wfv1.Templat
}
}

// Initialize node based on the template type.
if node == nil {
var nodeType wfv1.NodeType
switch processedTmpl.GetType() {
case wfv1.TemplateTypeContainer, wfv1.TemplateTypeScript, wfv1.TemplateTypeResource:
nodeType = wfv1.NodeTypePod
case wfv1.TemplateTypeSteps:
nodeType = wfv1.NodeTypeSteps
case wfv1.TemplateTypeDAG:
nodeType = wfv1.NodeTypeDAG
case wfv1.TemplateTypeSuspend:
nodeType = wfv1.NodeTypeSuspend
default:
err := errors.InternalErrorf("Template '%s' has unknown node type", processedTmpl.Name)
return woc.initializeNode(nodeName, wfv1.NodeTypeSkipped, orgTmpl, boundaryID, wfv1.NodeError, err.Error()), err
}
node = woc.initializeExecutableNode(nodeName, nodeType, templateScope, processedTmpl, orgTmpl, boundaryID, wfv1.NodePending)
}

switch processedTmpl.GetType() {
case wfv1.TemplateTypeContainer:
err = woc.executeContainer(node.Name, processedTmpl, boundaryID)
node, err = woc.executeContainer(nodeName, templateScope, processedTmpl, orgTmpl, boundaryID)
case wfv1.TemplateTypeSteps:
err = woc.executeSteps(node.Name, newTmplCtx, processedTmpl, boundaryID)
node, err = woc.executeSteps(nodeName, newTmplCtx, templateScope, processedTmpl, orgTmpl, boundaryID)
case wfv1.TemplateTypeScript:
err = woc.executeScript(node.Name, processedTmpl, boundaryID)
node, err = woc.executeScript(nodeName, templateScope, processedTmpl, orgTmpl, boundaryID)
case wfv1.TemplateTypeResource:
err = woc.executeResource(node.Name, processedTmpl, boundaryID)
node, err = woc.executeResource(nodeName, templateScope, processedTmpl, orgTmpl, boundaryID)
case wfv1.TemplateTypeDAG:
err = woc.executeDAG(node.Name, newTmplCtx, processedTmpl, boundaryID)
node, err = woc.executeDAG(nodeName, newTmplCtx, templateScope, processedTmpl, orgTmpl, boundaryID)
case wfv1.TemplateTypeSuspend:
err = woc.executeSuspend(node.Name, processedTmpl, boundaryID)
node, err = woc.executeSuspend(nodeName, templateScope, processedTmpl, orgTmpl, boundaryID)
default:
err = errors.Errorf(errors.CodeBadRequest, "Template '%s' missing specification", processedTmpl.Name)
return woc.initializeNode(nodeName, wfv1.NodeTypeSkipped, orgTmpl, boundaryID, wfv1.NodeError, err.Error()), err
}
if err != nil {
node = woc.markNodeError(node.Name, err)
Expand Down Expand Up @@ -1556,10 +1538,16 @@ func (woc *wfOperationCtx) checkParallelism(tmpl *wfv1.Template, node *wfv1.Node
return nil
}

func (woc *wfOperationCtx) executeContainer(nodeName string, tmpl *wfv1.Template, boundaryID string) error {
func (woc *wfOperationCtx) executeContainer(nodeName string, templateScope string, tmpl *wfv1.Template, orgTmpl wfv1.TemplateHolder, boundaryID string) (*wfv1.NodeStatus, error) {
node := woc.getNodeByName(nodeName)
if node != nil {
return node, nil
}
node = woc.initializeExecutableNode(nodeName, wfv1.NodeTypePod, templateScope, tmpl, orgTmpl, boundaryID, wfv1.NodePending)

woc.log.Debugf("Executing node %s with container template: %v\n", nodeName, tmpl)
_, err := woc.createWorkflowPod(nodeName, *tmpl.Container, tmpl, false)
return err
return node, err
}

func (woc *wfOperationCtx) getOutboundNodes(nodeID string) []string {
Expand Down Expand Up @@ -1663,12 +1651,18 @@ func getStepOrDAGTaskName(nodeName string, hasRetryStrategy bool) string {
return nodeName
}

func (woc *wfOperationCtx) executeScript(nodeName string, tmpl *wfv1.Template, boundaryID string) error {
func (woc *wfOperationCtx) executeScript(nodeName string, templateScope string, tmpl *wfv1.Template, orgTmpl wfv1.TemplateHolder, boundaryID string) (*wfv1.NodeStatus, error) {
node := woc.getNodeByName(nodeName)
if node != nil {
return node, nil
}
node = woc.initializeExecutableNode(nodeName, wfv1.NodeTypePod, templateScope, tmpl, orgTmpl, boundaryID, wfv1.NodePending)

includeScriptOutput := false
if boundaryNode, ok := woc.wf.Status.Nodes[boundaryID]; ok {
_, parentTemplate, err := woc.tmplCtx.ResolveTemplate(&boundaryNode)
if err != nil {
return err
return node, err
}
name := getStepOrDAGTaskName(nodeName, tmpl.RetryStrategy != nil)
includeScriptOutput = hasOutputResultRef(name, parentTemplate)
Expand All @@ -1677,10 +1671,7 @@ func (woc *wfOperationCtx) executeScript(nodeName string, tmpl *wfv1.Template, b
mainCtr := tmpl.Script.Container
mainCtr.Args = append(mainCtr.Args, common.ExecutorScriptSourcePath)
_, err := woc.createWorkflowPod(nodeName, mainCtr, tmpl, includeScriptOutput)
if err != nil {
return err
}
return nil
return node, err
}

// processNodeOutputs adds all of a nodes outputs to the local scope with the given prefix, as well
Expand Down Expand Up @@ -1891,36 +1882,43 @@ func (woc *wfOperationCtx) addChildNode(parent string, child string) {
}

// executeResource runs a kubectl command against a manifest
func (woc *wfOperationCtx) executeResource(nodeName string, tmpl *wfv1.Template, boundaryID string) error {
func (woc *wfOperationCtx) executeResource(nodeName string, templateScope string, tmpl *wfv1.Template, orgTmpl wfv1.TemplateHolder, boundaryID string) (*wfv1.NodeStatus, error) {
node := woc.getNodeByName(nodeName)
if node != nil {
return node, nil
}
node = woc.initializeExecutableNode(nodeName, wfv1.NodeTypePod, templateScope, tmpl, orgTmpl, boundaryID, wfv1.NodePending)

tmpl = tmpl.DeepCopy()

// Try to unmarshal the given manifest.
obj := unstructured.Unstructured{}
err := yaml.Unmarshal([]byte(tmpl.Resource.Manifest), &obj)
if err != nil {
return err
return node, err
}

if tmpl.Resource.SetOwnerReference {
ownerReferences := obj.GetOwnerReferences()
obj.SetOwnerReferences(append(ownerReferences, *metav1.NewControllerRef(woc.wf, wfv1.SchemeGroupVersion.WithKind(workflow.WorkflowKind))))
bytes, err := yaml.Marshal(obj.Object)
if err != nil {
return err
return node, err
}
tmpl.Resource.Manifest = string(bytes)
}

mainCtr := woc.newExecContainer(common.MainContainerName, tmpl)
mainCtr.Command = []string{"argoexec", "resource", tmpl.Resource.Action}
_, err = woc.createWorkflowPod(nodeName, *mainCtr, tmpl, false)
if err != nil {
return err
}
return nil
return node, err
}

func (woc *wfOperationCtx) executeSuspend(nodeName string, tmpl *wfv1.Template, boundaryID string) error {
func (woc *wfOperationCtx) executeSuspend(nodeName string, templateScope string, tmpl *wfv1.Template, orgTmpl wfv1.TemplateHolder, boundaryID string) (*wfv1.NodeStatus, error) {
node := woc.getNodeByName(nodeName)
if node == nil {
node = woc.initializeExecutableNode(nodeName, wfv1.NodeTypeSuspend, templateScope, tmpl, orgTmpl, boundaryID, wfv1.NodePending)
}
woc.log.Infof("node %s suspended", nodeName)

// If there is either an active workflow deadline, or if this node is suspended with a duration, then the workflow
Expand All @@ -1931,15 +1929,15 @@ func (woc *wfOperationCtx) executeSuspend(nodeName string, tmpl *wfv1.Template,
node := woc.getNodeByName(nodeName)
suspendDuration, err := parseStringToDuration(tmpl.Suspend.Duration)
if err != nil {
return err
return node, err
}
suspendDeadline := node.StartedAt.Add(suspendDuration)
requeueTime = &suspendDeadline
if time.Now().UTC().After(suspendDeadline) {
// Suspension is expired, node can be resumed
woc.log.Infof("auto resuming node %s", nodeName)
_ = woc.markNodePhase(nodeName, wfv1.NodeSucceeded)
return nil
return node, nil
}
}

Expand All @@ -1957,7 +1955,7 @@ func (woc *wfOperationCtx) executeSuspend(nodeName string, tmpl *wfv1.Template,
}

_ = woc.markNodePhase(nodeName, wfv1.NodeRunning)
return nil
return node, nil
}

func parseStringToDuration(durationString string) (time.Duration, error) {
Expand Down
19 changes: 11 additions & 8 deletions workflow/controller/steps.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@ type stepsContext struct {
tmplCtx *templateresolution.Context
}

func (woc *wfOperationCtx) executeSteps(nodeName string, tmplCtx *templateresolution.Context, tmpl *wfv1.Template, boundaryID string) error {
node := woc.markNodePhase(nodeName, wfv1.NodeRunning)
func (woc *wfOperationCtx) executeSteps(nodeName string, tmplCtx *templateresolution.Context, templateScope string, tmpl *wfv1.Template, orgTmpl wfv1.TemplateHolder, boundaryID string) (*wfv1.NodeStatus, error) {
node := woc.getNodeByName(nodeName)
if node == nil {
node = woc.initializeExecutableNode(nodeName, wfv1.NodeTypeSteps, templateScope, tmpl, orgTmpl, boundaryID, wfv1.NodeRunning)
}

defer func() {
if woc.wf.Status.Nodes[node.ID].Completed() {
Expand Down Expand Up @@ -79,15 +82,15 @@ func (woc *wfOperationCtx) executeSteps(nodeName string, tmplCtx *templateresolu

if !sgNode.Completed() {
woc.log.Infof("Workflow step group node %v not yet completed", sgNode)
return nil
return node, nil
}

if !sgNode.Successful() {
failMessage := fmt.Sprintf("step group %s was unsuccessful: %s", sgNode.ID, sgNode.Message)
woc.log.Info(failMessage)
woc.updateOutboundNodes(nodeName, tmpl)
_ = woc.markNodePhase(nodeName, wfv1.NodeFailed, sgNode.Message)
return nil
return node, nil
}

// Add all outputs of each step in the group to the scope
Expand All @@ -108,11 +111,11 @@ func (woc *wfOperationCtx) executeSteps(nodeName string, tmplCtx *templateresolu
// Expanded child nodes should be created from the same template.
_, tmpl, err := woc.tmplCtx.ResolveTemplate(&childNodes[0])
if err != nil {
return err
return node, err
}
err = woc.processAggregateNodeOutputs(tmpl, stepsCtx.scope, prefix, childNodes)
if err != nil {
return err
return node, err
}
} else {
woc.log.Infof("Step '%s' has no expanded child nodes", childNode)
Expand All @@ -126,7 +129,7 @@ func (woc *wfOperationCtx) executeSteps(nodeName string, tmplCtx *templateresolu
// If this template has outputs from any of its steps, copy them to this node here
outputs, err := getTemplateOutputsFromScope(tmpl, stepsCtx.scope)
if err != nil {
return err
return node, err
}
if outputs != nil {
node := woc.getNodeByName(nodeName)
Expand All @@ -135,7 +138,7 @@ func (woc *wfOperationCtx) executeSteps(nodeName string, tmplCtx *templateresolu
}

_ = woc.markNodePhase(nodeName, wfv1.NodeSucceeded)
return nil
return node, nil
}

// updateOutboundNodes set the outbound nodes from the last step group
Expand Down
Loading