Add per instance mux
Lock operations per instance name. This should prevent goroutines from trying
to update the same instance concurrently when operations are slow.

Signed-off-by: Gabriel Adrian Samfira <gsamfira@cloudbasesolutions.com>
gabriel-samfira committed Jun 23, 2023
1 parent a9cf512 commit e2c8c3a
Showing 5 changed files with 104 additions and 26 deletions.
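
To illustrate the locking pattern this commit introduces, below is a minimal, self-contained Go sketch of the per-instance mutex added in runner/pool/pool.go. It is only a sketch: processInstance and the instance names are hypothetical placeholders for the per-instance work done by the pool reconciliation loops and are not part of the commit.

package main

import (
	"log"
	"sync"
)

// keyMutex mirrors the type added in runner/pool/pool.go: one mutex per key,
// stored in a sync.Map so a mutex is created lazily on first use.
type keyMutex struct {
	muxes sync.Map
}

func (k *keyMutex) TryLock(key string) bool {
	mux, _ := k.muxes.LoadOrStore(key, &sync.Mutex{})
	return mux.(*sync.Mutex).TryLock()
}

func (k *keyMutex) Unlock(key string) {
	if mux, ok := k.muxes.Load(key); ok {
		mux.(*sync.Mutex).Unlock()
	}
}

// processInstance is a hypothetical stand-in for slow per-instance work,
// such as creating or deleting a runner in a provider.
func processInstance(name string) {
	log.Printf("processing instance %s", name)
}

func main() {
	keyMux := &keyMutex{}
	wg := &sync.WaitGroup{}

	for _, name := range []string{"runner-1", "runner-2"} {
		if !keyMux.TryLock(name) {
			// Another goroutine already holds this instance; skip it
			// instead of blocking the whole loop.
			log.Printf("failed to acquire lock for instance %s", name)
			continue
		}
		wg.Add(1)
		go func(name string) {
			defer wg.Done()
			defer keyMux.Unlock(name)
			processInstance(name)
		}(name)
	}
	wg.Wait()
}

sync.Map.LoadOrStore guarantees at most one mutex per instance name, and Mutex.TryLock (available since Go 1.18) lets the periodic loops skip instances that are already being worked on rather than blocking on them.
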
2 changes: 2 additions & 0 deletions runner/pool/enterprise.go
@@ -27,6 +27,7 @@ func NewEnterprisePoolManager(ctx context.Context, cfg params.Enterprise, cfgInt
}

wg := &sync.WaitGroup{}
keyMuxes := &keyMutex{}

helper := &enterprise{
cfg: cfg,
@@ -47,6 +48,7 @@ func NewEnterprisePoolManager(ctx context.Context, cfg params.Enterprise, cfgInt
helper: helper,
credsDetails: cfgInternal.GithubCredentialsDetails,
wg: wg,
keyMux: keyMuxes,
}
return repo, nil
}
2 changes: 2 additions & 0 deletions runner/pool/organization.go
@@ -41,6 +41,7 @@ func NewOrganizationPoolManager(ctx context.Context, cfg params.Organization, cf
}

wg := &sync.WaitGroup{}
keyMuxes := &keyMutex{}

helper := &organization{
cfg: cfg,
@@ -60,6 +61,7 @@ func NewOrganizationPoolManager(ctx context.Context, cfg params.Organization, cf
helper: helper,
credsDetails: cfgInternal.GithubCredentialsDetails,
wg: wg,
keyMux: keyMuxes,
}
return repo, nil
}
120 changes: 98 additions & 22 deletions runner/pool/pool.go
@@ -49,6 +49,34 @@ const (
maxCreateAttempts = 5
)

type keyMutex struct {
muxes sync.Map
}

func (k *keyMutex) TryLock(key string) bool {
mux, _ := k.muxes.LoadOrStore(key, &sync.Mutex{})
keyMux := mux.(*sync.Mutex)
return keyMux.TryLock()
}

func (k *keyMutex) Unlock(key string) {
mux, ok := k.muxes.Load(key)
if !ok {
return
}
keyMux := mux.(*sync.Mutex)
keyMux.Unlock()
}

func (k *keyMutex) Delete(key string) {
k.muxes.Delete(key)
}

func (k *keyMutex) UnlockAndDelete(key string) {
k.Unlock(key)
k.Delete(key)
}

type basePoolManager struct {
ctx context.Context
controllerID string
@@ -65,9 +93,9 @@ type basePoolManager struct {
managerIsRunning bool
managerErrorReason string

mux sync.Mutex

wg *sync.WaitGroup
mux sync.Mutex
wg *sync.WaitGroup
keyMux *keyMutex
}

func (r *basePoolManager) HandleWorkflowJob(job params.WorkflowJob) (err error) {
@@ -290,6 +318,13 @@ func (r *basePoolManager) cleanupOrphanedProviderRunners(runners []*github.Runne
}

for _, instance := range dbInstances {
lockAcquired := r.keyMux.TryLock(instance.Name)
if !lockAcquired {
log.Printf("failed to acquire lock for instance %s", instance.Name)
continue
}
defer r.keyMux.Unlock(instance.Name)

switch providerCommon.InstanceStatus(instance.Status) {
case providerCommon.InstancePendingCreate,
providerCommon.InstancePendingDelete:
@@ -342,6 +377,13 @@ func (r *basePoolManager) reapTimedOutRunners(runners []*github.Runner) error {
}

for _, instance := range dbInstances {
lockAcquired := r.keyMux.TryLock(instance.Name)
if !lockAcquired {
log.Printf("failed to acquire lock for instance %s", instance.Name)
continue
}
defer r.keyMux.Unlock(instance.Name)

pool, err := r.store.GetPoolByID(r.ctx, instance.PoolID)
if err != nil {
return errors.Wrap(err, "fetching instance pool info")
@@ -453,6 +495,14 @@ func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner)
}
poolInstanceCache[pool.ID] = poolInstances
}

lockAcquired := r.keyMux.TryLock(dbInstance.Name)
if !lockAcquired {
log.Printf("failed to acquire lock for instance %s", dbInstance.Name)
continue
}
defer r.keyMux.Unlock(dbInstance.Name)

// See: https://golang.org/doc/faq#closures_and_goroutines
runner := runner
g.Go(func() error {
Expand All @@ -475,6 +525,7 @@ func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner)
if err := r.store.DeleteInstance(ctx, dbInstance.PoolID, dbInstance.Name); err != nil {
return errors.Wrap(err, "removing runner from database")
}
defer r.keyMux.UnlockAndDelete(dbInstance.Name)
return nil
}

@@ -853,6 +904,14 @@ func (r *basePoolManager) scaleDownOnePool(ctx context.Context, pool params.Pool

for _, instanceToDelete := range idleWorkers[:numScaleDown] {
instanceToDelete := instanceToDelete

lockAcquired := r.keyMux.TryLock(instanceToDelete.Name)
if !lockAcquired {
log.Printf("failed to acquire lock for instance %s", instanceToDelete.Name)
continue
}
defer r.keyMux.Unlock(instanceToDelete.Name)

g.Go(func() error {
log.Printf("scaling down idle worker %s from pool %s\n", instanceToDelete.Name, pool.ID)
if err := r.ForceDeleteRunner(instanceToDelete); err != nil {
@@ -929,8 +988,16 @@ func (r *basePoolManager) retryFailedInstancesForOnePool(ctx context.Context, po
if instance.CreateAttempt >= maxCreateAttempts {
continue
}

lockAcquired := r.keyMux.TryLock(instance.Name)
if !lockAcquired {
log.Printf("failed to acquire lock for instance %s", instance.Name)
continue
}

instance := instance
g.Go(func() error {
defer r.keyMux.Unlock(instance.Name)
// NOTE(gabriel-samfira): this is done in parallel. If there are many failed instances
// this has the potential to create many API requests to the target provider.
// TODO(gabriel-samfira): implement request throttling.
@@ -1060,22 +1127,29 @@ func (r *basePoolManager) deletePendingInstances() error {
if err != nil {
return fmt.Errorf("failed to fetch instances from store: %w", err)
}
g, ctx := errgroup.WithContext(r.ctx)

for _, instance := range instances {
if instance.Status != providerCommon.InstancePendingDelete {
// not in pending_delete status. Skip.
continue
}

lockAcquired := r.keyMux.TryLock(instance.Name)
if !lockAcquired {
log.Printf("failed to acquire lock for instance %s", instance.Name)
continue
}

// Set the status to deleting before launching the goroutine that removes
// the runner from the provider (which can take a long time).
if err := r.setInstanceStatus(instance.Name, providerCommon.InstanceDeleting, nil); err != nil {
log.Printf("failed to update runner %s status", instance.Name)
r.keyMux.Unlock(instance.Name)
continue
}

instance := instance
g.Go(func() (err error) {
go func(instance params.Instance) (err error) {
defer r.keyMux.Unlock(instance.Name)
defer func(instance params.Instance) {
if err != nil {
// failed to remove from provider. Set the status back to pending_delete, which
@@ -1086,19 +1160,17 @@ func (r *basePoolManager) deletePendingInstances() error {
}
}(instance)

err = r.deleteInstanceFromProvider(ctx, instance)
err = r.deleteInstanceFromProvider(r.ctx, instance)
if err != nil {
return fmt.Errorf("failed to remove instance from provider: %w", err)
}

if deleteErr := r.store.DeleteInstance(ctx, instance.PoolID, instance.Name); deleteErr != nil {
if deleteErr := r.store.DeleteInstance(r.ctx, instance.PoolID, instance.Name); deleteErr != nil {
return fmt.Errorf("failed to delete instance from database: %w", deleteErr)
}
r.keyMux.UnlockAndDelete(instance.Name)
return nil
})
}
if err := r.waitForErrorGroupOrContextCancelled(g); err != nil {
return fmt.Errorf("failed to delete pending instances: %w", err)
}(instance)
}

return nil
Expand All @@ -1110,36 +1182,40 @@ func (r *basePoolManager) addPendingInstances() error {
if err != nil {
return fmt.Errorf("failed to fetch instances from store: %w", err)
}
g, _ := errgroup.WithContext(r.ctx)
for _, instance := range instances {
if instance.Status != providerCommon.InstancePendingCreate {
// not in pending_create status. Skip.
continue
}

lockAcquired := r.keyMux.TryLock(instance.Name)
if !lockAcquired {
log.Printf("failed to acquire lock for instance %s", instance.Name)
continue
}

// Set the instance to "creating" before launching the goroutine. This will ensure that addPendingInstances()
// won't attempt to create the runner a second time.
if err := r.setInstanceStatus(instance.Name, providerCommon.InstanceCreating, nil); err != nil {
log.Printf("failed to update runner %s status: %s", instance.Name, err)
r.keyMux.Unlock(instance.Name)
// We failed to transition the instance to Creating. This means that garm will retry to create this instance
// when the loop runs again and we end up with multiple instances.
continue
}
instance := instance
g.Go(func() error {

go func(instance params.Instance) {
defer r.keyMux.Unlock(instance.Name)
log.Printf("creating instance %s in pool %s", instance.Name, instance.PoolID)
if err := r.addInstanceToProvider(instance); err != nil {
log.Printf("failed to add instance to provider: %s", err)
errAsBytes := []byte(err.Error())
if err := r.setInstanceStatus(instance.Name, providerCommon.InstanceError, errAsBytes); err != nil {
return fmt.Errorf("failed to update runner %s status: %w", instance.Name, err)
log.Printf("failed to update runner %s status: %s", instance.Name, err)
}
return fmt.Errorf("failed to create instance in provider: %w", err)
log.Printf("failed to create instance in provider: %s", err)
}
return nil
})
}
if err := r.waitForErrorGroupOrContextCancelled(g); err != nil {
return fmt.Errorf("failed to add pending instances: %w", err)
}(instance)
}

return nil
2 changes: 2 additions & 0 deletions runner/pool/repository.go
@@ -41,6 +41,7 @@ func NewRepositoryPoolManager(ctx context.Context, cfg params.Repository, cfgInt
}

wg := &sync.WaitGroup{}
keyMuxes := &keyMutex{}

helper := &repository{
cfg: cfg,
@@ -60,6 +61,7 @@ func NewRepositoryPoolManager(ctx context.Context, cfg params.Repository, cfgInt
helper: helper,
credsDetails: cfgInternal.GithubCredentialsDetails,
wg: wg,
keyMux: keyMuxes,
}
return repo, nil
}
4 changes: 0 additions & 4 deletions runner/runner.go
@@ -636,10 +636,6 @@ func (r *Runner) DispatchWorkflowJob(hookTargetType, signature string, jobData [
return errors.Wrapf(runnerErrors.ErrBadRequest, "invalid job data: %s", err)
}

asJs, _ := json.MarshalIndent(job, "", " ")
log.Printf("got workflow job: %s", string(asJs))
log.Printf("got workflow job for %s", string(jobData))

var poolManager common.PoolManager
var err error
