CheckandEstimate implementation to optimize podReconciliation #1308

Merged · merged 6 commits · Apr 5, 2019
62 changes: 53 additions & 9 deletions workflow/controller/operator.go
@@ -61,6 +61,15 @@ type wfOperationCtx struct {
// workflowDeadline is the deadline which the workflow is expected to complete before we
// terminate the workflow.
workflowDeadline *time.Time

// currentWFSize is the current size of the workflow object
currentWFSize int

// unSavedNodeStatusSize is the total size of node statuses that have been assessed but not yet saved
unSavedNodeStatusSize int

// isWFCompressionFailed indicates whether workflow compression has failed
isWFCompressionFailed bool
}

var (
@@ -124,7 +133,9 @@ func (woc *wfOperationCtx) operate() {
woc.log.Errorf("Recovered from panic: %+v\n%s", r, debug.Stack())
}
}()

woc.log.Infof("Processing workflow")

// Perform one-time workflow validation
if woc.wf.Status.Phase == "" {
woc.markWorkflowRunning()
@@ -453,9 +464,9 @@ func (woc *wfOperationCtx) podReconciliation() error {
seenPodLock := &sync.Mutex{}
wfNodesLock := &sync.RWMutex{}

performAssessment := func(pod *apiv1.Pod) {
performAssessment := func(pod *apiv1.Pod) string {
if pod == nil {
return
return ""
}
nodeNameForPod := pod.Annotations[common.AnnotationKeyNodeName]
nodeID := woc.wf.NodeID(nodeNameForPod)
@@ -475,16 +486,20 @@ func (woc *wfOperationCtx) podReconciliation() error {
if node.Completed() && !node.IsDaemoned() {
if tmpVal, tmpOk := pod.Labels[common.LabelKeyCompleted]; tmpOk {
if tmpVal == "true" {
return
return nodeID
}
}
woc.completedPods[pod.ObjectMeta.Name] = true
}
}
return nodeID
}

parallelPodNum := make(chan string, 500)
var wg sync.WaitGroup

woc.currentWFSize = woc.getSize()

for _, pod := range podList.Items {
parallelPodNum <- pod.Name
wg.Add(1)
@@ -493,20 +508,23 @@ func (woc *wfOperationCtx) podReconciliation() error {
wfNodesLock.Lock()
origNodeStatus := *woc.wf.Status.DeepCopy()
wfNodesLock.Unlock()
performAssessment(&tmpPod)
nodeID := performAssessment(&tmpPod)
err = woc.applyExecutionControl(&tmpPod, wfNodesLock)
if err != nil {
woc.log.Warnf("Failed to apply execution control to pod %s", tmpPod.Name)
}
wfNodesLock.Lock()
defer wfNodesLock.Unlock()
err = woc.checkAndCompress()
err = woc.checkAndEstimate(nodeID)
if err != nil {
woc.wf.Status = origNodeStatus
nodeNameForPod := tmpPod.Annotations[common.AnnotationKeyNodeName]
woc.log.Warnf("%v", err)
woc.markNodeErrorClearOuput(nodeNameForPod, err)
err = woc.checkAndCompress()
if err != nil {
woc.isWFCompressionFailed = true
}
}
<-parallelPodNum
}(pod)
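The loop above bounds concurrent pod assessments with a buffered channel used as a semaphore (capacity 500) together with a sync.WaitGroup. A minimal standalone sketch of that concurrency-limiting pattern, using illustrative names that are not part of the PR:

package main

import (
	"fmt"
	"sync"
)

func main() {
	pods := []string{"pod-a", "pod-b", "pod-c"} // illustrative pod names
	sem := make(chan string, 500)               // buffered channel acts as the concurrency limiter
	var wg sync.WaitGroup

	for _, pod := range pods {
		sem <- pod // blocks once 500 workers are in flight
		wg.Add(1)
		go func(name string) {
			defer wg.Done()
			fmt.Println("assessing", name) // stand-in for performAssessment / applyExecutionControl
			<-sem                          // release the slot
		}(pod)
	}
	wg.Wait()
}

Each send into the channel reserves a slot before the goroutine starts, and the receive at the end of the goroutine releases it, so at most 500 pods are assessed at once.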
@@ -1664,17 +1682,17 @@ func (woc *wfOperationCtx) getSize() int {
// The compressed content will be assigned to the compressedNodes element and the nodestatus map will be cleared.
func (woc *wfOperationCtx) checkAndCompress() error {

if woc.wf.Status.CompressedNodes != "" || (woc.wf.Status.CompressedNodes == "" && woc.getSize() >= maxWorkflowSize) {

if !woc.isWFCompressionFailed && (woc.wf.Status.CompressedNodes != "" || (woc.wf.Status.CompressedNodes == "" && woc.getSize() >= maxWorkflowSize)) {
nodeContent, err := json.Marshal(woc.wf.Status.Nodes)
if err != nil {
return errors.InternalWrapError(err)
}
buff := string(nodeContent)
woc.wf.Status.CompressedNodes = file.CompressEncodeString(buff)

}
if woc.wf.Status.CompressedNodes != "" && woc.getSize() >= maxWorkflowSize {

if woc.isWFCompressionFailed || (woc.wf.Status.CompressedNodes != "" && woc.getSize() >= maxWorkflowSize) {
woc.isWFCompressionFailed = true
return errors.InternalError(fmt.Sprintf("Workflow is longer than maximum allowed size. Size=%d", woc.getSize()))
}
return nil
@@ -1698,3 +1716,29 @@ func (woc *wfOperationCtx) checkAndDecompress() error {
}
return nil
}

// checkAndEstimate checks and estimates the workflow size including the given node's current status
func (woc *wfOperationCtx) checkAndEstimate(nodeID string) error {
if nodeID == "" {
return nil
}

if woc.isWFCompressionFailed {
return errors.InternalErrorf("Workflow is longer than maximum allowed size. Size=%d", woc.currentWFSize+woc.unSavedNodeStatusSize)
}

if woc.wf.Status.CompressedNodes != "" {
if node, ok := woc.wf.Status.Nodes[nodeID]; ok {
content, err := json.Marshal(node)
if err != nil {
return errors.InternalWrapError(err)
}
nodeSize := len(file.CompressEncodeString(string(content)))
if (nodeSize + woc.unSavedNodeStatusSize + woc.currentWFSize) >= maxWorkflowSize {
return errors.InternalErrorf("Workflow is longer than maximum allowed size. Size=%d", woc.currentWFSize+nodeSize+woc.unSavedNodeStatusSize)
}
woc.unSavedNodeStatusSize += nodeSize
}
}
return nil
}
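checkAndEstimate avoids re-marshalling and compressing the entire node map for every pod: it compresses only the newly assessed node's status and adds that size to a running total (unSavedNodeStatusSize) on top of the size already recorded in currentWFSize. A self-contained sketch of that size arithmetic, assuming CompressEncodeString gzips and base64-encodes its input and that maxWorkflowSize is 1 MB (both are assumptions, not taken from this diff):

package main

import (
	"bytes"
	"compress/gzip"
	"encoding/base64"
	"encoding/json"
	"fmt"
)

const maxWorkflowSize = 1024 * 1024 // assumed 1 MB limit, mirroring the controller's constant

// compressEncode gzips and base64-encodes s; stands in for file.CompressEncodeString here.
func compressEncode(s string) string {
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	zw.Write([]byte(s)) // error ignored for brevity in this sketch
	zw.Close()
	return base64.StdEncoding.EncodeToString(buf.Bytes())
}

func main() {
	node := map[string]string{"id": "wf-1234", "phase": "Succeeded"} // illustrative node status
	content, _ := json.Marshal(node)

	currentWFSize := 900000        // size already saved for the workflow (illustrative)
	unSavedNodeStatusSize := 50000 // sizes accumulated for nodes assessed earlier in this pass

	nodeSize := len(compressEncode(string(content)))
	if nodeSize+unSavedNodeStatusSize+currentWFSize >= maxWorkflowSize {
		fmt.Println("workflow would exceed the size limit; mark the node errored instead")
		return
	}
	unSavedNodeStatusSize += nodeSize
	fmt.Println("estimated total size:", currentWFSize+unSavedNodeStatusSize)
}

Only when this cheap estimate indicates an overflow does the controller revert the status, mark the node errored with cleared output, and attempt a full checkAndCompress; if that also fails, isWFCompressionFailed latches so that subsequent pods in the same pass are rejected immediately.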