Skip to content

Commit

Permalink
fix: metrics are not emitted properly during retry
Browse files Browse the repository at this point in the history
Signed-off-by: Jiacheng Xu <xjcmaxwellcjx@gmail.com>
  • Loading branch information
jiachengxu committed Feb 8, 2023
1 parent ab178bb commit 2f7ba6b
Showing 1 changed file with 12 additions and 8 deletions.
20 changes: 12 additions & 8 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -881,7 +881,7 @@ func (woc *wfOperationCtx) requeue() {
}

// processNodeRetries updates the retry node state based on the child node state and the retry strategy and returns the node.
func (woc *wfOperationCtx) processNodeRetries(node *wfv1.NodeStatus, retryStrategy wfv1.RetryStrategy, opts *executeTemplateOpts) (*wfv1.NodeStatus, bool, error) {
func (woc *wfOperationCtx) processNodeRetries(node *wfv1.NodeStatus, retryStrategy wfv1.RetryStrategy, executeTmpl *wfv1.Template, opts *executeTemplateOpts) (*wfv1.NodeStatus, bool, error) {
if node.Fulfilled() {
return node, true, nil
}
Expand Down Expand Up @@ -1018,11 +1018,16 @@ func (woc *wfOperationCtx) processNodeRetries(node *wfv1.NodeStatus, retryStrate
if err != nil {
return nil, false, err
}

if retryStrategy.Limit != nil && limit != nil && int32(len(node.Children)) > *limit {
woc.log.Infoln("No more retries left. Failing...")
return woc.markNodePhase(node.Name, lastChildNode.Phase, "No more retries left"), true, nil
}

if len(executeTmpl.Metrics.Prometheus) != 0 {
localScope, realTimeScope := woc.prepareMetricScope(lastChildNode)
woc.computeMetrics(executeTmpl.Metrics.Prometheus, localScope, realTimeScope, false)
}
woc.log.Infof("%d child nodes of %s failed. Trying again...", len(node.Children), node.Name)
return node, true, nil
}
Expand Down Expand Up @@ -1898,7 +1903,7 @@ func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string,
woc.log.Debugf("Inject a retry node for node %s", retryNodeName)
retryParentNode = woc.initializeExecutableNode(retryNodeName, wfv1.NodeTypeRetry, templateScope, processedTmpl, orgTmpl, opts.boundaryID, wfv1.NodeRunning)
}
processedRetryParentNode, continueExecution, err := woc.processNodeRetries(retryParentNode, *woc.retryStrategy(processedTmpl), opts)
processedRetryParentNode, continueExecution, err := woc.processNodeRetries(retryParentNode, *woc.retryStrategy(processedTmpl), processedTmpl, opts)
if err != nil {
return woc.markNodeError(retryNodeName, err), err
} else if !continueExecution {
Expand All @@ -1907,7 +1912,12 @@ func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string,
}
retryParentNode = processedRetryParentNode
// The retry node might have completed by now.
lastChildNode := getChildNodeIndex(retryParentNode, woc.wf.Status.Nodes, -1)
if retryParentNode.Fulfilled() {
if lastChildNode != nil {
retryParentNode.Outputs = lastChildNode.Outputs.DeepCopy()
woc.wf.Status.Nodes[node.ID] = *retryParentNode
}
if processedTmpl.Metrics != nil {
// In this check, a completed node may or may not have existed prior to this execution. If it did exist, ensure that it wasn't
// completed before this execution. If it did not exist prior, then we can infer that it was completed during this execution.
Expand All @@ -1921,14 +1931,8 @@ func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string,
if processedTmpl.Synchronization != nil {
woc.controller.syncManager.Release(woc.wf, node.ID, processedTmpl.Synchronization)
}
lastChildNode := getChildNodeIndex(retryParentNode, woc.wf.Status.Nodes, -1)
if lastChildNode != nil {
retryParentNode.Outputs = lastChildNode.Outputs.DeepCopy()
woc.wf.Status.Nodes[node.ID] = *retryParentNode
}
return retryParentNode, nil
}
lastChildNode := getChildNodeIndex(retryParentNode, woc.wf.Status.Nodes, -1)
if lastChildNode != nil && !lastChildNode.Fulfilled() {
// Last child node is still running.
nodeName = lastChildNode.Name
Expand Down

0 comments on commit 2f7ba6b

Please sign in to comment.