drain nodes in parallel
Jake committed Mar 2, 2022
1 parent ffed567 commit 6d4c77c
Showing 7 changed files with 79 additions and 39 deletions.
3 changes: 2 additions & 1 deletion pkg/actions/nodegroup/drain.go
@@ -15,12 +15,13 @@ type DrainInput struct {
NodeDrainWaitPeriod time.Duration
Undo bool
DisableEviction bool
Parallel int
}

func (m *Manager) Drain(input *DrainInput) error {
if !input.Plan {
for _, n := range input.NodeGroups {
nodeGroupDrainer := drain.NewNodeGroupDrainer(m.clientSet, n, m.ctl.Provider.WaitTimeout(), input.MaxGracePeriod, input.NodeDrainWaitPeriod, input.Undo, input.DisableEviction)
nodeGroupDrainer := drain.NewNodeGroupDrainer(m.clientSet, n, m.ctl.Provider.WaitTimeout(), input.MaxGracePeriod, input.NodeDrainWaitPeriod, input.Undo, input.DisableEviction, input.Parallel)
if err := nodeGroupDrainer.Drain(); err != nil {
return err
}
11 changes: 9 additions & 2 deletions pkg/ctl/cmdutils/configfile.go
@@ -4,6 +4,7 @@ import (
"encoding/csv"
"fmt"
"reflect"
"strconv"
"strings"

"github.com/kris-nova/logger"
@@ -495,8 +496,8 @@ func normalizeBaseNodeGroup(np api.NodePool, cmd *cobra.Command) {
}
}

// NewDeleteNodeGroupLoader will load config or use flags for 'eksctl delete nodegroup'
func NewDeleteNodeGroupLoader(cmd *Cmd, ng *api.NodeGroup, ngFilter *filter.NodeGroupFilter) ClusterConfigLoader {
// NewDeleteAndDrainNodeGroupLoader will load config or use flags for 'eksctl delete nodegroup'
func NewDeleteAndDrainNodeGroupLoader(cmd *Cmd, ng *api.NodeGroup, ngFilter *filter.NodeGroupFilter) ClusterConfigLoader {
l := newCommonClusterConfigLoader(cmd)

l.validateWithConfigFile = func() error {
@@ -524,6 +525,12 @@ func NewDeleteNodeGroupLoader(cmd *Cmd, ng *api.NodeGroup, ngFilter *filter.Node
return ErrMustBeSet("--name")
}

if flag := l.CobraCommand.Flag("parallel"); flag != nil && flag.Changed {
if val, _ := strconv.Atoi(flag.Value.String()); val > 25 || val < 1 {
return fmt.Errorf("--parallel value must be of range 1-25")
}
}

ngFilter.AppendIncludeNames(ng.Name)

l.Plan = false
2 changes: 1 addition & 1 deletion pkg/ctl/delete/nodegroup.go
@@ -64,7 +64,7 @@ func deleteNodeGroupWithRunFunc(cmd *cmdutils.Cmd, runFunc func(cmd *cmdutils.Cm
func doDeleteNodeGroup(cmd *cmdutils.Cmd, ng *api.NodeGroup, updateAuthConfigMap, deleteNodeGroupDrain, onlyMissing bool, maxGracePeriod time.Duration, disableEviction bool) error {
ngFilter := filter.NewNodeGroupFilter()

if err := cmdutils.NewDeleteNodeGroupLoader(cmd, ng, ngFilter).Load(); err != nil {
if err := cmdutils.NewDeleteAndDrainNodeGroupLoader(cmd, ng, ngFilter).Load(); err != nil {
return err
}

26 changes: 16 additions & 10 deletions pkg/ctl/drain/nodegroup.go
@@ -15,26 +15,30 @@ import (
)

func drainNodeGroupCmd(cmd *cmdutils.Cmd) {
drainNodeGroupWithRunFunc(cmd, func(cmd *cmdutils.Cmd, ng *api.NodeGroup, undo, onlyMissing bool, maxGracePeriod, nodeDrainWaitPeriod time.Duration, disableEviction bool) error {
return doDrainNodeGroup(cmd, ng, undo, onlyMissing, maxGracePeriod, nodeDrainWaitPeriod, disableEviction)
drainNodeGroupWithRunFunc(cmd, func(cmd *cmdutils.Cmd, ng *api.NodeGroup, undo, onlyMissing bool, maxGracePeriod, nodeDrainWaitPeriod time.Duration, disableEviction bool, parallel int) error {
return doDrainNodeGroup(cmd, ng, undo, onlyMissing, maxGracePeriod, nodeDrainWaitPeriod, disableEviction, parallel)
})
}

func drainNodeGroupWithRunFunc(cmd *cmdutils.Cmd, runFunc func(cmd *cmdutils.Cmd, ng *api.NodeGroup, undo, onlyMissing bool, maxGracePeriod, nodeDrainWaitPeriod time.Duration, disableEviction bool) error) {
func drainNodeGroupWithRunFunc(cmd *cmdutils.Cmd, runFunc func(cmd *cmdutils.Cmd, ng *api.NodeGroup, undo, onlyMissing bool, maxGracePeriod, nodeDrainWaitPeriod time.Duration, disableEviction bool, parallel int) error) {
cfg := api.NewClusterConfig()
ng := api.NewNodeGroup()
cmd.ClusterConfig = cfg

var undo, onlyMissing bool
var maxGracePeriod time.Duration
var nodeDrainWaitPeriod time.Duration
var disableEviction bool
var (
undo bool
onlyMissing bool
disableEviction bool
parallel int
maxGracePeriod time.Duration
nodeDrainWaitPeriod time.Duration
)

cmd.SetDescription("nodegroup", "Cordon and drain a nodegroup", "", "ng")

cmd.CobraCommand.RunE = func(_ *cobra.Command, args []string) error {
cmd.NameArg = cmdutils.GetNameArg(args)
return runFunc(cmd, ng, undo, onlyMissing, maxGracePeriod, nodeDrainWaitPeriod, disableEviction)
return runFunc(cmd, ng, undo, onlyMissing, maxGracePeriod, nodeDrainWaitPeriod, disableEviction, parallel)
}

cmd.FlagSetGroup.InFlagSet("General", func(fs *pflag.FlagSet) {
@@ -52,15 +56,16 @@ func drainNodeGroupWithRunFunc(cmd *cmdutils.Cmd, runFunc func(cmd *cmdutils.Cmd
fs.BoolVar(&disableEviction, "disable-eviction", defaultDisableEviction, "Force drain to use delete, even if eviction is supported. This will bypass checking PodDisruptionBudgets, use with caution.")
cmdutils.AddTimeoutFlag(fs, &cmd.ProviderConfig.WaitTimeout)
fs.DurationVar(&nodeDrainWaitPeriod, "node-drain-wait-period", 0, "Amount of time to wait between draining nodes in a nodegroup")
fs.IntVar(&parallel, "parallel", 5, "Number of nodes to drain in parallel. Max 25")
})

cmdutils.AddCommonFlagsForAWS(cmd.FlagSetGroup, &cmd.ProviderConfig, true)
}

func doDrainNodeGroup(cmd *cmdutils.Cmd, ng *api.NodeGroup, undo, onlyMissing bool, maxGracePeriod, nodeDrainWaitPeriod time.Duration, disableEviction bool) error {
func doDrainNodeGroup(cmd *cmdutils.Cmd, ng *api.NodeGroup, undo, onlyMissing bool, maxGracePeriod, nodeDrainWaitPeriod time.Duration, disableEviction bool, parallel int) error {
ngFilter := filter.NewNodeGroupFilter()

if err := cmdutils.NewDeleteNodeGroupLoader(cmd, ng, ngFilter).Load(); err != nil {
if err := cmdutils.NewDeleteAndDrainNodeGroupLoader(cmd, ng, ngFilter).Load(); err != nil {
return err
}

@@ -126,6 +131,7 @@ func doDrainNodeGroup(cmd *cmdutils.Cmd, ng *api.NodeGroup, undo, onlyMissing bo
NodeDrainWaitPeriod: nodeDrainWaitPeriod,
Undo: undo,
DisableEviction: disableEviction,
Parallel: parallel,
}
return nodegroup.New(cfg, ctl, clientSet).Drain(drainInput)
}
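
For context on the flag handling above, here is a small standalone sketch of registering an integer --parallel flag with pflag and range-checking it before use. The flag-set name, the parsed arguments, and the error wording are illustrative assumptions, not the exact eksctl wiring (which goes through cmdutils and the config-file loader shown earlier).

package main

import (
	"fmt"

	"github.com/spf13/pflag"
)

func main() {
	var parallel int
	fs := pflag.NewFlagSet("drain", pflag.ContinueOnError)
	fs.IntVar(&parallel, "parallel", 5, "Number of nodes to drain in parallel. Max 25")

	// Parse a sample command line; in eksctl this is handled by cobra.
	if err := fs.Parse([]string{"--parallel", "10"}); err != nil {
		fmt.Println("parse error:", err)
		return
	}

	// Only validate when the user actually set the flag, mirroring the
	// Changed check in the config-file loader above.
	if flag := fs.Lookup("parallel"); flag != nil && flag.Changed {
		if parallel < 1 || parallel > 25 {
			fmt.Println("--parallel value must be in the range 1-25")
			return
		}
	}
	fmt.Println("draining with parallelism:", parallel)
}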
63 changes: 43 additions & 20 deletions pkg/drain/nodegroup.go
@@ -5,6 +5,7 @@ import (
"fmt"
"time"

"golang.org/x/sync/semaphore"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/weaveworks/eksctl/pkg/drain/evictor"
@@ -40,9 +41,10 @@ type NodeGroupDrainer struct {
waitTimeout time.Duration
nodeDrainWaitPeriod time.Duration
undo bool
parallel int
}

func NewNodeGroupDrainer(clientSet kubernetes.Interface, ng eks.KubeNodeGroup, waitTimeout, maxGracePeriod, nodeDrainWaitPeriod time.Duration, undo, disableEviction bool) NodeGroupDrainer {
func NewNodeGroupDrainer(clientSet kubernetes.Interface, ng eks.KubeNodeGroup, waitTimeout, maxGracePeriod, nodeDrainWaitPeriod time.Duration, undo, disableEviction bool, parallel int) NodeGroupDrainer {
ignoreDaemonSets := []metav1.ObjectMeta{
{
Namespace: "kube-system",
@@ -76,6 +78,7 @@ func NewNodeGroupDrainer(clientSet kubernetes.Interface, ng eks.KubeNodeGroup, w
waitTimeout: waitTimeout,
nodeDrainWaitPeriod: nodeDrainWaitPeriod,
undo: undo,
parallel: parallel,
}
}

@@ -102,14 +105,24 @@ func (n *NodeGroupDrainer) Drain() error {
}

drainedNodes := sets.NewString()
ctx, cancel := context.WithTimeout(context.TODO(), n.waitTimeout)
defer cancel()

parallelLimit := int64(n.parallel)
sem := semaphore.NewWeighted(parallelLimit)
logger.Info("starting parallel draining, max in-flight of %d", parallelLimit)
// loop until all nodes are drained to handle accidental scale-up
// or any other changes in the ASG
timer := time.NewTimer(n.waitTimeout)
defer timer.Stop()
waitForAllRoutinesToFinish := func() {
if err := sem.Acquire(context.TODO(), parallelLimit); err != nil {
logger.Critical("failed to claim sem: %w", err)
}
}

for {
select {
case <-timer.C:
case <-ctx.Done():
waitForAllRoutinesToFinish()
return fmt.Errorf("timed out (after %s) waiting for nodegroup %q to be drained", n.waitTimeout, n.ng.NameString())
default:
nodes, err := n.clientSet.CoreV1().Nodes().List(context.TODO(), listOptions)
@@ -127,30 +140,40 @@ }
}

if newPendingNodes.Len() == 0 {
waitForAllRoutinesToFinish()
logger.Success("drained all nodes: %v", drainedNodes.List())
return nil // no new nodes were seen
}

logger.Debug("already drained: %v", drainedNodes.List())
logger.Debug("will drain: %v", newPendingNodes.List())

for i, node := range newPendingNodes.List() {
pending, err := n.evictPods(node)
if err != nil {
logger.Warning("pod eviction error (%q) on node %s", err, node)
time.Sleep(retryDelay)
continue
}
logger.Debug("%d pods to be evicted from %s", pending, node)
if pending == 0 {
drainedNodes.Insert(node)
}

// only wait if we're not on the last node of this iteration
if n.nodeDrainWaitPeriod > 0 && i < newPendingNodes.Len()-1 {
logger.Debug("waiting for %.0f seconds before draining next node", n.nodeDrainWaitPeriod.Seconds())
time.Sleep(n.nodeDrainWaitPeriod)
i := i
node := node
if err := sem.Acquire(ctx, 1); err != nil {
logger.Critical("failed to claim sem: %w", err)
}
go func() {
defer sem.Release(1)
logger.Debug("starting drain of node %s", node)
pending, err := n.evictPods(node)
if err != nil {
logger.Warning("pod eviction error (%q) on node %s", err, node)
time.Sleep(retryDelay)
return
}

logger.Debug("%d pods to be evicted from %s", pending, node)
if pending == 0 {
drainedNodes.Insert(node)
}

// only wait if we're not on the last node of this iteration
if n.nodeDrainWaitPeriod > 0 && i < newPendingNodes.Len()-1 {
logger.Debug("waiting for %.0f seconds before draining next node", n.nodeDrainWaitPeriod.Seconds())
time.Sleep(n.nodeDrainWaitPeriod)
}
}()
}
}
}
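
The heart of the new Drain loop is the bounded-parallelism pattern from golang.org/x/sync/semaphore: each node drain acquires one unit of a weighted semaphore before launching a goroutine, and acquiring the full weight acts as a barrier that only succeeds once every in-flight goroutine has released its unit. Below is a minimal, self-contained sketch of that pattern; drainNode and the node names are placeholders for illustration, not the eksctl evictor.

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/semaphore"
)

// drainNode stands in for the per-node eviction work.
func drainNode(name string) {
	time.Sleep(100 * time.Millisecond)
	fmt.Println("drained", name)
}

func main() {
	const limit = 5
	sem := semaphore.NewWeighted(limit)
	ctx := context.Background()

	nodes := []string{"node-a", "node-b", "node-c", "node-d", "node-e", "node-f"}
	for _, node := range nodes {
		node := node
		// Blocks once `limit` drains are already in flight.
		if err := sem.Acquire(ctx, 1); err != nil {
			fmt.Println("acquire failed:", err)
			return
		}
		go func() {
			defer sem.Release(1)
			drainNode(node)
		}()
	}

	// Acquiring the full weight only succeeds after every worker has
	// released its unit, i.e. all in-flight drains have finished.
	if err := sem.Acquire(ctx, limit); err != nil {
		fmt.Println("acquire failed:", err)
	}
}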
10 changes: 5 additions & 5 deletions pkg/drain/nodegroup_test.go
@@ -73,7 +73,7 @@ var _ = Describe("Drain", func() {
})

It("does not error", func() {
nodeGroupDrainer := drain.NewNodeGroupDrainer(fakeClientSet, &mockNG, time.Second*10, time.Second*10, time.Second, false, false)
nodeGroupDrainer := drain.NewNodeGroupDrainer(fakeClientSet, &mockNG, time.Second*10, time.Second*10, time.Second, false, false, 1)
nodeGroupDrainer.SetDrainer(fakeEvictor)

err := nodeGroupDrainer.Drain()
@@ -117,7 +117,7 @@ var _ = Describe("Drain", func() {
})

It("times out and errors", func() {
nodeGroupDrainer := drain.NewNodeGroupDrainer(fakeClientSet, &mockNG, time.Second*2, time.Second*0, time.Second, false, false)
nodeGroupDrainer := drain.NewNodeGroupDrainer(fakeClientSet, &mockNG, time.Second*2, time.Second*0, time.Second, false, false, 1)
nodeGroupDrainer.SetDrainer(fakeEvictor)

err := nodeGroupDrainer.Drain()
@@ -131,7 +131,7 @@ var _ = Describe("Drain", func() {
})

It("errors", func() {
nodeGroupDrainer := drain.NewNodeGroupDrainer(fakeClientSet, &mockNG, time.Second, time.Second, time.Second, false, false)
nodeGroupDrainer := drain.NewNodeGroupDrainer(fakeClientSet, &mockNG, time.Second, time.Second, time.Second, false, false, 1)
nodeGroupDrainer.SetDrainer(fakeEvictor)

err := nodeGroupDrainer.Drain()
@@ -175,7 +175,7 @@ var _ = Describe("Drain", func() {
})

It("does not error", func() {
nodeGroupDrainer := drain.NewNodeGroupDrainer(fakeClientSet, &mockNG, time.Second*10, time.Second, time.Second, false, true)
nodeGroupDrainer := drain.NewNodeGroupDrainer(fakeClientSet, &mockNG, time.Second*10, time.Second, time.Second, false, true, 1)
nodeGroupDrainer.SetDrainer(fakeEvictor)

err := nodeGroupDrainer.Drain()
@@ -205,7 +205,7 @@ var _ = Describe("Drain", func() {
})

It("uncordons all the nodes", func() {
nodeGroupDrainer := drain.NewNodeGroupDrainer(fakeClientSet, &mockNG, time.Second*10, time.Second, time.Second, true, false)
nodeGroupDrainer := drain.NewNodeGroupDrainer(fakeClientSet, &mockNG, time.Second*10, time.Second, time.Second, true, false, 1)
nodeGroupDrainer.SetDrainer(fakeEvictor)

err := nodeGroupDrainer.Drain()
3 changes: 3 additions & 0 deletions pkg/eks/client.go
@@ -61,7 +61,10 @@ func (c *Client) new(spec *api.ClusterConfig, stsClient stsiface.STSAPI) (*Clien
if err != nil {
return nil, errors.Wrap(err, "failed to create API client configuration from client config")
}

c.rawConfig = rawConfig
c.rawConfig.QPS = float32(25)
c.rawConfig.Burst = int(c.rawConfig.QPS) * 2

return c, nil
}
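
The client.go change raises client-go's client-side throttling so that up to 25 concurrent drains are not slowed by the library's default rate limit (historically 5 QPS with a burst of 10). As a rough illustration, the sketch below sets the same QPS and Burst fields on a rest.Config built from a kubeconfig; the kubeconfig path and the chosen values are assumptions for the example, not eksctl's code path.

package main

import (
	"fmt"

	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Load a REST config from a kubeconfig file (the path is an example).
	config, err := clientcmd.BuildConfigFromFlags("", "/home/user/.kube/config")
	if err != nil {
		panic(err)
	}

	// Mirror the pkg/eks/client.go change: 25 sustained requests per second,
	// with a burst of twice that, so parallel drains are not throttled.
	config.QPS = 25
	config.Burst = 50

	clientSet, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}
	fmt.Printf("clientset ready: %T\n", clientSet)
}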
