diff --git a/docs/environment-variables.md b/docs/environment-variables.md index 41a5dee8d617..4f18ce9ace84 100644 --- a/docs/environment-variables.md +++ b/docs/environment-variables.md @@ -50,6 +50,7 @@ most users. Environment variables may be removed at any time. | `TRANSIENT_ERROR_PATTERN` | `string` | `""` | The regular expression that represents additional patterns for transient errors. | | `WF_DEL_PROPAGATION_POLICY` | `string` | `""` | The deletion propagation policy for workflows. | | `WORKFLOW_GC_PERIOD` | `time.Duration` | `5m` | The periodicity for GC of workflows. | +| `SEMAPHORE_NOTIFY_DELAY` | `time.Duration` | `1s` | Tuning Delay when notifying semaphore waiters about availability in the semaphore | CLI parameters of the `argo-server` and `workflow-controller` can be specified as environment variables with the `ARGO_` prefix. For example: diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index 8a4758bd5888..1902c0250ceb 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -140,7 +140,18 @@ const ( workflowTaskSetResyncPeriod = 20 * time.Minute ) -var cacheGCPeriod = env.LookupEnvDurationOr("CACHE_GC_PERIOD", 0) +var ( + cacheGCPeriod = env.LookupEnvDurationOr("CACHE_GC_PERIOD", 0) + + // semaphoreNotifyDelay is a slight delay when notifying/enqueueing workflows to the workqueue + // that are waiting on a semaphore. This value is passed to AddAfter(). We delay adding the next + // workflow because if we add immediately with AddRateLimited(), the next workflow will likely + // be reconciled at a point in time before we have finished the current workflow reconciliation + // as well as incrementing the semaphore counter availability, and so the next workflow will + // believe it cannot run. By delaying for 1s, we would have finished the semaphore counter + // updates, and the next workflow will see the updated availability. + semaphoreNotifyDelay = env.LookupEnvDurationOr("SEMAPHORE_NOTIFY_DELAY", time.Second) +) func init() { if cacheGCPeriod != 0 { @@ -333,7 +344,7 @@ func (wfc *WorkflowController) createSynchronizationManager(ctx context.Context) } nextWorkflow := func(key string) { - wfc.wfQueue.AddRateLimited(key) + wfc.wfQueue.AddAfter(key, semaphoreNotifyDelay) } isWFDeleted := func(key string) bool { diff --git a/workflow/sync/semaphore.go b/workflow/sync/semaphore.go index f842c04d6f70..63be7239ddeb 100644 --- a/workflow/sync/semaphore.go +++ b/workflow/sync/semaphore.go @@ -97,26 +97,38 @@ func (s *PrioritySemaphore) release(key string) bool { availableLocks := s.limit - len(s.lockHolder) s.log.Infof("Lock has been released by %s. Available locks: %d", key, availableLocks) if s.pending.Len() > 0 { - triggerCount := availableLocks - if s.pending.Len() < triggerCount { - triggerCount = s.pending.Len() - } - for idx := 0; idx < triggerCount; idx++ { - item := s.pending.items[idx] - keyStr := fmt.Sprint(item.key) - items := strings.Split(keyStr, "/") - workflowKey := keyStr - if len(items) == 3 { - workflowKey = fmt.Sprintf("%s/%s", items[0], items[1]) - } - s.log.Debugf("Enqueue the workflow %s", workflowKey) - s.nextWorkflow(workflowKey) - } + s.notifyWaiters() } } return true } +// notifyWaiters enqueues the next N workflows who are waiting for the semaphore to the workqueue, +// where N is the availability of the semaphore. If semaphore is out of capacity, this does nothing. +func (s *PrioritySemaphore) notifyWaiters() { + triggerCount := s.limit - len(s.lockHolder) + if s.pending.Len() < triggerCount { + triggerCount = s.pending.Len() + } + for idx := 0; idx < triggerCount; idx++ { + item := s.pending.items[idx] + wfKey := workflowKey(item) + s.log.Debugf("Enqueue the workflow %s", wfKey) + s.nextWorkflow(wfKey) + } +} + +// workflowKey formulates the proper workqueue key given a semaphore queue item +func workflowKey(i *item) string { + parts := strings.Split(i.key, "/") + if len(parts) == 3 { + // the item is template semaphore (namespace/workflow-name/node-id) and so key must be + // truncated to just: namespace/workflow-name + return fmt.Sprintf("%s/%s", parts[0], parts[1]) + } + return i.key +} + // addToQueue adds the holderkey into priority queue that maintains the priority order to acquire the lock. func (s *PrioritySemaphore) addToQueue(holderKey string, priority int32, creationTime time.Time) { s.lock.Lock() @@ -168,18 +180,17 @@ func (s *PrioritySemaphore) tryAcquire(holderKey string) (bool, string) { } var nextKey string - waitingMsg := fmt.Sprintf("Waiting for %s lock. Lock status: %d/%d ", s.name, s.limit-len(s.lockHolder), s.limit) + waitingMsg := fmt.Sprintf("Waiting for %s lock. Lock status: %d/%d", s.name, s.limit-len(s.lockHolder), s.limit) // Check whether requested holdkey is in front of priority queue. // If it is in front position, it will allow to acquire lock. // If it is not a front key, it needs to wait for its turn. if s.pending.Len() > 0 { item := s.pending.peek() - nextKey = fmt.Sprintf("%v", item.key) - if holderKey != nextKey && !isSameWorkflowNodeKeys(holderKey, nextKey) { + if holderKey != nextKey && !isSameWorkflowNodeKeys(holderKey, item.key) { // Enqueue the front workflow if lock is available if len(s.lockHolder) < s.limit { - s.nextWorkflow(nextKey) + s.nextWorkflow(workflowKey(item)) } return false, waitingMsg } @@ -187,7 +198,8 @@ func (s *PrioritySemaphore) tryAcquire(holderKey string) (bool, string) { if s.acquire(holderKey) { s.pending.pop() - s.log.Infof("%s acquired by %s ", s.name, nextKey) + s.log.Infof("%s acquired by %s. Lock availability: %d/%d", s.name, holderKey, s.limit-len(s.lockHolder), s.limit) + s.notifyWaiters() return true, "" } s.log.Debugf("Current semaphore Holders. %v", s.lockHolder) diff --git a/workflow/sync/semaphore_test.go b/workflow/sync/semaphore_test.go index 18bfb2bd5932..75a0a658a815 100644 --- a/workflow/sync/semaphore_test.go +++ b/workflow/sync/semaphore_test.go @@ -2,6 +2,7 @@ package sync import ( "testing" + "time" "github.com/stretchr/testify/assert" ) @@ -19,3 +20,87 @@ func TestIsSameWorkflowNodeKeys(t *testing.T) { assert.False(t, isSameWorkflowNodeKeys(wfkey1, wfkey2)) assert.True(t, isSameWorkflowNodeKeys(nodeWf2key1, nodeWf2key2)) } + +func TestTryAcquire(t *testing.T) { + nextWorkflow := func(key string) { + } + + s := NewSemaphore("foo", 2, nextWorkflow, "semaphore") + now := time.Now() + s.addToQueue("default/wf-01", 0, now) + s.addToQueue("default/wf-02", 0, now.Add(time.Second)) + s.addToQueue("default/wf-03", 0, now.Add(2*time.Second)) + s.addToQueue("default/wf-04", 0, now.Add(3*time.Second)) + + // verify only the first in line is allowed to acquired the semaphore + var acquired bool + acquired, _ = s.tryAcquire("default/wf-04") + assert.False(t, acquired) + acquired, _ = s.tryAcquire("default/wf-03") + assert.False(t, acquired) + acquired, _ = s.tryAcquire("default/wf-02") + assert.False(t, acquired) + acquired, _ = s.tryAcquire("default/wf-01") + assert.True(t, acquired) + // now that wf-01 obtained it, wf-02 can + acquired, _ = s.tryAcquire("default/wf-02") + assert.True(t, acquired) + acquired, _ = s.tryAcquire("default/wf-03") + assert.False(t, acquired) + acquired, _ = s.tryAcquire("default/wf-04") + assert.False(t, acquired) +} + +// TestNotifyWaiters ensures we notify the correct waiters after acquiring and releasing a semaphore +func TestNotifyWaitersAcquire(t *testing.T) { + notified := make(map[string]bool) + nextWorkflow := func(key string) { + notified[key] = true + } + + s := NewSemaphore("foo", 3, nextWorkflow, "semaphore") + now := time.Now() + s.addToQueue("default/wf-04", 0, now.Add(3*time.Second)) + s.addToQueue("default/wf-02", 0, now.Add(time.Second)) + s.addToQueue("default/wf-01", 0, now) + s.addToQueue("default/wf-05", 0, now.Add(4*time.Second)) + s.addToQueue("default/wf-03", 0, now.Add(2*time.Second)) + + acquired, _ := s.tryAcquire("default/wf-01") + assert.True(t, acquired) + + assert.Len(t, notified, 2) + assert.True(t, notified["default/wf-02"]) + assert.True(t, notified["default/wf-03"]) + assert.False(t, notified["default/wf-04"]) + assert.False(t, notified["default/wf-05"]) + + notified = make(map[string]bool) + released := s.release("default/wf-01") + assert.True(t, released) + + assert.Len(t, notified, 3) + assert.True(t, notified["default/wf-02"]) + assert.True(t, notified["default/wf-03"]) + assert.True(t, notified["default/wf-04"]) + assert.False(t, notified["default/wf-05"]) +} + +// TestNotifyWorkflowFromTemplateSemaphore verifies we enqueue a proper workflow key when using a semaphore template +func TestNotifyWorkflowFromTemplateSemaphore(t *testing.T) { + notified := make(map[string]bool) + nextWorkflow := func(key string) { + notified[key] = true + } + + s := NewSemaphore("foo", 2, nextWorkflow, "semaphore") + now := time.Now() + s.addToQueue("default/wf-01/nodeid-123", 0, now) + s.addToQueue("default/wf-02/nodeid-456", 0, now.Add(time.Second)) + + acquired, _ := s.tryAcquire("default/wf-01/nodeid-123") + assert.True(t, acquired) + + assert.Len(t, notified, 1) + assert.True(t, notified["default/wf-02"]) +}