Skip to content

Commit

Permalink
fix: Improve semaphore concurrency performance (#9666)
Browse files Browse the repository at this point in the history
* fix: improve semaphore concurrency performance

Signed-off-by: Jesse Suen <jesse@akuity.io>

* fix: template semaphore not enqueued properly. notify additional waiters

Signed-off-by: Jesse Suen <jesse@akuity.io>

Signed-off-by: Jesse Suen <jesse@akuity.io>
  • Loading branch information
jessesuen authored Sep 26, 2022
1 parent ec25899 commit b96d446
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 22 deletions.
1 change: 1 addition & 0 deletions docs/environment-variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ most users. Environment variables may be removed at any time.
| `TRANSIENT_ERROR_PATTERN` | `string` | `""` | The regular expression that represents additional patterns for transient errors. |
| `WF_DEL_PROPAGATION_POLICY` | `string` | `""` | The deletion propagation policy for workflows. |
| `WORKFLOW_GC_PERIOD` | `time.Duration` | `5m` | The periodicity for GC of workflows. |
| `SEMAPHORE_NOTIFY_DELAY` | `time.Duration` | `1s` | Tuning delay applied when notifying semaphore waiters that the semaphore has availability. |

CLI parameters of the `argo-server` and `workflow-controller` can be specified as environment variables with the `ARGO_`
prefix. For example:
Expand Down
15 changes: 13 additions & 2 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,18 @@ const (
workflowTaskSetResyncPeriod = 20 * time.Minute
)

var cacheGCPeriod = env.LookupEnvDurationOr("CACHE_GC_PERIOD", 0)
var (
// cacheGCPeriod is looked up from the CACHE_GC_PERIOD environment variable, with 0 passed as
// the fallback value.
cacheGCPeriod = env.LookupEnvDurationOr("CACHE_GC_PERIOD", 0)

// semaphoreNotifyDelay is a slight delay when notifying/enqueueing workflows to the workqueue
// that are waiting on a semaphore. This value is passed to AddAfter(). We delay adding the next
// workflow because if we add immediately with AddRateLimited(), the next workflow will likely
// be reconciled at a point in time before we have finished the current workflow reconciliation
// as well as incrementing the semaphore counter availability, and so the next workflow will
// believe it cannot run. By delaying for 1s, we would have finished the semaphore counter
// updates, and the next workflow will see the updated availability.
//
// The delay is looked up from the SEMAPHORE_NOTIFY_DELAY environment variable, with
// time.Second passed as the fallback value.
semaphoreNotifyDelay = env.LookupEnvDurationOr("SEMAPHORE_NOTIFY_DELAY", time.Second)
)

func init() {
if cacheGCPeriod != 0 {
Expand Down Expand Up @@ -333,7 +344,7 @@ func (wfc *WorkflowController) createSynchronizationManager(ctx context.Context)
}

nextWorkflow := func(key string) {
wfc.wfQueue.AddRateLimited(key)
wfc.wfQueue.AddAfter(key, semaphoreNotifyDelay)
}

isWFDeleted := func(key string) bool {
Expand Down
52 changes: 32 additions & 20 deletions workflow/sync/semaphore.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,26 +97,38 @@ func (s *PrioritySemaphore) release(key string) bool {
availableLocks := s.limit - len(s.lockHolder)
s.log.Infof("Lock has been released by %s. Available locks: %d", key, availableLocks)
if s.pending.Len() > 0 {
triggerCount := availableLocks
if s.pending.Len() < triggerCount {
triggerCount = s.pending.Len()
}
for idx := 0; idx < triggerCount; idx++ {
item := s.pending.items[idx]
keyStr := fmt.Sprint(item.key)
items := strings.Split(keyStr, "/")
workflowKey := keyStr
if len(items) == 3 {
workflowKey = fmt.Sprintf("%s/%s", items[0], items[1])
}
s.log.Debugf("Enqueue the workflow %s", workflowKey)
s.nextWorkflow(workflowKey)
}
s.notifyWaiters()
}
}
return true
}

// notifyWaiters hands waiting workflows to the nextWorkflow callback. The number of workflows
// notified is the smaller of the semaphore's free capacity (limit minus current holders) and
// the number of pending items. When there is no free capacity, nothing is enqueued.
//
// NOTE(review): waiters are picked by index from s.pending.items. If that slice is
// heap-ordered rather than fully sorted, the first N entries are not guaranteed to be the N
// highest-priority waiters — confirm against the priority queue implementation.
func (s *PrioritySemaphore) notifyWaiters() {
	free := s.limit - len(s.lockHolder)
	waiting := s.pending.Len()

	toNotify := free
	if waiting < free {
		toNotify = waiting
	}

	// An index loop (rather than slicing) means a non-positive count simply notifies nobody.
	for n := 0; n < toNotify; n++ {
		wfKey := workflowKey(s.pending.items[n])
		s.log.Debugf("Enqueue the workflow %s", wfKey)
		s.nextWorkflow(wfKey)
	}
}

// workflowKey converts a semaphore queue item's key into the key used on the workqueue.
//
// A key with exactly three "/"-separated segments is treated as a template-level semaphore
// holder (namespace/workflow-name/node-id) and is truncated to namespace/workflow-name.
// Any other key is returned unchanged.
func workflowKey(i *item) string {
	segments := strings.Split(i.key, "/")
	if len(segments) != 3 {
		// Not a template-level key: the item key is used as the workqueue key as-is.
		return i.key
	}
	namespace, workflowName := segments[0], segments[1]
	return fmt.Sprintf("%s/%s", namespace, workflowName)
}

// addToQueue adds the holderkey into priority queue that maintains the priority order to acquire the lock.
func (s *PrioritySemaphore) addToQueue(holderKey string, priority int32, creationTime time.Time) {
s.lock.Lock()
Expand Down Expand Up @@ -168,26 +180,26 @@ func (s *PrioritySemaphore) tryAcquire(holderKey string) (bool, string) {
}
var nextKey string

waitingMsg := fmt.Sprintf("Waiting for %s lock. Lock status: %d/%d ", s.name, s.limit-len(s.lockHolder), s.limit)
waitingMsg := fmt.Sprintf("Waiting for %s lock. Lock status: %d/%d", s.name, s.limit-len(s.lockHolder), s.limit)

// Check whether requested holdkey is in front of priority queue.
// If it is in front position, it will allow to acquire lock.
// If it is not a front key, it needs to wait for its turn.
if s.pending.Len() > 0 {
item := s.pending.peek()
nextKey = fmt.Sprintf("%v", item.key)
if holderKey != nextKey && !isSameWorkflowNodeKeys(holderKey, nextKey) {
if holderKey != nextKey && !isSameWorkflowNodeKeys(holderKey, item.key) {
// Enqueue the front workflow if lock is available
if len(s.lockHolder) < s.limit {
s.nextWorkflow(nextKey)
s.nextWorkflow(workflowKey(item))
}
return false, waitingMsg
}
}

if s.acquire(holderKey) {
s.pending.pop()
s.log.Infof("%s acquired by %s ", s.name, nextKey)
s.log.Infof("%s acquired by %s. Lock availability: %d/%d", s.name, holderKey, s.limit-len(s.lockHolder), s.limit)
s.notifyWaiters()
return true, ""
}
s.log.Debugf("Current semaphore Holders. %v", s.lockHolder)
Expand Down
85 changes: 85 additions & 0 deletions workflow/sync/semaphore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sync

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)
Expand All @@ -19,3 +20,87 @@ func TestIsSameWorkflowNodeKeys(t *testing.T) {
assert.False(t, isSameWorkflowNodeKeys(wfkey1, wfkey2))
assert.True(t, isSameWorkflowNodeKeys(nodeWf2key1, nodeWf2key2))
}

// TestTryAcquire checks that, for a semaphore with limit 2 and four queued workflows, only
// the workflow at the front of the queue may acquire, and that no further acquisitions
// succeed once both slots are taken.
func TestTryAcquire(t *testing.T) {
	// Notifications are irrelevant to this test, so the callback does nothing.
	noop := func(key string) {}

	s := NewSemaphore("foo", 2, noop, "semaphore")

	// Queue wf-01..wf-04 with creation times one second apart, oldest first.
	start := time.Now()
	queued := []string{"default/wf-01", "default/wf-02", "default/wf-03", "default/wf-04"}
	for i, key := range queued {
		s.addToQueue(key, 0, start.Add(time.Duration(i)*time.Second))
	}

	attempts := []struct {
		key  string
		want bool
	}{
		// verify only the first in line is allowed to acquire the semaphore
		{"default/wf-04", false},
		{"default/wf-03", false},
		{"default/wf-02", false},
		{"default/wf-01", true},
		// now that wf-01 obtained it, wf-02 can
		{"default/wf-02", true},
		// both slots are now held, so the remaining waiters are refused
		{"default/wf-03", false},
		{"default/wf-04", false},
	}
	for _, attempt := range attempts {
		acquired, _ := s.tryAcquire(attempt.key)
		if attempt.want {
			assert.True(t, acquired)
		} else {
			assert.False(t, acquired)
		}
	}
}

// TestNotifyWaitersAcquire ensures we notify the correct waiters after acquiring and after
// releasing a semaphore.
func TestNotifyWaitersAcquire(t *testing.T) {
	notified := make(map[string]bool)
	record := func(key string) {
		notified[key] = true
	}

	s := NewSemaphore("foo", 3, record, "semaphore")

	// Workflows are queued out of creation-time order on purpose; the insertion order below
	// is significant and must not be rearranged.
	base := time.Now()
	waiters := []struct {
		key   string
		delay time.Duration
	}{
		{"default/wf-04", 3 * time.Second},
		{"default/wf-02", time.Second},
		{"default/wf-01", 0},
		{"default/wf-05", 4 * time.Second},
		{"default/wf-03", 2 * time.Second},
	}
	for _, w := range waiters {
		s.addToQueue(w.key, 0, base.Add(w.delay))
	}

	// expect asserts that exactly the keys in want were notified and none of the keys in
	// unwanted were.
	expect := func(want, unwanted []string) {
		assert.Len(t, notified, len(want))
		for _, key := range want {
			assert.True(t, notified[key])
		}
		for _, key := range unwanted {
			assert.False(t, notified[key])
		}
	}

	// wf-01 takes one of three slots, so the next two waiters are notified.
	acquired, _ := s.tryAcquire("default/wf-01")
	assert.True(t, acquired)
	expect(
		[]string{"default/wf-02", "default/wf-03"},
		[]string{"default/wf-04", "default/wf-05"},
	)

	// Start from a clean record; releasing wf-01 frees all three slots, so three waiters
	// are notified.
	notified = make(map[string]bool)
	released := s.release("default/wf-01")
	assert.True(t, released)
	expect(
		[]string{"default/wf-02", "default/wf-03", "default/wf-04"},
		[]string{"default/wf-05"},
	)
}

// TestNotifyWorkflowFromTemplateSemaphore verifies that, when holders use template-level keys
// (namespace/workflow-name/node-id), the key passed to the notification callback is the
// workflow key (namespace/workflow-name).
func TestNotifyWorkflowFromTemplateSemaphore(t *testing.T) {
	enqueued := make(map[string]bool)
	s := NewSemaphore("foo", 2, func(key string) { enqueued[key] = true }, "semaphore")

	queuedAt := time.Now()
	s.addToQueue("default/wf-01/nodeid-123", 0, queuedAt)
	s.addToQueue("default/wf-02/nodeid-456", 0, queuedAt.Add(time.Second))

	ok, _ := s.tryAcquire("default/wf-01/nodeid-123")
	assert.True(t, ok)

	// One slot remains, so exactly the next waiter is notified, under its workflow key.
	assert.Len(t, enqueued, 1)
	assert.True(t, enqueued["default/wf-02"])
}

0 comments on commit b96d446

Please sign in to comment.