Skip to content

Commit

Permalink
Save stored template ID in nodes (#1631)
Browse files Browse the repository at this point in the history
  • Loading branch information
dtaniwaki authored and sarabala1979 committed Oct 2, 2019
1 parent 5d530be commit 644946e
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 19 deletions.
4 changes: 4 additions & 0 deletions api/openapi-spec/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,10 @@
"description": "Time at which this node started",
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.Time"
},
"storedTemplateID": {
"description": "StoredTemplateID is the ID of stored template.",
"type": "string"
},
"templateName": {
"description": "TemplateName is the template name which this node corresponds to. Not applicable to virtual nodes (e.g. Retry, StepGroup)",
"type": "string"
Expand Down
7 changes: 7 additions & 0 deletions pkg/apis/workflow/v1alpha1/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 7 additions & 11 deletions pkg/apis/workflow/v1alpha1/workflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -721,6 +721,9 @@ type NodeStatus struct {
// Not applicable to virtual nodes (e.g. Retry, StepGroup)
TemplateRef *TemplateRef `json:"templateRef,omitempty"`

// StoredTemplateID is the ID of stored template.
StoredTemplateID string `json:"storedTemplateID,omitempty"`

// WorkflowTemplateName is the WorkflowTemplate resource name on which the resolved template of this node is retrieved.
WorkflowTemplateName string `json:"workflowTemplateName,omitempty"`

Expand Down Expand Up @@ -825,16 +828,6 @@ func (n NodeStatus) CanRetry() bool {
return n.Completed() && !n.Successful()
}

// GetBaseTemplateID returns a base template ID if available.
func (n *NodeStatus) GetBaseTemplateID() string {
if n.TemplateRef != nil {
return fmt.Sprintf("%s/%s", n.TemplateRef.Name, n.TemplateRef.Template)
} else if n.WorkflowTemplateName != "" {
return fmt.Sprintf("%s/%s", n.WorkflowTemplateName, n.TemplateName)
}
return ""
}

// S3Bucket contains the access information required for interfacing with an S3 bucket
type S3Bucket struct {
// Endpoint is the hostname of the bucket endpoint
Expand Down Expand Up @@ -1266,7 +1259,10 @@ func (wf *Workflow) NodeID(name string) string {

// GetStoredTemplate gets a resolved template from stored data.
func (wf *Workflow) GetStoredTemplate(node *NodeStatus) *Template {
id := node.GetBaseTemplateID()
id := node.StoredTemplateID
if id == "" {
return nil
}
tmpl, ok := wf.Status.StoredTemplates[id]
if ok {
return &tmpl
Expand Down
20 changes: 12 additions & 8 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1298,7 +1298,7 @@ func (woc *wfOperationCtx) markWorkflowError(err error, markCompleted bool) {
// DAG or steps templates. Will match stings with prefix like: [0]. or .
var stepsOrDagSeparator = regexp.MustCompile(`^(\[\d+\])?\.`)

// initializeExecutableNode initializes a node and stores the base template.
// initializeExecutableNode initializes a node and stores the template.
func (woc *wfOperationCtx) initializeExecutableNode(nodeName string, nodeType wfv1.NodeType, tmplCtx *templateresolution.Context, executeTmpl *wfv1.Template, orgTmpl wfv1.TemplateHolder, boundaryID string, phase wfv1.NodePhase, messages ...string) *wfv1.NodeStatus {
node := woc.initializeNode(nodeName, nodeType, orgTmpl, boundaryID, phase)

Expand All @@ -1312,16 +1312,20 @@ func (woc *wfOperationCtx) initializeExecutableNode(nodeName string, nodeType wf
node.WorkflowTemplateName = tmplCtx.GetCurrentTemplateBase().GetName()
}

// Store base template for the later use.
baseTemplateID := node.GetBaseTemplateID()
if baseTemplateID != "" {
// Store the template for the later use.
if node.TemplateRef != nil {
node.StoredTemplateID = fmt.Sprintf("%s/%s", node.TemplateRef.Name, node.TemplateRef.Template)
} else if node.WorkflowTemplateName != "" {
node.StoredTemplateID = fmt.Sprintf("%s/%s", node.WorkflowTemplateName, node.TemplateName)
}
if node.StoredTemplateID != "" {
baseTemplate := executeTmpl.GetBaseTemplate()
_, exists := woc.wf.Status.StoredTemplates[baseTemplateID]
_, exists := woc.wf.Status.StoredTemplates[node.StoredTemplateID]
if !exists {
woc.log.Infof("Create base template '%s'", baseTemplateID)
woc.wf.Status.StoredTemplates[baseTemplateID] = *baseTemplate
woc.log.Infof("Create stored template '%s'", node.StoredTemplateID)
woc.wf.Status.StoredTemplates[node.StoredTemplateID] = *baseTemplate
} else {
woc.log.Infof("Base template '%s' already exists", baseTemplateID)
woc.log.Infof("Stored template '%s' already exists", node.StoredTemplateID)
}
}

Expand Down

0 comments on commit 644946e

Please sign in to comment.