diff --git a/.gitignore b/.gitignore
index 75bdfa4ea177..5bb2fc438fdc 100644
--- a/.gitignore
+++ b/.gitignore
@@ -37,3 +37,5 @@ git-ask-pass.sh
 /go-diagrams/
 /.run/
 pprof
+pkg/apiclient/sensor/sensor.swagger.json
+pkg/apiclient/eventsource/eventsource.swagger.json
diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go
index 78437e74f827..e0923c9bde59 100644
--- a/workflow/controller/operator.go
+++ b/workflow/controller/operator.go
@@ -1702,6 +1702,9 @@ func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string,
 				woc.computeMetrics(processedTmpl.Metrics.Prometheus, localScope, realTimeScope, false)
 			}
 		}
+		if processedTmpl.Synchronization != nil {
+			woc.controller.syncManager.Release(woc.wf, node.ID, processedTmpl.Synchronization)
+		}
 		return retryParentNode, nil
 	}
 	lastChildNode := getChildNodeIndex(retryParentNode, woc.wf.Status.Nodes, -1)
diff --git a/workflow/controller/operator_concurrency_test.go b/workflow/controller/operator_concurrency_test.go
index c38c5896a56f..6d4e16e62712 100644
--- a/workflow/controller/operator_concurrency_test.go
+++ b/workflow/controller/operator_concurrency_test.go
@@ -398,3 +398,105 @@ func TestMutexInDAG(t *testing.T) {
 		}
 	})
 }
+
+// RetryWfWithSemaphore is a two-step workflow whose whalesay template is
+// guarded by a semaphore read from the my-config ConfigMap (key "template").
+const RetryWfWithSemaphore = `
+apiVersion: argoproj.io/v1alpha1
+kind: Workflow
+metadata:
+  name: script-wf
+  namespace: default
+spec:
+  entrypoint: step1
+  retryStrategy:
+    limit: 10
+  templates:
+  - name: step1
+    steps:
+    - - name: hello1
+        template: whalesay
+    - - name: hello2
+        template: whalesay
+  - name: whalesay
+    daemon: true
+    synchronization:
+      semaphore:
+        configMapKeyRef:
+          key: template
+          name: my-config
+    container:
+      args:
+      - "hello world"
+      command:
+      - cowsay
+      image: "docker/whalesay:latest"
+`
+
+// TestSynchronizationWithRetry drives RetryWfWithSemaphore through three
+// operate passes, setting pods to PodSucceeded in between, and checks that the
+// semaphore ends up with no waiters and no holders.
+func TestSynchronizationWithRetry(t *testing.T) {
+	cancel, controller := newController()
+	defer cancel()
+	ctx := context.Background()
+	controller.syncManager = sync.NewLockManager(GetSyncLimitFunc(ctx, controller.kubeclientset), func(key string) {
+	})
+	var cm v1.ConfigMap
+	if err := yaml.Unmarshal([]byte(configMap), &cm); !assert.NoError(t, err) {
+		return
+	}
+	_, err := controller.kubeclientset.CoreV1().ConfigMaps("default").Create(ctx, &cm, metav1.CreateOptions{})
+	if !assert.NoError(t, err) {
+		return
+	}
+	t.Run("WorkflowWithRetry", func(t *testing.T) {
+		wf := unmarshalWF(RetryWfWithSemaphore)
+		wf, err := controller.wfclientset.ArgoprojV1alpha1().Workflows(wf.Namespace).Create(ctx, wf, metav1.CreateOptions{})
+		if !assert.NoError(t, err) {
+			return
+		}
+		woc := newWorkflowOperationCtx(wf, controller)
+		woc.operate(ctx)
+		// NOTE(review): confirm node.Name is the bare step name here; if it is
+		// a qualified name these comparisons never match and assert nothing.
+		for _, node := range woc.wf.Status.Nodes {
+			if node.Name == "hello1" {
+				assert.Equal(t, wfv1.NodePending, node.Phase)
+			}
+		}
+
+		// Updating Pod state
+		makePodsPhase(ctx, woc, v1.PodSucceeded)
+
+		// Release the lock from hello1
+		woc = newWorkflowOperationCtx(woc.wf, controller)
+		woc.operate(ctx)
+		for _, node := range woc.wf.Status.Nodes {
+			if node.Name == "hello1" {
+				assert.Equal(t, wfv1.NodeSucceeded, node.Phase)
+			}
+			if node.Name == "hello2" {
+				assert.Equal(t, wfv1.NodePending, node.Phase)
+			}
+		}
+		// Updating Pod state
+		makePodsPhase(ctx, woc, v1.PodSucceeded)
+
+		// Release the lock from hello2
+		woc = newWorkflowOperationCtx(woc.wf, controller)
+		woc.operate(ctx)
+
+		// Fail the test instead of panicking when the status is missing.
+		syncStatus := woc.wf.Status.Synchronization
+		if !assert.NotNil(t, syncStatus) || !assert.NotNil(t, syncStatus.Semaphore) {
+			return
+		}
+		// Nobody is waiting for the lock
+		assert.Empty(t, syncStatus.Semaphore.Waiting)
+		// Nobody is holding the lock
+		if assert.NotEmpty(t, syncStatus.Semaphore.Holding) {
+			assert.Empty(t, syncStatus.Semaphore.Holding[0].Holders)
+		}
+	})
+}
diff --git a/workflow/sync/sync_manager.go b/workflow/sync/sync_manager.go
index 6cd14d9ae115..620e6e553acd 100644
--- a/workflow/sync/sync_manager.go
+++ b/workflow/sync/sync_manager.go
@@ -155,6 +155,7 @@ func (cm *Manager) Release(wf *wfv1.Workflow, nodeName string, syncRef *wfv1.Syn
 
 	if syncLockHolder, ok := cm.syncLockMap[lockName.EncodeName()]; ok {
 		syncLockHolder.release(holderKey)
+		syncLockHolder.removeFromQueue(holderKey)
 		log.Debugf("%s sync lock is released by %s", lockName.EncodeName(), holderKey)
 		lockKey := lockName.EncodeName()
 		wf.Status.Synchronization.GetStatus(syncRef.GetType()).LockReleased(holderKey, lockKey)