From 00f08b157ed403f728b237fb362bce3a5e7a88dc Mon Sep 17 00:00:00 2001 From: Saravanan Balasubramanian Date: Thu, 4 Apr 2019 13:18:33 -0700 Subject: [PATCH 1/6] CheckandEstimate implementation --- workflow/controller/operator.go | 67 ++++++++++++++++++++++++++++----- 1 file changed, 58 insertions(+), 9 deletions(-) diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index bfc1ed6b52df..b04c045956d6 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -61,6 +61,15 @@ type wfOperationCtx struct { // workflowDeadline is the deadline which the workflow is expected to complete before we // terminate the workflow. workflowDeadline *time.Time + + // currentWFSize is current Workflow size + currentWFSize int + + // unSavedNodeStatusSize is unsaved workflow size + unSavedNodeStatusSize int + + // wfFailed is workflow failed status + isWFFailed bool } var ( @@ -124,7 +133,12 @@ func (woc *wfOperationCtx) operate() { woc.log.Errorf("Recovered from panic: %+v\n%s", r, debug.Stack()) } }() + woc.log.Infof("Processing workflow") + + // Initialize Workflow failed status + woc.wfFailed = false + // Perform one-time workflow validation if woc.wf.Status.Phase == "" { woc.markWorkflowRunning() @@ -453,9 +467,9 @@ func (woc *wfOperationCtx) podReconciliation() error { seenPodLock := &sync.Mutex{} wfNodesLock := &sync.RWMutex{} - performAssessment := func(pod *apiv1.Pod) { + performAssessment := func(pod *apiv1.Pod) string { if pod == nil { - return + return "" } nodeNameForPod := pod.Annotations[common.AnnotationKeyNodeName] nodeID := woc.wf.NodeID(nodeNameForPod) @@ -475,16 +489,20 @@ func (woc *wfOperationCtx) podReconciliation() error { if node.Completed() && !node.IsDaemoned() { if tmpVal, tmpOk := pod.Labels[common.LabelKeyCompleted]; tmpOk { if tmpVal == "true" { - return + return nodeID } } woc.completedPods[pod.ObjectMeta.Name] = true } } + return nodeID } parallelPodNum := make(chan string, 500) var wg sync.WaitGroup + + woc.currentWFSize = woc.getSize() + for _, pod := range podList.Items { parallelPodNum <- pod.Name wg.Add(1) @@ -493,20 +511,23 @@ func (woc *wfOperationCtx) podReconciliation() error { wfNodesLock.Lock() origNodeStatus := *woc.wf.Status.DeepCopy() wfNodesLock.Unlock() - performAssessment(&tmpPod) + nodeID := performAssessment(&tmpPod) err = woc.applyExecutionControl(&tmpPod, wfNodesLock) if err != nil { woc.log.Warnf("Failed to apply execution control to pod %s", tmpPod.Name) } wfNodesLock.Lock() defer wfNodesLock.Unlock() - err = woc.checkAndCompress() + err = woc.checkAndEstimate(nodeID) if err != nil { woc.wf.Status = origNodeStatus nodeNameForPod := tmpPod.Annotations[common.AnnotationKeyNodeName] woc.log.Warnf("%v", err) woc.markNodeErrorClearOuput(nodeNameForPod, err) err = woc.checkAndCompress() + if err != nil { + woc.wfFailed = true + } } <-parallelPodNum }(pod) @@ -1664,17 +1685,19 @@ func (woc *wfOperationCtx) getSize() int { // The compressed content will be assign to compressedNodes element and clear the nodestatus map. func (woc *wfOperationCtx) checkAndCompress() error { - if woc.wf.Status.CompressedNodes != "" || (woc.wf.Status.CompressedNodes == "" && woc.getSize() >= maxWorkflowSize) { - + if woc.isWFFailed == false && (woc.wf.Status.CompressedNodes != "" || (woc.wf.Status.CompressedNodes == "" && woc.getSize() >= maxWorkflowSize)) { + start := time.Now() nodeContent, err := json.Marshal(woc.wf.Status.Nodes) if err != nil { return errors.InternalWrapError(err) } buff := string(nodeContent) woc.wf.Status.CompressedNodes = file.CompressEncodeString(buff) - + fmt.Println("checkAndCompress: %s", time.Since(start)) } - if woc.wf.Status.CompressedNodes != "" && woc.getSize() >= maxWorkflowSize { + + if woc.isWFFailed || (woc.wf.Status.CompressedNodes != "" && woc.getSize() >= maxWorkflowSize) { + woc.isWFFailed = true return errors.InternalError(fmt.Sprintf("Workflow is longer than maximum allowed size. Size=%d", woc.getSize())) } return nil @@ -1698,3 +1721,29 @@ func (woc *wfOperationCtx) checkAndDecompress() error { } return nil } + +// checkAndEstimate will check and estimate the workflow size with current nodestatus +func (woc *wfOperationCtx) checkAndEstimate(nodeID string) error { + if nodeID == "" { + return nil + } + + if woc.isWFFailed { + return errors.InternalError(fmt.Sprintf("Workflow is longer than maximum allowed size. Size=%d", woc.currentWFSize+woc.unSavedNodeStatusSize)) + } + + if woc.wf.Status.CompressedNodes != "" { + if node, ok := woc.wf.Status.Nodes[nodeID]; ok { + content, err := json.Marshal(node) + if err != nil { + return errors.InternalWrapError(err) + } + nodeSize := len(file.CompressEncodeString(string(content))) + if (nodeSize + woc.unSavedNodeStatusSize + woc.currentWFSize) >= maxWorkflowSize { + return errors.InternalError(fmt.Sprintf("Workflow is longer than maximum allowed size. Size=%d", woc.currentWFSize+nodeSize+woc.unSavedNodeStatusSize)) + } + woc.unSavedNodeStatusSize += nodeSize + } + } + return nil +} From e7ac42ad6996b40bda9408f05535d56c562049de Mon Sep 17 00:00:00 2001 From: Saravanan Balasubramanian Date: Thu, 4 Apr 2019 14:35:50 -0700 Subject: [PATCH 2/6] fixed variable rename --- workflow/controller/operator.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index b04c045956d6..f9cad161aa00 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -137,7 +137,7 @@ func (woc *wfOperationCtx) operate() { woc.log.Infof("Processing workflow") // Initialize Workflow failed status - woc.wfFailed = false + woc.isWFFailed = false // Perform one-time workflow validation if woc.wf.Status.Phase == "" { @@ -526,7 +526,7 @@ func (woc *wfOperationCtx) podReconciliation() error { woc.markNodeErrorClearOuput(nodeNameForPod, err) err = woc.checkAndCompress() if err != nil { - woc.wfFailed = true + woc.isWFFailed = true } } <-parallelPodNum From d956bfcd3300ce3a3581e93b2c47ece2360624f7 Mon Sep 17 00:00:00 2001 From: Saravanan Balasubramanian Date: Thu, 4 Apr 2019 17:02:17 -0700 Subject: [PATCH 3/6] fixed gofmt --- workflow/controller/operator.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index f9cad161aa00..aa5eb64b983b 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -1686,14 +1686,12 @@ func (woc *wfOperationCtx) getSize() int { func (woc *wfOperationCtx) checkAndCompress() error { if woc.isWFFailed == false && (woc.wf.Status.CompressedNodes != "" || (woc.wf.Status.CompressedNodes == "" && woc.getSize() >= maxWorkflowSize)) { - start := time.Now() nodeContent, err := json.Marshal(woc.wf.Status.Nodes) if err != nil { return errors.InternalWrapError(err) } buff := string(nodeContent) woc.wf.Status.CompressedNodes = file.CompressEncodeString(buff) - fmt.Println("checkAndCompress: %s", time.Since(start)) } if woc.isWFFailed || (woc.wf.Status.CompressedNodes != "" && woc.getSize() >= maxWorkflowSize) { From be9b3b5037f86043754cbee8433980ec4f3b3135 Mon Sep 17 00:00:00 2001 From: Saravanan Balasubramanian Date: Fri, 5 Apr 2019 13:47:42 -0700 Subject: [PATCH 4/6] fixed feedbacks --- workflow/controller/operator.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index aa5eb64b983b..64d80cdcf755 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -1727,7 +1727,7 @@ func (woc *wfOperationCtx) checkAndEstimate(nodeID string) error { } if woc.isWFFailed { - return errors.InternalError(fmt.Sprintf("Workflow is longer than maximum allowed size. Size=%d", woc.currentWFSize+woc.unSavedNodeStatusSize)) + return errors.InternalErrorf("Workflow is longer than maximum allowed size. Size=%d", woc.currentWFSize+woc.unSavedNodeStatusSize) } if woc.wf.Status.CompressedNodes != "" { @@ -1738,7 +1738,7 @@ func (woc *wfOperationCtx) checkAndEstimate(nodeID string) error { } nodeSize := len(file.CompressEncodeString(string(content))) if (nodeSize + woc.unSavedNodeStatusSize + woc.currentWFSize) >= maxWorkflowSize { - return errors.InternalError(fmt.Sprintf("Workflow is longer than maximum allowed size. Size=%d", woc.currentWFSize+nodeSize+woc.unSavedNodeStatusSize)) + return errors.InternalErrorf("Workflow is longer than maximum allowed size. Size=%d", woc.currentWFSize+nodeSize+woc.unSavedNodeStatusSize) } woc.unSavedNodeStatusSize += nodeSize } From b0911802a0ae44edf9d57a2d1d87faf615515eeb Mon Sep 17 00:00:00 2001 From: Saravanan Balasubramanian Date: Fri, 5 Apr 2019 14:50:43 -0700 Subject: [PATCH 5/6] Update operator.go --- workflow/controller/operator.go | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index 64d80cdcf755..a6a779377d08 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -68,8 +68,8 @@ type wfOperationCtx struct { // unSavedNodeStatusSize is unsaved workflow size unSavedNodeStatusSize int - // wfFailed is workflow failed status - isWFFailed bool + // isWFCompressionFailed is workflow compression failed status + isWFCompressionFailed bool } var ( @@ -136,9 +136,6 @@ func (woc *wfOperationCtx) operate() { woc.log.Infof("Processing workflow") - // Initialize Workflow failed status - woc.isWFFailed = false - // Perform one-time workflow validation if woc.wf.Status.Phase == "" { woc.markWorkflowRunning() @@ -526,7 +523,7 @@ func (woc *wfOperationCtx) podReconciliation() error { woc.markNodeErrorClearOuput(nodeNameForPod, err) err = woc.checkAndCompress() if err != nil { - woc.isWFFailed = true + woc.isWFCompressionFailed = true } } <-parallelPodNum @@ -1685,7 +1682,7 @@ func (woc *wfOperationCtx) getSize() int { // The compressed content will be assign to compressedNodes element and clear the nodestatus map. func (woc *wfOperationCtx) checkAndCompress() error { - if woc.isWFFailed == false && (woc.wf.Status.CompressedNodes != "" || (woc.wf.Status.CompressedNodes == "" && woc.getSize() >= maxWorkflowSize)) { + if woc.isWFCompressionFailed == false && (woc.wf.Status.CompressedNodes != "" || (woc.wf.Status.CompressedNodes == "" && woc.getSize() >= maxWorkflowSize)) { nodeContent, err := json.Marshal(woc.wf.Status.Nodes) if err != nil { return errors.InternalWrapError(err) @@ -1694,8 +1691,8 @@ func (woc *wfOperationCtx) checkAndCompress() error { woc.wf.Status.CompressedNodes = file.CompressEncodeString(buff) } - if woc.isWFFailed || (woc.wf.Status.CompressedNodes != "" && woc.getSize() >= maxWorkflowSize) { - woc.isWFFailed = true + if woc.isWFCompressionFailed || (woc.wf.Status.CompressedNodes != "" && woc.getSize() >= maxWorkflowSize) { + woc.isWFCompressionFailed = true return errors.InternalError(fmt.Sprintf("Workflow is longer than maximum allowed size. Size=%d", woc.getSize())) } return nil @@ -1726,7 +1723,7 @@ func (woc *wfOperationCtx) checkAndEstimate(nodeID string) error { return nil } - if woc.isWFFailed { + if woc.isWFCompressionFailed { return errors.InternalErrorf("Workflow is longer than maximum allowed size. Size=%d", woc.currentWFSize+woc.unSavedNodeStatusSize) } From 9cdb1e120a476fe7a7999e71f2ac34367f6b3114 Mon Sep 17 00:00:00 2001 From: Saravanan Balasubramanian Date: Fri, 5 Apr 2019 15:11:37 -0700 Subject: [PATCH 6/6] Update operator.go --- workflow/controller/operator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index a6a779377d08..acc4b335bb15 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -1682,7 +1682,7 @@ func (woc *wfOperationCtx) getSize() int { // The compressed content will be assign to compressedNodes element and clear the nodestatus map. func (woc *wfOperationCtx) checkAndCompress() error { - if woc.isWFCompressionFailed == false && (woc.wf.Status.CompressedNodes != "" || (woc.wf.Status.CompressedNodes == "" && woc.getSize() >= maxWorkflowSize)) { + if !woc.isWFCompressionFailed && (woc.wf.Status.CompressedNodes != "" || (woc.wf.Status.CompressedNodes == "" && woc.getSize() >= maxWorkflowSize)) { nodeContent, err := json.Marshal(woc.wf.Status.Nodes) if err != nil { return errors.InternalWrapError(err)