Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
Jake committed Mar 2, 2022
1 parent ce67b5d commit 6129061
Showing 1 changed file with 9 additions and 8 deletions.
17 changes: 9 additions & 8 deletions pkg/drain/nodegroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,16 +114,11 @@ func (n *NodeGroupDrainer) Drain() error {
logger.Info("starting parallel draining, max in-flight of %d", parallelLimit)
// loop until all nodes are drained to handle accidental scale-up
// or any other changes in the ASG
waitForAllRoutinesToFinish := func() {
if err := sem.Acquire(context.TODO(), parallelLimit); err != nil {
logger.Critical("failed to claim sem: %w", err)
}
}

for {
select {
case <-ctx.Done():
waitForAllRoutinesToFinish()
//need to use a different context
waitForAllRoutinesToFinish(context.TODO(), sem, parallelLimit)
return fmt.Errorf("timed out (after %s) waiting for nodegroup %q to be drained", n.waitTimeout, n.ng.NameString())
default:
nodes, err := n.clientSet.CoreV1().Nodes().List(context.TODO(), listOptions)
Expand All @@ -141,7 +136,7 @@ func (n *NodeGroupDrainer) Drain() error {
}

if newPendingNodes.Len() == 0 {
waitForAllRoutinesToFinish()
waitForAllRoutinesToFinish(ctx, sem, parallelLimit)
logger.Success("drained all nodes: %v", mapToList(drainedNodes.Items()))
return nil // no new nodes were seen
}
Expand Down Expand Up @@ -183,6 +178,12 @@ func (n *NodeGroupDrainer) Drain() error {
}
}

func waitForAllRoutinesToFinish(ctx context.Context, sem *semaphore.Weighted, size int64) {
if err := sem.Acquire(ctx, size); err != nil {
logger.Critical("failed to claim sem: %w", err)
}
}

func mapToList(m map[string]interface{}) []string {
list := []string{}
for key := range m {
Expand Down

0 comments on commit 6129061

Please sign in to comment.