Skip to content

Commit

Permalink
Allow limiting the number of nodegroups created in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
cPu1 committed Jul 11, 2024
1 parent b4af6ae commit 4fcfce3
Show file tree
Hide file tree
Showing 12 changed files with 100 additions and 63 deletions.
9 changes: 5 additions & 4 deletions pkg/actions/nodegroup/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type CreateOpts struct {
DryRunSettings DryRunSettings
SkipOutdatedAddonsCheck bool
ConfigFileProvided bool
Parallelism int
}

type DryRunSettings struct {
Expand Down Expand Up @@ -171,7 +172,7 @@ func (m *Manager) Create(ctx context.Context, options CreateOpts, nodegroupFilte
return cmdutils.PrintNodeGroupDryRunConfig(clusterConfigCopy, options.DryRunSettings.OutStream)
}

if err := m.nodeCreationTasks(ctx, isOwnedCluster, skipEgressRules, options.UpdateAuthConfigMap); err != nil {
if err := m.nodeCreationTasks(ctx, isOwnedCluster, skipEgressRules, options.UpdateAuthConfigMap, options.Parallelism); err != nil {
return err
}

Expand Down Expand Up @@ -203,7 +204,7 @@ func makeOutpostsService(clusterConfig *api.ClusterConfig, provider api.ClusterP
}
}

func (m *Manager) nodeCreationTasks(ctx context.Context, isOwnedCluster, skipEgressRules bool, updateAuthConfigMap *bool) error {
func (m *Manager) nodeCreationTasks(ctx context.Context, isOwnedCluster, skipEgressRules bool, updateAuthConfigMap *bool, parallelism int) error {
cfg := m.cfg
meta := cfg.Metadata

Expand Down Expand Up @@ -259,10 +260,10 @@ func (m *Manager) nodeCreationTasks(ctx context.Context, isOwnedCluster, skipEgr
}
disableAccessEntryCreation := !m.accessEntry.IsEnabled() || updateAuthConfigMap != nil
if nodeGroupTasks := m.stackManager.NewUnmanagedNodeGroupTask(ctx, cfg.NodeGroups, !awsNodeUsesIRSA, skipEgressRules,
disableAccessEntryCreation, vpcImporter); nodeGroupTasks.Len() > 0 {
disableAccessEntryCreation, vpcImporter, parallelism); nodeGroupTasks.Len() > 0 {
allNodeGroupTasks.Append(nodeGroupTasks)
}
managedTasks := m.stackManager.NewManagedNodeGroupTask(ctx, cfg.ManagedNodeGroups, !awsNodeUsesIRSA, vpcImporter)
managedTasks := m.stackManager.NewManagedNodeGroupTask(ctx, cfg.ManagedNodeGroups, !awsNodeUsesIRSA, vpcImporter, parallelism)
if managedTasks.Len() > 0 {
allNodeGroupTasks.Append(managedTasks)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/actions/nodegroup/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,11 @@ type stackManagerDelegate struct {
ngTaskCreator nodeGroupTaskCreator
}

func (s *stackManagerDelegate) NewUnmanagedNodeGroupTask(ctx context.Context, nodeGroups []*api.NodeGroup, forceAddCNIPolicy, skipEgressRules, disableAccessEntryCreation bool, vpcImporter vpc.Importer) *tasks.TaskTree {
func (s *stackManagerDelegate) NewUnmanagedNodeGroupTask(ctx context.Context, nodeGroups []*api.NodeGroup, forceAddCNIPolicy, skipEgressRules, disableAccessEntryCreation bool, vpcImporter vpc.Importer, nodeGroupParallelism int) *tasks.TaskTree {
return s.ngTaskCreator.NewUnmanagedNodeGroupTask(ctx, nodeGroups, forceAddCNIPolicy, skipEgressRules, disableAccessEntryCreation, vpcImporter)
}

func (s *stackManagerDelegate) NewManagedNodeGroupTask(context.Context, []*api.ManagedNodeGroup, bool, vpc.Importer) *tasks.TaskTree {
func (s *stackManagerDelegate) NewManagedNodeGroupTask(context.Context, []*api.ManagedNodeGroup, bool, vpc.Importer, int) *tasks.TaskTree {
return nil
}

Expand Down
17 changes: 8 additions & 9 deletions pkg/cfn/manager/create_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"context"
"fmt"

"github.com/pkg/errors"

"github.com/kris-nova/logger"

v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -25,7 +23,7 @@ import (
// NewTasksToCreateCluster defines all tasks required to create a cluster along
// with some nodegroups; see CreateAllNodeGroups for how onlyNodeGroupSubset works.
func (c *StackCollection) NewTasksToCreateCluster(ctx context.Context, nodeGroups []*api.NodeGroup,
managedNodeGroups []*api.ManagedNodeGroup, accessConfig *api.AccessConfig, accessEntryCreator accessentry.CreatorInterface, postClusterCreationTasks ...tasks.Task) *tasks.TaskTree {
managedNodeGroups []*api.ManagedNodeGroup, accessConfig *api.AccessConfig, accessEntryCreator accessentry.CreatorInterface, nodeGroupParallelism int, postClusterCreationTasks ...tasks.Task) *tasks.TaskTree {
taskTree := tasks.TaskTree{Parallel: false}

taskTree.Append(&createClusterTask{
Expand All @@ -46,11 +44,11 @@ func (c *StackCollection) NewTasksToCreateCluster(ctx context.Context, nodeGroup
IsSubTask: true,
}
disableAccessEntryCreation := accessConfig.AuthenticationMode == ekstypes.AuthenticationModeConfigMap
if unmanagedNodeGroupTasks := c.NewUnmanagedNodeGroupTask(ctx, nodeGroups, false, false, disableAccessEntryCreation, vpcImporter); unmanagedNodeGroupTasks.Len() > 0 {
if unmanagedNodeGroupTasks := c.NewUnmanagedNodeGroupTask(ctx, nodeGroups, false, false, disableAccessEntryCreation, vpcImporter, nodeGroupParallelism); unmanagedNodeGroupTasks.Len() > 0 {
unmanagedNodeGroupTasks.IsSubTask = true
nodeGroupTasks.Append(unmanagedNodeGroupTasks)
}
if managedNodeGroupTasks := c.NewManagedNodeGroupTask(ctx, managedNodeGroups, false, vpcImporter); managedNodeGroupTasks.Len() > 0 {
if managedNodeGroupTasks := c.NewManagedNodeGroupTask(ctx, managedNodeGroups, false, vpcImporter, nodeGroupParallelism); managedNodeGroupTasks.Len() > 0 {
managedNodeGroupTasks.IsSubTask = true
nodeGroupTasks.Append(managedNodeGroupTasks)
}
Expand All @@ -75,7 +73,7 @@ func (c *StackCollection) NewTasksToCreateCluster(ctx context.Context, nodeGroup
}

// NewUnmanagedNodeGroupTask returns tasks for creating self-managed nodegroups.
func (c *StackCollection) NewUnmanagedNodeGroupTask(ctx context.Context, nodeGroups []*api.NodeGroup, forceAddCNIPolicy, skipEgressRules, disableAccessEntryCreation bool, vpcImporter vpc.Importer) *tasks.TaskTree {
func (c *StackCollection) NewUnmanagedNodeGroupTask(ctx context.Context, nodeGroups []*api.NodeGroup, forceAddCNIPolicy, skipEgressRules, disableAccessEntryCreation bool, vpcImporter vpc.Importer, parallelism int) *tasks.TaskTree {
task := &UnmanagedNodeGroupTask{
ClusterConfig: c.spec,
NodeGroups: nodeGroups,
Expand All @@ -93,12 +91,13 @@ func (c *StackCollection) NewUnmanagedNodeGroupTask(ctx context.Context, nodeGro
SkipEgressRules: skipEgressRules,
DisableAccessEntryCreation: disableAccessEntryCreation,
VPCImporter: vpcImporter,
Parallelism: parallelism,
})
}

// NewManagedNodeGroupTask defines tasks required to create managed nodegroups
func (c *StackCollection) NewManagedNodeGroupTask(ctx context.Context, nodeGroups []*api.ManagedNodeGroup, forceAddCNIPolicy bool, vpcImporter vpc.Importer) *tasks.TaskTree {
taskTree := &tasks.TaskTree{Parallel: true}
func (c *StackCollection) NewManagedNodeGroupTask(ctx context.Context, nodeGroups []*api.ManagedNodeGroup, forceAddCNIPolicy bool, vpcImporter vpc.Importer, nodeGroupParallelism int) *tasks.TaskTree {
taskTree := &tasks.TaskTree{Parallel: true, Limit: nodeGroupParallelism}
for _, ng := range nodeGroups {
// Disable parallelisation if any tags propagation is done
// since nodegroup must be created to propagate tags to its ASGs.
Expand Down Expand Up @@ -162,7 +161,7 @@ func (c *StackCollection) NewTasksToCreateIAMServiceAccounts(serviceAccounts []*
objectMeta.SetAnnotations(sa.AsObjectMeta().Annotations)
objectMeta.SetLabels(sa.AsObjectMeta().Labels)
if err := kubernetes.MaybeCreateServiceAccountOrUpdateMetadata(clientSet, objectMeta); err != nil {
return errors.Wrapf(err, "failed to create service account %s/%s", objectMeta.GetNamespace(), objectMeta.GetName())
return fmt.Errorf("failed to create service account %s/%s: %w", objectMeta.GetNamespace(), objectMeta.GetName(), err)
}
return nil
},
Expand Down
58 changes: 32 additions & 26 deletions pkg/cfn/manager/fakes/fake_stack_manager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 4fcfce3

Please sign in to comment.