Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Store resolved templates #1552

Merged
merged 15 commits into from
Sep 30, 2019
3 changes: 3 additions & 0 deletions cmd/argo/commands/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ func printWorkflowHelper(wf *wfv1.Workflow, getArgs getFlags) {

// Print main and onExit Trees
mainRoot := roots[wf.ObjectMeta.Name]
if mainRoot == nil {
panic("failed to get the entrypoint node")
}
mainRoot.renderNodes(w, wf, 0, " ", " ", getArgs)

onExitID := wf.NodeID(wf.ObjectMeta.Name + "." + onExitSuffix)
Expand Down
52 changes: 52 additions & 0 deletions pkg/apis/workflow/v1alpha1/workflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
schema "k8s.io/apimachinery/pkg/runtime/schema"
)

// TemplateType is the type of a template
Expand Down Expand Up @@ -67,6 +68,7 @@ const (
type TemplateGetter interface {
GetNamespace() string
GetName() string
GroupVersionKind() schema.GroupVersionKind
GetTemplateByName(name string) *Template
}

Expand Down Expand Up @@ -343,6 +345,13 @@ func (tmpl *Template) GetTemplateRef() *TemplateRef {
return tmpl.TemplateRef
}

// GetBaseTemplate returns a base template content.
func (tmpl *Template) GetBaseTemplate() *Template {
baseTemplate := tmpl.DeepCopy()
baseTemplate.Inputs = Inputs{}
return baseTemplate
}

// Inputs are the mechanism for passing parameters, artifacts, volumes from one template to another
type Inputs struct {
// Parameters are a list of parameters passed as inputs
Expand Down Expand Up @@ -639,6 +648,9 @@ type WorkflowStatus struct {
// Nodes is a mapping between a node ID and the node's status.
Nodes map[string]NodeStatus `json:"nodes,omitempty"`

// StoredTemplates is a mapping between a template ref and the node's status.
StoredTemplates map[string]Template `json:"storedTemplates,omitempty"`

// PersistentVolumeClaims tracks all PVCs that were created as part of the workflow.
// The contents of this list are drained at the end of the workflow.
PersistentVolumeClaims []apiv1.Volume `json:"persistentVolumeClaims,omitempty"`
Expand Down Expand Up @@ -677,6 +689,9 @@ type NodeStatus struct {
// Not applicable to virtual nodes (e.g. Retry, StepGroup)
TemplateRef *TemplateRef `json:"templateRef,omitempty"`

// WorkflowTemplateName is the WorkflowTemplate resource name on which the resolved template of this node is retrieved.
WorkflowTemplateName string `json:"workflowTemplateName,omitempty"`

// Phase a simple, high-level summary of where the node is in its lifecycle.
// Can be used as a state machine.
Phase NodePhase `json:"phase,omitempty"`
Expand Down Expand Up @@ -778,6 +793,16 @@ func (n NodeStatus) CanRetry() bool {
return n.Completed() && !n.Successful()
}

// GetBaseTemplateID returns a base template ID if available.
func (n *NodeStatus) GetBaseTemplateID() string {
if n.TemplateRef != nil {
return fmt.Sprintf("%s/%s", n.TemplateRef.Name, n.TemplateRef.Template)
} else if n.WorkflowTemplateName != "" {
return fmt.Sprintf("%s/%s", n.WorkflowTemplateName, n.TemplateName)
}
return ""
}

// S3Bucket contains the access information required for interfacing with an S3 bucket
type S3Bucket struct {
// Endpoint is the hostname of the bucket endpoint
Expand Down Expand Up @@ -1202,6 +1227,33 @@ func (wf *Workflow) NodeID(name string) string {
return fmt.Sprintf("%s-%v", wf.ObjectMeta.Name, h.Sum32())
}

// GetStoredTemplate gets a resolved template from stored data.
func (wf *Workflow) GetStoredTemplate(node *NodeStatus) *Template {
id := node.GetBaseTemplateID()
tmpl, ok := wf.Status.StoredTemplates[id]
if ok {
return &tmpl
}
return nil
}

// GetStoredOrLocalTemplate gets a resolved template from stored data or local template.
func (wf *Workflow) GetStoredOrLocalTemplate(node *NodeStatus) *Template {
// Try to find a template from stored data.
tmpl := wf.GetStoredTemplate(node)
if tmpl != nil {
return tmpl
}
// Try to get template from Workflow.
if node.WorkflowTemplateName == "" && node.TemplateName != "" {
Copy link
Member Author

@dtaniwaki dtaniwaki Sep 16, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here's a trade-off. If we store used local templates in StoredTemplates as well, we can remove the conditional from GetStoredTemplate and make the code simpler.

tmpl := wf.GetTemplateByName(node.TemplateName)
if tmpl != nil {
return tmpl
}
}
return nil
}

// ContinueOn defines if a workflow should continue even if a task or step fails/errors.
// It can be specified if the workflow should continue when the pod errors, fails or both.
type ContinueOn struct {
Expand Down
7 changes: 7 additions & 0 deletions pkg/apis/workflow/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

46 changes: 23 additions & 23 deletions workflow/controller/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,34 +186,29 @@ func (d *dagContext) hasMoreRetries(node *wfv1.NodeStatus) bool {
return true
}
// pick the first child to determine it's template type
childNode := d.wf.Status.Nodes[node.Children[0]]
tmpl, err := d.tmplCtx.GetTemplate(&childNode)
if err != nil {
childNode, ok := d.wf.Status.Nodes[node.Children[0]]
if !ok {
return false
}
if tmpl.RetryStrategy.Limit != nil && int32(len(node.Children)) > *tmpl.RetryStrategy.Limit {
tmpl := d.wf.GetStoredOrLocalTemplate(&childNode)
if tmpl != nil && tmpl.RetryStrategy != nil && tmpl.RetryStrategy.Limit != nil && int32(len(node.Children)) > *tmpl.RetryStrategy.Limit {
return false
}
return true
}

func (woc *wfOperationCtx) executeDAG(nodeName string, tmplCtx *templateresolution.Context, tmpl *wfv1.Template, orgTmpl wfv1.TemplateHolder, boundaryID string) *wfv1.NodeStatus {
node := woc.getNodeByName(nodeName)
if node != nil && node.Completed() {
return node
}
if node == nil {
node = woc.initializeNode(nodeName, wfv1.NodeTypeDAG, orgTmpl, boundaryID, wfv1.NodeRunning)
}
func (woc *wfOperationCtx) executeDAG(nodeName string, tmplCtx *templateresolution.Context, tmpl *wfv1.Template, boundaryID string) error {
node := woc.markNodePhase(nodeName, wfv1.NodeRunning)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that the logic has been changed to unconditionally mark this node as Running. Previously, executeDAG was a NO-OP when the dag node was already considered completed. The new logic means we will flap back to running no matter what and go through re-assessment logic.

I'd like to understand the reason for this change since it seems like we should never flap back to running.

Copy link
Member Author

@dtaniwaki dtaniwaki Sep 26, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed the logic and node completions are checked at the beginning of executeTemplate now. So, I assume a node is not completed in executeDAG.

https://github.com/argoproj/argo/pull/1552/files#diff-fcb04129f1d32e69cca32631c6586587R1108-R1114

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know executeDAG can be called outside of executeTemplate currently and it makes a node status unexpected. We need a big refactoring to avoid this unexpected call by class structuring. I'm totally fine to put completion checks in sub-methods called from executeTemplate for now.


defer func() {
if node != nil && woc.wf.Status.Nodes[node.ID].Completed() {
if woc.wf.Status.Nodes[node.ID].Completed() {
_ = woc.killDaemonedChildren(node.ID)
}
}()

dagCtx := &dagContext{
boundaryName: nodeName,
boundaryID: woc.wf.NodeID(nodeName),
boundaryID: node.ID,
tasks: tmpl.DAG.Tasks,
visited: make(map[string]bool),
tmpl: tmpl,
Expand All @@ -238,9 +233,10 @@ func (woc *wfOperationCtx) executeDAG(nodeName string, tmplCtx *templateresoluti
dagPhase := dagCtx.assessDAGPhase(targetTasks, woc.wf.Status.Nodes)
switch dagPhase {
case wfv1.NodeRunning:
return woc.getNodeByName(nodeName)
return nil
case wfv1.NodeError, wfv1.NodeFailed:
return woc.markNodePhase(nodeName, dagPhase)
_ = woc.markNodePhase(nodeName, dagPhase)
return nil
}

// set outputs from tasks in order for DAG templates to support outputs
Expand All @@ -258,7 +254,7 @@ func (woc *wfOperationCtx) executeDAG(nodeName string, tmplCtx *templateresoluti
}
outputs, err := getTemplateOutputsFromScope(tmpl, &scope)
if err != nil {
return woc.markNodeError(nodeName, err)
return err
}
if outputs != nil {
node = woc.getNodeByName(nodeName)
Expand All @@ -267,7 +263,6 @@ func (woc *wfOperationCtx) executeDAG(nodeName string, tmplCtx *templateresoluti
}

// set the outbound nodes from the target tasks
node = woc.getNodeByName(nodeName)
outbound := make([]string, 0)
for _, depName := range targetTasks {
depNode := dagCtx.GetTaskNode(depName)
Expand All @@ -278,10 +273,12 @@ func (woc *wfOperationCtx) executeDAG(nodeName string, tmplCtx *templateresoluti
outbound = append(outbound, outboundNodeIDs...)
}
woc.log.Infof("Outbound nodes of %s set to %s", node.ID, outbound)
node = woc.getNodeByName(nodeName)
node.OutboundNodes = outbound
woc.wf.Status.Nodes[node.ID] = *node

return woc.markNodePhase(nodeName, wfv1.NodeSucceeded)
_ = woc.markNodePhase(nodeName, wfv1.NodeSucceeded)
return nil
}

// executeDAGTask traverses and executes the upward chain of dependencies of a task
Expand Down Expand Up @@ -437,6 +434,9 @@ func (woc *wfOperationCtx) resolveDependencyReferences(dagCtx *dagContext, task
ancestors := common.GetTaskAncestry(dagCtx, task.Name, dagCtx.tasks)
for _, ancestor := range ancestors {
ancestorNode := dagCtx.GetTaskNode(ancestor)
if ancestorNode == nil {
return nil, errors.InternalErrorf("Ancestor task node %s not found", ancestor)
}
prefix := fmt.Sprintf("tasks.%s", ancestor)
if ancestorNode.Type == wfv1.NodeTypeTaskGroup {
var ancestorNodes []wfv1.NodeStatus
Expand All @@ -445,11 +445,11 @@ func (woc *wfOperationCtx) resolveDependencyReferences(dagCtx *dagContext, task
ancestorNodes = append(ancestorNodes, node)
}
}
tmpl, err := dagCtx.tmplCtx.GetTemplate(ancestorNode)
if err != nil {
return nil, errors.InternalWrapError(err)
tmpl := dagCtx.wf.GetStoredOrLocalTemplate(ancestorNode)
if tmpl != nil {
return nil, errors.InternalErrorf("Template of ancestor node '%s' not found", ancestorNode.Name)
}
err = woc.processAggregateNodeOutputs(tmpl, &scope, prefix, ancestorNodes)
err := woc.processAggregateNodeOutputs(tmpl, &scope, prefix, ancestorNodes)
if err != nil {
return nil, errors.InternalWrapError(err)
}
Expand Down
Loading