Skip to content

Commit

Permalink
fix: Update workflowtaskresult code have own reconciliation loop. (#…
Browse files Browse the repository at this point in the history
…8135)

Signed-off-by: Alex Collins <alex_collins@intuit.com>
  • Loading branch information
alexec authored Mar 18, 2022
1 parent 051c7b8 commit 41fd07a
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 53 deletions.
2 changes: 1 addition & 1 deletion test/e2e/artifacts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (s *ArtifactsSuite) TestMainLog() {
Then().
ExpectWorkflow(func(t *testing.T, m *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
n := status.Nodes[m.Name]
if assert.NotNil(t, n) {
if assert.NotNil(t, n.Outputs) {
assert.Len(t, n.Outputs.Artifacts, 1)
}
})
Expand Down
8 changes: 5 additions & 3 deletions test/e2e/progress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,12 @@ func (s *ProgressSuite) TestLoggedProgress() {
Workflow("@testdata/progress-workflow.yaml").
When().
SubmitWorkflow().
WaitForWorkflow(fixtures.ToBeRunning).
WaitForWorkflow(toHaveProgress("0/100")).
WaitForWorkflow(toHaveProgress("50/100")).
WaitForWorkflow(toHaveProgress("100/100"))
WaitForWorkflow().
Then().
ExpectWorkflow(func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
assert.Equal(t, wfv1.Progress("100/100"), status.Nodes[metadata.Name].Progress)
})
}

func TestProgressSuite(t *testing.T) {
Expand Down
2 changes: 2 additions & 0 deletions test/e2e/resource_template_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ spec:
- name: main
resource:
action: create
setOwnerReference: true
successCondition: status.phase == Succeeded
failureCondition: status.phase == Failed
manifest: |
Expand Down Expand Up @@ -70,6 +71,7 @@ spec:
serviceAccountName: argo
resource:
action: create
setOwnerReference: true
successCondition: status.phase == Succeeded
failureCondition: status.phase == Failed
manifest: |
Expand Down
3 changes: 0 additions & 3 deletions test/e2e/testdata/progress-workflow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@ spec:
- name: progress
template: progress
- name: progress
metadata:
annotations:
workflows.argoproj.io/progress: 0/100
container:
image: argoproj/argosay:v2
command: ["/bin/sh", "-c"]
Expand Down
17 changes: 12 additions & 5 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ type WorkflowController struct {
archiveLabelSelector labels.Selector
cacheFactory controllercache.Factory
wfTaskSetInformer wfextvv1alpha1.WorkflowTaskSetInformer
taskResultInformer wfextvv1alpha1.WorkflowTaskResultInformer
taskResultInformer cache.SharedIndexInformer

// progressPatchTickDuration defines how often the executor will patch pod annotations if an updated progress is found.
// Default is 1m and can be configured using the env var ARGO_PROGRESS_PATCH_TICK_DURATION.
Expand Down Expand Up @@ -258,7 +258,7 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo
go wfc.podInformer.Run(ctx.Done())
go wfc.configMapInformer.Run(ctx.Done())
go wfc.wfTaskSetInformer.Informer().Run(ctx.Done())
go wfc.taskResultInformer.Informer().Run(ctx.Done())
go wfc.taskResultInformer.Run(ctx.Done())

// Wait for all involved caches to be synced, before processing items from the queue is started
if !cache.WaitForCacheSync(
Expand All @@ -268,7 +268,7 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo
wfc.podInformer.HasSynced,
wfc.configMapInformer.HasSynced,
wfc.wfTaskSetInformer.Informer().HasSynced,
wfc.taskResultInformer.Informer().HasSynced,
wfc.taskResultInformer.HasSynced,
) {
log.Fatal("Timed out waiting for caches to sync")
}
Expand Down Expand Up @@ -931,13 +931,20 @@ func (wfc *WorkflowController) archiveWorkflowAux(ctx context.Context, obj inter
return nil
}

var incompleteReq, _ = labels.NewRequirement(common.LabelKeyCompleted, selection.Equals, []string{"false"})
var workflowReq, _ = labels.NewRequirement(common.LabelKeyWorkflow, selection.Exists, nil)

func (wfc *WorkflowController) instanceIDReq() labels.Requirement {
return util.InstanceIDRequirement(wfc.Config.InstanceID)
}

func (wfc *WorkflowController) newWorkflowPodWatch(ctx context.Context) *cache.ListWatch {
c := wfc.kubeclientset.CoreV1().Pods(wfc.GetManagedNamespace())
// completed=false
incompleteReq, _ := labels.NewRequirement(common.LabelKeyCompleted, selection.Equals, []string{"false"})
labelSelector := labels.NewSelector().
Add(*workflowReq).
Add(*incompleteReq).
Add(util.InstanceIDRequirement(wfc.Config.InstanceID))
Add(wfc.instanceIDReq())

listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
options.LabelSelector = labelSelector.String()
Expand Down
6 changes: 3 additions & 3 deletions workflow/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func newController(options ...interface{}) (context.CancelFunc, *WorkflowControl
{
wfc.wfInformer = util.NewWorkflowInformer(dynamicClient, "", 0, wfc.tweakListOptions, indexers)
wfc.wfTaskSetInformer = informerFactory.Argoproj().V1alpha1().WorkflowTaskSets()
wfc.taskResultInformer = informerFactory.Argoproj().V1alpha1().WorkflowTaskResults()
wfc.taskResultInformer = wfc.newWorkflowTaskResultInformer()
wfc.wftmplInformer = informerFactory.Argoproj().V1alpha1().WorkflowTemplates()
wfc.addWorkflowInformerHandlers(ctx)
wfc.podInformer = wfc.newPodInformer(ctx)
Expand All @@ -232,7 +232,7 @@ func newController(options ...interface{}) (context.CancelFunc, *WorkflowControl
go wfc.wftmplInformer.Informer().Run(ctx.Done())
go wfc.podInformer.Run(ctx.Done())
go wfc.wfTaskSetInformer.Informer().Run(ctx.Done())
go wfc.taskResultInformer.Informer().Run(ctx.Done())
go wfc.taskResultInformer.Run(ctx.Done())
wfc.cwftmplInformer = informerFactory.Argoproj().V1alpha1().ClusterWorkflowTemplates()
go wfc.cwftmplInformer.Informer().Run(ctx.Done())
// wfc.waitForCacheSync() takes minimum 100ms, we can be faster
Expand All @@ -242,7 +242,7 @@ func newController(options ...interface{}) (context.CancelFunc, *WorkflowControl
wfc.podInformer,
wfc.cwftmplInformer.Informer(),
wfc.wfTaskSetInformer.Informer(),
wfc.taskResultInformer.Informer(),
wfc.taskResultInformer,
} {
for !c.HasSynced() {
time.Sleep(5 * time.Millisecond)
Expand Down
14 changes: 1 addition & 13 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ func (woc *wfOperationCtx) operate(ctx context.Context) {
woc.wf.Status.EstimatedDuration = woc.estimateWorkflowDuration()
} else {
woc.workflowDeadline = woc.getWorkflowDeadline()
woc.taskResultReconciliation()
err := woc.podReconciliation(ctx)
if err == nil {
woc.failSuspendedAndPendingNodesAfterDeadlineOrShutdown()
Expand Down Expand Up @@ -1255,19 +1256,6 @@ func (woc *wfOperationCtx) assessNodeStatus(pod *apiv1.Pod, old *wfv1.NodeStatus
}
}

obj, _, _ := woc.controller.taskResultInformer.Informer().GetStore().GetByKey(woc.wf.Namespace + "/" + old.ID)
if result, ok := obj.(*wfv1.WorkflowTaskResult); ok {
if result.Outputs.HasOutputs() {
if new.Outputs == nil {
new.Outputs = &wfv1.Outputs{}
}
result.Outputs.DeepCopyInto(new.Outputs) // preserve any existing values
}
if result.Progress.IsValid() {
new.Progress = result.Progress
}
}

// We capture the exit-code after we look for the task-result.
// All other outputs are set by the executor, only the exit-code is set by the controller.
// By waiting, we avoid breaking the race-condition check.
Expand Down
78 changes: 53 additions & 25 deletions workflow/controller/taskresult.go
Original file line number Diff line number Diff line change
@@ -1,37 +1,65 @@
package controller

import (
"reflect"
"time"

log "github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/tools/cache"

wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/pkg/client/informers/externalversions"
wfextvv1alpha1 "github.com/argoproj/argo-workflows/v3/pkg/client/informers/externalversions/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/workflow/common"
"github.com/argoproj/argo-workflows/v3/workflow/util"
"github.com/argoproj/argo-workflows/v3/workflow/controller/indexes"
)

func (wfc *WorkflowController) newWorkflowTaskResultInformer() wfextvv1alpha1.WorkflowTaskResultInformer {
informer := externalversions.NewSharedInformerFactoryWithOptions(
func (wfc *WorkflowController) newWorkflowTaskResultInformer() cache.SharedIndexInformer {
labelSelector := labels.NewSelector().
Add(*workflowReq).
Add(wfc.instanceIDReq()).
String()
log.WithField("labelSelector", labelSelector).
Info("Watching task results")
return wfextvv1alpha1.NewFilteredWorkflowTaskResultInformer(
wfc.wfclientset,
workflowTaskSetResyncPeriod,
externalversions.WithNamespace(wfc.GetManagedNamespace()),
externalversions.WithTweakListOptions(func(x *metav1.ListOptions) {
r := util.InstanceIDRequirement(wfc.Config.InstanceID)
x.LabelSelector = r.String()
})).Argoproj().V1alpha1().WorkflowTaskResults()
informer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(new interface{}) {
result := new.(*wfv1.WorkflowTaskResult)
workflow := result.Labels[common.LabelKeyWorkflow]
wfc.wfQueue.AddRateLimited(result.Namespace + "/" + workflow)
},
UpdateFunc: func(_, new interface{}) {
result := new.(*wfv1.WorkflowTaskResult)
workflow := result.Labels[common.LabelKeyWorkflow]
wfc.wfQueue.AddRateLimited(result.Namespace + "/" + workflow)
},
})
return informer
wfc.GetManagedNamespace(),
20*time.Minute,
cache.Indexers{
indexes.WorkflowIndex: indexes.MetaWorkflowIndexFunc,
},
func(options *metav1.ListOptions) {
options.LabelSelector = labelSelector
},
)
}

func (woc *wfOperationCtx) taskResultReconciliation() {
objs, _ := woc.controller.taskResultInformer.GetIndexer().ByIndex(indexes.WorkflowIndex, woc.wf.Namespace+"/"+woc.wf.Name)
woc.log.WithField("numObjs", len(objs)).Info("Task-result reconciliation")
for _, obj := range objs {
result := obj.(*wfv1.WorkflowTaskResult)
nodeID := result.Name
old := woc.wf.Status.Nodes[nodeID]
new := old.DeepCopy()
if result.Outputs.HasOutputs() {
if new.Outputs == nil {
new.Outputs = &wfv1.Outputs{}
}
result.Outputs.DeepCopyInto(new.Outputs) // preserve any existing values
if old.Outputs != nil && new.Outputs.ExitCode == nil { // prevent overwriting of ExitCode
new.Outputs.ExitCode = old.Outputs.ExitCode
}
}
if result.Progress.IsValid() {
new.Progress = result.Progress
}
if !reflect.DeepEqual(&old, new) {
woc.log.
WithField("nodeID", nodeID).
Info("task-result changed")
woc.wf.Status.Nodes[nodeID] = *new
woc.updated = true
}
}
}

0 comments on commit 41fd07a

Please sign in to comment.