From 4fcfce38a1f43031a4bb6c09823a10d9e2aa2426 Mon Sep 17 00:00:00 2001 From: cPu1 Date: Thu, 11 Jul 2024 18:54:45 +0530 Subject: [PATCH] Allow limiting the number of nodegroups created in parallel --- pkg/actions/nodegroup/create.go | 9 ++-- pkg/actions/nodegroup/create_test.go | 4 +- pkg/cfn/manager/create_tasks.go | 17 +++--- pkg/cfn/manager/fakes/fake_stack_manager.go | 58 ++++++++++++--------- pkg/cfn/manager/interface.go | 6 +-- pkg/cfn/manager/nodegroup.go | 3 +- pkg/cfn/manager/tasks_test.go | 26 ++++----- pkg/ctl/cmdutils/create_cluster.go | 1 + pkg/ctl/cmdutils/nodegroup_flags.go | 1 + pkg/ctl/create/cluster.go | 5 +- pkg/ctl/create/nodegroup.go | 1 + pkg/utils/tasks/tasks.go | 32 +++++++++++- 12 files changed, 100 insertions(+), 63 deletions(-) diff --git a/pkg/actions/nodegroup/create.go b/pkg/actions/nodegroup/create.go index d9e0a455ce..1b386791fb 100644 --- a/pkg/actions/nodegroup/create.go +++ b/pkg/actions/nodegroup/create.go @@ -38,6 +38,7 @@ type CreateOpts struct { DryRunSettings DryRunSettings SkipOutdatedAddonsCheck bool ConfigFileProvided bool + Parallelism int } type DryRunSettings struct { @@ -171,7 +172,7 @@ func (m *Manager) Create(ctx context.Context, options CreateOpts, nodegroupFilte return cmdutils.PrintNodeGroupDryRunConfig(clusterConfigCopy, options.DryRunSettings.OutStream) } - if err := m.nodeCreationTasks(ctx, isOwnedCluster, skipEgressRules, options.UpdateAuthConfigMap); err != nil { + if err := m.nodeCreationTasks(ctx, isOwnedCluster, skipEgressRules, options.UpdateAuthConfigMap, options.Parallelism); err != nil { return err } @@ -203,7 +204,7 @@ func makeOutpostsService(clusterConfig *api.ClusterConfig, provider api.ClusterP } } -func (m *Manager) nodeCreationTasks(ctx context.Context, isOwnedCluster, skipEgressRules bool, updateAuthConfigMap *bool) error { +func (m *Manager) nodeCreationTasks(ctx context.Context, isOwnedCluster, skipEgressRules bool, updateAuthConfigMap *bool, parallelism int) error { cfg := m.cfg meta := cfg.Metadata @@ -259,10 +260,10 @@ func (m *Manager) nodeCreationTasks(ctx context.Context, isOwnedCluster, skipEgr } disableAccessEntryCreation := !m.accessEntry.IsEnabled() || updateAuthConfigMap != nil if nodeGroupTasks := m.stackManager.NewUnmanagedNodeGroupTask(ctx, cfg.NodeGroups, !awsNodeUsesIRSA, skipEgressRules, - disableAccessEntryCreation, vpcImporter); nodeGroupTasks.Len() > 0 { + disableAccessEntryCreation, vpcImporter, parallelism); nodeGroupTasks.Len() > 0 { allNodeGroupTasks.Append(nodeGroupTasks) } - managedTasks := m.stackManager.NewManagedNodeGroupTask(ctx, cfg.ManagedNodeGroups, !awsNodeUsesIRSA, vpcImporter) + managedTasks := m.stackManager.NewManagedNodeGroupTask(ctx, cfg.ManagedNodeGroups, !awsNodeUsesIRSA, vpcImporter, parallelism) if managedTasks.Len() > 0 { allNodeGroupTasks.Append(managedTasks) } diff --git a/pkg/actions/nodegroup/create_test.go b/pkg/actions/nodegroup/create_test.go index 482b59f00d..63452fa477 100644 --- a/pkg/actions/nodegroup/create_test.go +++ b/pkg/actions/nodegroup/create_test.go @@ -77,11 +77,11 @@ type stackManagerDelegate struct { ngTaskCreator nodeGroupTaskCreator } -func (s *stackManagerDelegate) NewUnmanagedNodeGroupTask(ctx context.Context, nodeGroups []*api.NodeGroup, forceAddCNIPolicy, skipEgressRules, disableAccessEntryCreation bool, vpcImporter vpc.Importer) *tasks.TaskTree { +func (s *stackManagerDelegate) NewUnmanagedNodeGroupTask(ctx context.Context, nodeGroups []*api.NodeGroup, forceAddCNIPolicy, skipEgressRules, disableAccessEntryCreation bool, vpcImporter vpc.Importer, nodeGroupParallelism int) *tasks.TaskTree { return s.ngTaskCreator.NewUnmanagedNodeGroupTask(ctx, nodeGroups, forceAddCNIPolicy, skipEgressRules, disableAccessEntryCreation, vpcImporter) } -func (s *stackManagerDelegate) NewManagedNodeGroupTask(context.Context, []*api.ManagedNodeGroup, bool, vpc.Importer) *tasks.TaskTree { +func (s *stackManagerDelegate) NewManagedNodeGroupTask(context.Context, []*api.ManagedNodeGroup, bool, vpc.Importer, int) *tasks.TaskTree { return nil } diff --git a/pkg/cfn/manager/create_tasks.go b/pkg/cfn/manager/create_tasks.go index 4aa9f8f6e1..db2d6bd396 100644 --- a/pkg/cfn/manager/create_tasks.go +++ b/pkg/cfn/manager/create_tasks.go @@ -4,8 +4,6 @@ import ( "context" "fmt" - "github.com/pkg/errors" - "github.com/kris-nova/logger" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -25,7 +23,7 @@ import ( // NewTasksToCreateCluster defines all tasks required to create a cluster along // with some nodegroups; see CreateAllNodeGroups for how onlyNodeGroupSubset works. func (c *StackCollection) NewTasksToCreateCluster(ctx context.Context, nodeGroups []*api.NodeGroup, - managedNodeGroups []*api.ManagedNodeGroup, accessConfig *api.AccessConfig, accessEntryCreator accessentry.CreatorInterface, postClusterCreationTasks ...tasks.Task) *tasks.TaskTree { + managedNodeGroups []*api.ManagedNodeGroup, accessConfig *api.AccessConfig, accessEntryCreator accessentry.CreatorInterface, nodeGroupParallelism int, postClusterCreationTasks ...tasks.Task) *tasks.TaskTree { taskTree := tasks.TaskTree{Parallel: false} taskTree.Append(&createClusterTask{ @@ -46,11 +44,11 @@ func (c *StackCollection) NewTasksToCreateCluster(ctx context.Context, nodeGroup IsSubTask: true, } disableAccessEntryCreation := accessConfig.AuthenticationMode == ekstypes.AuthenticationModeConfigMap - if unmanagedNodeGroupTasks := c.NewUnmanagedNodeGroupTask(ctx, nodeGroups, false, false, disableAccessEntryCreation, vpcImporter); unmanagedNodeGroupTasks.Len() > 0 { + if unmanagedNodeGroupTasks := c.NewUnmanagedNodeGroupTask(ctx, nodeGroups, false, false, disableAccessEntryCreation, vpcImporter, nodeGroupParallelism); unmanagedNodeGroupTasks.Len() > 0 { unmanagedNodeGroupTasks.IsSubTask = true nodeGroupTasks.Append(unmanagedNodeGroupTasks) } - if managedNodeGroupTasks := c.NewManagedNodeGroupTask(ctx, managedNodeGroups, false, vpcImporter); managedNodeGroupTasks.Len() > 0 { + if managedNodeGroupTasks := c.NewManagedNodeGroupTask(ctx, managedNodeGroups, false, vpcImporter, nodeGroupParallelism); managedNodeGroupTasks.Len() > 0 { managedNodeGroupTasks.IsSubTask = true nodeGroupTasks.Append(managedNodeGroupTasks) } @@ -75,7 +73,7 @@ func (c *StackCollection) NewTasksToCreateCluster(ctx context.Context, nodeGroup } // NewUnmanagedNodeGroupTask returns tasks for creating self-managed nodegroups. -func (c *StackCollection) NewUnmanagedNodeGroupTask(ctx context.Context, nodeGroups []*api.NodeGroup, forceAddCNIPolicy, skipEgressRules, disableAccessEntryCreation bool, vpcImporter vpc.Importer) *tasks.TaskTree { +func (c *StackCollection) NewUnmanagedNodeGroupTask(ctx context.Context, nodeGroups []*api.NodeGroup, forceAddCNIPolicy, skipEgressRules, disableAccessEntryCreation bool, vpcImporter vpc.Importer, parallelism int) *tasks.TaskTree { task := &UnmanagedNodeGroupTask{ ClusterConfig: c.spec, NodeGroups: nodeGroups, @@ -93,12 +91,13 @@ func (c *StackCollection) NewUnmanagedNodeGroupTask(ctx context.Context, nodeGro SkipEgressRules: skipEgressRules, DisableAccessEntryCreation: disableAccessEntryCreation, VPCImporter: vpcImporter, + Parallelism: parallelism, }) } // NewManagedNodeGroupTask defines tasks required to create managed nodegroups -func (c *StackCollection) NewManagedNodeGroupTask(ctx context.Context, nodeGroups []*api.ManagedNodeGroup, forceAddCNIPolicy bool, vpcImporter vpc.Importer) *tasks.TaskTree { - taskTree := &tasks.TaskTree{Parallel: true} +func (c *StackCollection) NewManagedNodeGroupTask(ctx context.Context, nodeGroups []*api.ManagedNodeGroup, forceAddCNIPolicy bool, vpcImporter vpc.Importer, nodeGroupParallelism int) *tasks.TaskTree { + taskTree := &tasks.TaskTree{Parallel: true, Limit: nodeGroupParallelism} for _, ng := range nodeGroups { // Disable parallelisation if any tags propagation is done // since nodegroup must be created to propagate tags to its ASGs. @@ -162,7 +161,7 @@ func (c *StackCollection) NewTasksToCreateIAMServiceAccounts(serviceAccounts []* objectMeta.SetAnnotations(sa.AsObjectMeta().Annotations) objectMeta.SetLabels(sa.AsObjectMeta().Labels) if err := kubernetes.MaybeCreateServiceAccountOrUpdateMetadata(clientSet, objectMeta); err != nil { - return errors.Wrapf(err, "failed to create service account %s/%s", objectMeta.GetNamespace(), objectMeta.GetName()) + return fmt.Errorf("failed to create service account %s/%s: %w", objectMeta.GetNamespace(), objectMeta.GetName(), err) } return nil }, diff --git a/pkg/cfn/manager/fakes/fake_stack_manager.go b/pkg/cfn/manager/fakes/fake_stack_manager.go index 7cff173d40..b150c8baee 100644 --- a/pkg/cfn/manager/fakes/fake_stack_manager.go +++ b/pkg/cfn/manager/fakes/fake_stack_manager.go @@ -634,13 +634,14 @@ type FakeStackManager struct { mustUpdateStackReturnsOnCall map[int]struct { result1 error } - NewManagedNodeGroupTaskStub func(context.Context, []*v1alpha5.ManagedNodeGroup, bool, vpc.Importer) *tasks.TaskTree + NewManagedNodeGroupTaskStub func(context.Context, []*v1alpha5.ManagedNodeGroup, bool, vpc.Importer, int) *tasks.TaskTree newManagedNodeGroupTaskMutex sync.RWMutex newManagedNodeGroupTaskArgsForCall []struct { arg1 context.Context arg2 []*v1alpha5.ManagedNodeGroup arg3 bool arg4 vpc.Importer + arg5 int } newManagedNodeGroupTaskReturns struct { result1 *tasks.TaskTree @@ -663,7 +664,7 @@ type FakeStackManager struct { newTaskToDeleteUnownedNodeGroupReturnsOnCall map[int]struct { result1 tasks.Task } - NewTasksToCreateClusterStub func(context.Context, []*v1alpha5.NodeGroup, []*v1alpha5.ManagedNodeGroup, *v1alpha5.AccessConfig, accessentry.CreatorInterface, ...tasks.Task) *tasks.TaskTree + NewTasksToCreateClusterStub func(context.Context, []*v1alpha5.NodeGroup, []*v1alpha5.ManagedNodeGroup, *v1alpha5.AccessConfig, accessentry.CreatorInterface, int, ...tasks.Task) *tasks.TaskTree newTasksToCreateClusterMutex sync.RWMutex newTasksToCreateClusterArgsForCall []struct { arg1 context.Context @@ -671,7 +672,8 @@ type FakeStackManager struct { arg3 []*v1alpha5.ManagedNodeGroup arg4 *v1alpha5.AccessConfig arg5 accessentry.CreatorInterface - arg6 []tasks.Task + arg6 int + arg7 []tasks.Task } newTasksToCreateClusterReturns struct { result1 *tasks.TaskTree @@ -765,7 +767,7 @@ type FakeStackManager struct { result1 *tasks.TaskTree result2 error } - NewUnmanagedNodeGroupTaskStub func(context.Context, []*v1alpha5.NodeGroup, bool, bool, bool, vpc.Importer) *tasks.TaskTree + NewUnmanagedNodeGroupTaskStub func(context.Context, []*v1alpha5.NodeGroup, bool, bool, bool, vpc.Importer, int) *tasks.TaskTree newUnmanagedNodeGroupTaskMutex sync.RWMutex newUnmanagedNodeGroupTaskArgsForCall []struct { arg1 context.Context @@ -774,6 +776,7 @@ type FakeStackManager struct { arg4 bool arg5 bool arg6 vpc.Importer + arg7 int } newUnmanagedNodeGroupTaskReturns struct { result1 *tasks.TaskTree @@ -3800,7 +3803,7 @@ func (fake *FakeStackManager) MustUpdateStackReturnsOnCall(i int, result1 error) }{result1} } -func (fake *FakeStackManager) NewManagedNodeGroupTask(arg1 context.Context, arg2 []*v1alpha5.ManagedNodeGroup, arg3 bool, arg4 vpc.Importer) *tasks.TaskTree { +func (fake *FakeStackManager) NewManagedNodeGroupTask(arg1 context.Context, arg2 []*v1alpha5.ManagedNodeGroup, arg3 bool, arg4 vpc.Importer, arg5 int) *tasks.TaskTree { var arg2Copy []*v1alpha5.ManagedNodeGroup if arg2 != nil { arg2Copy = make([]*v1alpha5.ManagedNodeGroup, len(arg2)) @@ -3813,13 +3816,14 @@ func (fake *FakeStackManager) NewManagedNodeGroupTask(arg1 context.Context, arg2 arg2 []*v1alpha5.ManagedNodeGroup arg3 bool arg4 vpc.Importer - }{arg1, arg2Copy, arg3, arg4}) + arg5 int + }{arg1, arg2Copy, arg3, arg4, arg5}) stub := fake.NewManagedNodeGroupTaskStub fakeReturns := fake.newManagedNodeGroupTaskReturns - fake.recordInvocation("NewManagedNodeGroupTask", []interface{}{arg1, arg2Copy, arg3, arg4}) + fake.recordInvocation("NewManagedNodeGroupTask", []interface{}{arg1, arg2Copy, arg3, arg4, arg5}) fake.newManagedNodeGroupTaskMutex.Unlock() if stub != nil { - return stub(arg1, arg2, arg3, arg4) + return stub(arg1, arg2, arg3, arg4, arg5) } if specificReturn { return ret.result1 @@ -3833,17 +3837,17 @@ func (fake *FakeStackManager) NewManagedNodeGroupTaskCallCount() int { return len(fake.newManagedNodeGroupTaskArgsForCall) } -func (fake *FakeStackManager) NewManagedNodeGroupTaskCalls(stub func(context.Context, []*v1alpha5.ManagedNodeGroup, bool, vpc.Importer) *tasks.TaskTree) { +func (fake *FakeStackManager) NewManagedNodeGroupTaskCalls(stub func(context.Context, []*v1alpha5.ManagedNodeGroup, bool, vpc.Importer, int) *tasks.TaskTree) { fake.newManagedNodeGroupTaskMutex.Lock() defer fake.newManagedNodeGroupTaskMutex.Unlock() fake.NewManagedNodeGroupTaskStub = stub } -func (fake *FakeStackManager) NewManagedNodeGroupTaskArgsForCall(i int) (context.Context, []*v1alpha5.ManagedNodeGroup, bool, vpc.Importer) { +func (fake *FakeStackManager) NewManagedNodeGroupTaskArgsForCall(i int) (context.Context, []*v1alpha5.ManagedNodeGroup, bool, vpc.Importer, int) { fake.newManagedNodeGroupTaskMutex.RLock() defer fake.newManagedNodeGroupTaskMutex.RUnlock() argsForCall := fake.newManagedNodeGroupTaskArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4 + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4, argsForCall.arg5 } func (fake *FakeStackManager) NewManagedNodeGroupTaskReturns(result1 *tasks.TaskTree) { @@ -3934,7 +3938,7 @@ func (fake *FakeStackManager) NewTaskToDeleteUnownedNodeGroupReturnsOnCall(i int }{result1} } -func (fake *FakeStackManager) NewTasksToCreateCluster(arg1 context.Context, arg2 []*v1alpha5.NodeGroup, arg3 []*v1alpha5.ManagedNodeGroup, arg4 *v1alpha5.AccessConfig, arg5 accessentry.CreatorInterface, arg6 ...tasks.Task) *tasks.TaskTree { +func (fake *FakeStackManager) NewTasksToCreateCluster(arg1 context.Context, arg2 []*v1alpha5.NodeGroup, arg3 []*v1alpha5.ManagedNodeGroup, arg4 *v1alpha5.AccessConfig, arg5 accessentry.CreatorInterface, arg6 int, arg7 ...tasks.Task) *tasks.TaskTree { var arg2Copy []*v1alpha5.NodeGroup if arg2 != nil { arg2Copy = make([]*v1alpha5.NodeGroup, len(arg2)) @@ -3953,14 +3957,15 @@ func (fake *FakeStackManager) NewTasksToCreateCluster(arg1 context.Context, arg2 arg3 []*v1alpha5.ManagedNodeGroup arg4 *v1alpha5.AccessConfig arg5 accessentry.CreatorInterface - arg6 []tasks.Task - }{arg1, arg2Copy, arg3Copy, arg4, arg5, arg6}) + arg6 int + arg7 []tasks.Task + }{arg1, arg2Copy, arg3Copy, arg4, arg5, arg6, arg7}) stub := fake.NewTasksToCreateClusterStub fakeReturns := fake.newTasksToCreateClusterReturns - fake.recordInvocation("NewTasksToCreateCluster", []interface{}{arg1, arg2Copy, arg3Copy, arg4, arg5, arg6}) + fake.recordInvocation("NewTasksToCreateCluster", []interface{}{arg1, arg2Copy, arg3Copy, arg4, arg5, arg6, arg7}) fake.newTasksToCreateClusterMutex.Unlock() if stub != nil { - return stub(arg1, arg2, arg3, arg4, arg5, arg6...) + return stub(arg1, arg2, arg3, arg4, arg5, arg6, arg7...) } if specificReturn { return ret.result1 @@ -3974,17 +3979,17 @@ func (fake *FakeStackManager) NewTasksToCreateClusterCallCount() int { return len(fake.newTasksToCreateClusterArgsForCall) } -func (fake *FakeStackManager) NewTasksToCreateClusterCalls(stub func(context.Context, []*v1alpha5.NodeGroup, []*v1alpha5.ManagedNodeGroup, *v1alpha5.AccessConfig, accessentry.CreatorInterface, ...tasks.Task) *tasks.TaskTree) { +func (fake *FakeStackManager) NewTasksToCreateClusterCalls(stub func(context.Context, []*v1alpha5.NodeGroup, []*v1alpha5.ManagedNodeGroup, *v1alpha5.AccessConfig, accessentry.CreatorInterface, int, ...tasks.Task) *tasks.TaskTree) { fake.newTasksToCreateClusterMutex.Lock() defer fake.newTasksToCreateClusterMutex.Unlock() fake.NewTasksToCreateClusterStub = stub } -func (fake *FakeStackManager) NewTasksToCreateClusterArgsForCall(i int) (context.Context, []*v1alpha5.NodeGroup, []*v1alpha5.ManagedNodeGroup, *v1alpha5.AccessConfig, accessentry.CreatorInterface, []tasks.Task) { +func (fake *FakeStackManager) NewTasksToCreateClusterArgsForCall(i int) (context.Context, []*v1alpha5.NodeGroup, []*v1alpha5.ManagedNodeGroup, *v1alpha5.AccessConfig, accessentry.CreatorInterface, int, []tasks.Task) { fake.newTasksToCreateClusterMutex.RLock() defer fake.newTasksToCreateClusterMutex.RUnlock() argsForCall := fake.newTasksToCreateClusterArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4, argsForCall.arg5, argsForCall.arg6 + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4, argsForCall.arg5, argsForCall.arg6, argsForCall.arg7 } func (fake *FakeStackManager) NewTasksToCreateClusterReturns(result1 *tasks.TaskTree) { @@ -4370,7 +4375,7 @@ func (fake *FakeStackManager) NewTasksToDeleteOIDCProviderWithIAMServiceAccounts }{result1, result2} } -func (fake *FakeStackManager) NewUnmanagedNodeGroupTask(arg1 context.Context, arg2 []*v1alpha5.NodeGroup, arg3 bool, arg4 bool, arg5 bool, arg6 vpc.Importer) *tasks.TaskTree { +func (fake *FakeStackManager) NewUnmanagedNodeGroupTask(arg1 context.Context, arg2 []*v1alpha5.NodeGroup, arg3 bool, arg4 bool, arg5 bool, arg6 vpc.Importer, arg7 int) *tasks.TaskTree { var arg2Copy []*v1alpha5.NodeGroup if arg2 != nil { arg2Copy = make([]*v1alpha5.NodeGroup, len(arg2)) @@ -4385,13 +4390,14 @@ func (fake *FakeStackManager) NewUnmanagedNodeGroupTask(arg1 context.Context, ar arg4 bool arg5 bool arg6 vpc.Importer - }{arg1, arg2Copy, arg3, arg4, arg5, arg6}) + arg7 int + }{arg1, arg2Copy, arg3, arg4, arg5, arg6, arg7}) stub := fake.NewUnmanagedNodeGroupTaskStub fakeReturns := fake.newUnmanagedNodeGroupTaskReturns - fake.recordInvocation("NewUnmanagedNodeGroupTask", []interface{}{arg1, arg2Copy, arg3, arg4, arg5, arg6}) + fake.recordInvocation("NewUnmanagedNodeGroupTask", []interface{}{arg1, arg2Copy, arg3, arg4, arg5, arg6, arg7}) fake.newUnmanagedNodeGroupTaskMutex.Unlock() if stub != nil { - return stub(arg1, arg2, arg3, arg4, arg5, arg6) + return stub(arg1, arg2, arg3, arg4, arg5, arg6, arg7) } if specificReturn { return ret.result1 @@ -4405,17 +4411,17 @@ func (fake *FakeStackManager) NewUnmanagedNodeGroupTaskCallCount() int { return len(fake.newUnmanagedNodeGroupTaskArgsForCall) } -func (fake *FakeStackManager) NewUnmanagedNodeGroupTaskCalls(stub func(context.Context, []*v1alpha5.NodeGroup, bool, bool, bool, vpc.Importer) *tasks.TaskTree) { +func (fake *FakeStackManager) NewUnmanagedNodeGroupTaskCalls(stub func(context.Context, []*v1alpha5.NodeGroup, bool, bool, bool, vpc.Importer, int) *tasks.TaskTree) { fake.newUnmanagedNodeGroupTaskMutex.Lock() defer fake.newUnmanagedNodeGroupTaskMutex.Unlock() fake.NewUnmanagedNodeGroupTaskStub = stub } -func (fake *FakeStackManager) NewUnmanagedNodeGroupTaskArgsForCall(i int) (context.Context, []*v1alpha5.NodeGroup, bool, bool, bool, vpc.Importer) { +func (fake *FakeStackManager) NewUnmanagedNodeGroupTaskArgsForCall(i int) (context.Context, []*v1alpha5.NodeGroup, bool, bool, bool, vpc.Importer, int) { fake.newUnmanagedNodeGroupTaskMutex.RLock() defer fake.newUnmanagedNodeGroupTaskMutex.RUnlock() argsForCall := fake.newUnmanagedNodeGroupTaskArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4, argsForCall.arg5, argsForCall.arg6 + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4, argsForCall.arg5, argsForCall.arg6, argsForCall.arg7 } func (fake *FakeStackManager) NewUnmanagedNodeGroupTaskReturns(result1 *tasks.TaskTree) { diff --git a/pkg/cfn/manager/interface.go b/pkg/cfn/manager/interface.go index 64346463d1..ef887db7bc 100644 --- a/pkg/cfn/manager/interface.go +++ b/pkg/cfn/manager/interface.go @@ -84,15 +84,15 @@ type StackManager interface { LookupCloudTrailEvents(ctx context.Context, i *Stack) ([]cttypes.Event, error) MakeChangeSetName(action string) string MakeClusterStackName() string - NewManagedNodeGroupTask(ctx context.Context, nodeGroups []*api.ManagedNodeGroup, forceAddCNIPolicy bool, importer vpc.Importer) *tasks.TaskTree + NewManagedNodeGroupTask(ctx context.Context, nodeGroups []*api.ManagedNodeGroup, forceAddCNIPolicy bool, importer vpc.Importer, nodeGroupParallelism int) *tasks.TaskTree NewTasksToDeleteClusterWithNodeGroups(ctx context.Context, clusterStack *Stack, nodeGroupStacks []NodeGroupStack, clusterOperable bool, newOIDCManager NewOIDCManager, newTasksToDeleteAddonIAM NewTasksToDeleteAddonIAM, newTasksToDeletePodIdentityRole NewTasksToDeletePodIdentityRole, cluster *ekstypes.Cluster, clientSetGetter kubernetes.ClientSetGetter, wait, force bool, cleanup func(chan error, string) error) (*tasks.TaskTree, error) NewTasksToCreateIAMServiceAccounts(serviceAccounts []*api.ClusterIAMServiceAccount, oidc *iamoidc.OpenIDConnectManager, clientSetGetter kubernetes.ClientSetGetter) *tasks.TaskTree NewTaskToDeleteUnownedNodeGroup(ctx context.Context, clusterName, nodegroup string, nodeGroupDeleter NodeGroupDeleter, waitCondition *DeleteWaitCondition) tasks.Task - NewTasksToCreateCluster(ctx context.Context, nodeGroups []*api.NodeGroup, managedNodeGroups []*api.ManagedNodeGroup, accessConfig *api.AccessConfig, accessEntryCreator accessentry.CreatorInterface, postClusterCreationTasks ...tasks.Task) *tasks.TaskTree + NewTasksToCreateCluster(ctx context.Context, nodeGroups []*api.NodeGroup, managedNodeGroups []*api.ManagedNodeGroup, accessConfig *api.AccessConfig, accessEntryCreator accessentry.CreatorInterface, nodeGroupParallelism int, postClusterCreationTasks ...tasks.Task) *tasks.TaskTree NewTasksToDeleteIAMServiceAccounts(ctx context.Context, serviceAccounts []string, clientSetGetter kubernetes.ClientSetGetter, wait bool) (*tasks.TaskTree, error) NewTasksToDeleteNodeGroups(stacks []NodeGroupStack, shouldDelete func(_ string) bool, wait bool, cleanup func(chan error, string) error) (*tasks.TaskTree, error) NewTasksToDeleteOIDCProviderWithIAMServiceAccounts(ctx context.Context, newOIDCManager NewOIDCManager, cluster *ekstypes.Cluster, clientSetGetter kubernetes.ClientSetGetter, force bool) (*tasks.TaskTree, error) - NewUnmanagedNodeGroupTask(ctx context.Context, nodeGroups []*api.NodeGroup, forceAddCNIPolicy, skipEgressRules, disableAccessEntryCreation bool, importer vpc.Importer) *tasks.TaskTree + NewUnmanagedNodeGroupTask(ctx context.Context, nodeGroups []*api.NodeGroup, forceAddCNIPolicy, skipEgressRules, disableAccessEntryCreation bool, importer vpc.Importer, nodeGroupParallelism int) *tasks.TaskTree PropagateManagedNodeGroupTagsToASG(ngName string, ngTags map[string]string, asgNames []string, errCh chan error) error RefreshFargatePodExecutionRoleARN(ctx context.Context) error StackStatusIsNotTransitional(s *Stack) bool diff --git a/pkg/cfn/manager/nodegroup.go b/pkg/cfn/manager/nodegroup.go index adb752d1ec..8e52b4fe1e 100644 --- a/pkg/cfn/manager/nodegroup.go +++ b/pkg/cfn/manager/nodegroup.go @@ -46,6 +46,7 @@ type CreateNodeGroupOptions struct { SkipEgressRules bool DisableAccessEntryCreation bool VPCImporter vpc.Importer + Parallelism int } // A NodeGroupStackManager describes and creates nodegroup stacks. @@ -81,7 +82,7 @@ type UnmanagedNodeGroupTask struct { // Create creates a TaskTree for creating nodegroups. func (t *UnmanagedNodeGroupTask) Create(ctx context.Context, options CreateNodeGroupOptions) *tasks.TaskTree { - taskTree := &tasks.TaskTree{Parallel: true} + taskTree := &tasks.TaskTree{Parallel: true, Limit: options.Parallelism} for _, ng := range t.NodeGroups { ng := ng diff --git a/pkg/cfn/manager/tasks_test.go b/pkg/cfn/manager/tasks_test.go index 9744c3b1ee..e5a23c92a1 100644 --- a/pkg/cfn/manager/tasks_test.go +++ b/pkg/cfn/manager/tasks_test.go @@ -80,22 +80,22 @@ var _ = Describe("StackCollection Tasks", func() { // The supportsManagedNodes argument has no effect on the Describe call, so the values are alternated // in these tests { - tasks := stackManager.NewUnmanagedNodeGroupTask(context.Background(), makeNodeGroups("bar", "foo"), false, false, true, fakeVPCImporter) + tasks := stackManager.NewUnmanagedNodeGroupTask(context.Background(), makeNodeGroups("bar", "foo"), false, false, true, fakeVPCImporter, 0) Expect(tasks.Describe()).To(Equal(` 2 parallel tasks: { create nodegroup "bar", create nodegroup "foo" } `)) } { - tasks := stackManager.NewUnmanagedNodeGroupTask(context.Background(), makeNodeGroups("bar"), false, false, true, fakeVPCImporter) + tasks := stackManager.NewUnmanagedNodeGroupTask(context.Background(), makeNodeGroups("bar"), false, false, true, fakeVPCImporter, 0) Expect(tasks.Describe()).To(Equal(`1 task: { create nodegroup "bar" }`)) } { - tasks := stackManager.NewUnmanagedNodeGroupTask(context.Background(), makeNodeGroups("foo"), false, false, true, fakeVPCImporter) + tasks := stackManager.NewUnmanagedNodeGroupTask(context.Background(), makeNodeGroups("foo"), false, false, true, fakeVPCImporter, 0) Expect(tasks.Describe()).To(Equal(`1 task: { create nodegroup "foo" }`)) } { - tasks := stackManager.NewUnmanagedNodeGroupTask(context.Background(), nil, false, false, true, fakeVPCImporter) + tasks := stackManager.NewUnmanagedNodeGroupTask(context.Background(), nil, false, false, true, fakeVPCImporter, 0) Expect(tasks.Describe()).To(Equal(`no tasks`)) } @@ -103,7 +103,7 @@ var _ = Describe("StackCollection Tasks", func() { AuthenticationMode: ekstypes.AuthenticationModeConfigMap, } { - tasks := stackManager.NewTasksToCreateCluster(context.Background(), makeNodeGroups("bar", "foo"), nil, accessConfig, nil) + tasks := stackManager.NewTasksToCreateCluster(context.Background(), makeNodeGroups("bar", "foo"), nil, accessConfig, nil, 0) Expect(tasks.Describe()).To(Equal(` 2 sequential tasks: { create cluster control plane "test-cluster", 2 parallel sub-tasks: { @@ -114,18 +114,18 @@ var _ = Describe("StackCollection Tasks", func() { `)) } { - tasks := stackManager.NewTasksToCreateCluster(context.Background(), makeNodeGroups("bar"), nil, accessConfig, nil) + tasks := stackManager.NewTasksToCreateCluster(context.Background(), makeNodeGroups("bar"), nil, accessConfig, nil, 0) Expect(tasks.Describe()).To(Equal(` 2 sequential tasks: { create cluster control plane "test-cluster", create nodegroup "bar" } `)) } { - tasks := stackManager.NewTasksToCreateCluster(context.Background(), nil, nil, accessConfig, nil) + tasks := stackManager.NewTasksToCreateCluster(context.Background(), nil, nil, accessConfig, nil, 0) Expect(tasks.Describe()).To(Equal(`1 task: { create cluster control plane "test-cluster" }`)) } { - tasks := stackManager.NewTasksToCreateCluster(context.Background(), makeNodeGroups("bar", "foo"), makeManagedNodeGroups("m1", "m2"), accessConfig, nil) + tasks := stackManager.NewTasksToCreateCluster(context.Background(), makeNodeGroups("bar", "foo"), makeManagedNodeGroups("m1", "m2"), accessConfig, nil, 0) Expect(tasks.Describe()).To(Equal(` 2 sequential tasks: { create cluster control plane "test-cluster", 2 parallel sub-tasks: { @@ -142,7 +142,7 @@ var _ = Describe("StackCollection Tasks", func() { `)) } { - tasks := stackManager.NewTasksToCreateCluster(context.Background(), makeNodeGroups("bar", "foo"), makeManagedNodeGroupsWithPropagatedTags("m1", "m2"), accessConfig, nil) + tasks := stackManager.NewTasksToCreateCluster(context.Background(), makeNodeGroups("bar", "foo"), makeManagedNodeGroupsWithPropagatedTags("m1", "m2"), accessConfig, nil, 0) Expect(tasks.Describe()).To(Equal(` 2 sequential tasks: { create cluster control plane "test-cluster", 2 parallel sub-tasks: { @@ -165,7 +165,7 @@ var _ = Describe("StackCollection Tasks", func() { `)) } { - tasks := stackManager.NewTasksToCreateCluster(context.Background(), makeNodeGroups("foo"), makeManagedNodeGroups("m1"), accessConfig, nil) + tasks := stackManager.NewTasksToCreateCluster(context.Background(), makeNodeGroups("foo"), makeManagedNodeGroups("m1"), accessConfig, nil, 0) Expect(tasks.Describe()).To(Equal(` 2 sequential tasks: { create cluster control plane "test-cluster", 2 parallel sub-tasks: { @@ -176,7 +176,7 @@ var _ = Describe("StackCollection Tasks", func() { `)) } { - tasks := stackManager.NewTasksToCreateCluster(context.Background(), makeNodeGroups("bar"), nil, accessConfig, nil, &task{id: 1}) + tasks := stackManager.NewTasksToCreateCluster(context.Background(), makeNodeGroups("bar"), nil, accessConfig, nil, 0, &task{id: 1}) Expect(tasks.Describe()).To(Equal(` 2 sequential tasks: { create cluster control plane "test-cluster", 2 sequential sub-tasks: { @@ -206,7 +206,7 @@ var _ = Describe("StackCollection Tasks", func() { p.MockCloudFormation().On("ListStacks", mock.Anything, mock.Anything, mock.Anything).Return(&cloudformation.ListStacksOutput{}, nil) ng := api.NewManagedNodeGroup() fakeVPCImporter := new(vpcfakes.FakeImporter) - tasks := stackManager.NewManagedNodeGroupTask(context.Background(), []*api.ManagedNodeGroup{ng}, false, fakeVPCImporter) + tasks := stackManager.NewManagedNodeGroupTask(context.Background(), []*api.ManagedNodeGroup{ng}, false, fakeVPCImporter, 0) errs := tasks.DoAllSync() Expect(errs).To(HaveLen(1)) Expect(errs[0]).To(MatchError(ContainSubstring("managed nodegroups cannot be created on IPv6 unowned clusters"))) @@ -216,7 +216,7 @@ var _ = Describe("StackCollection Tasks", func() { p.MockCloudFormation().On("ListStacks", mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("not found")) ng := api.NewManagedNodeGroup() fakeVPCImporter := new(vpcfakes.FakeImporter) - tasks := stackManager.NewManagedNodeGroupTask(context.Background(), []*api.ManagedNodeGroup{ng}, false, fakeVPCImporter) + tasks := stackManager.NewManagedNodeGroupTask(context.Background(), []*api.ManagedNodeGroup{ng}, false, fakeVPCImporter, 0) errs := tasks.DoAllSync() Expect(errs).To(HaveLen(1)) Expect(errs[0]).To(MatchError(ContainSubstring("not found"))) diff --git a/pkg/ctl/cmdutils/create_cluster.go b/pkg/ctl/cmdutils/create_cluster.go index 92d99403d7..349d465489 100644 --- a/pkg/ctl/cmdutils/create_cluster.go +++ b/pkg/ctl/cmdutils/create_cluster.go @@ -48,4 +48,5 @@ type CreateNGOptions struct { InstallNeuronDevicePlugin bool InstallNvidiaDevicePlugin bool DryRun bool + NodeGroupParallelism int } diff --git a/pkg/ctl/cmdutils/nodegroup_flags.go b/pkg/ctl/cmdutils/nodegroup_flags.go index 211c6e3afc..015765ab86 100644 --- a/pkg/ctl/cmdutils/nodegroup_flags.go +++ b/pkg/ctl/cmdutils/nodegroup_flags.go @@ -64,6 +64,7 @@ func AddCommonCreateNodeGroupAddonsFlags(fs *pflag.FlagSet, ng *api.NodeGroup, o addCommonCreateNodeGroupIAMAddonsFlags(fs, ng) fs.BoolVarP(&options.InstallNeuronDevicePlugin, "install-neuron-plugin", "", true, "install Neuron plugin for Inferentia and Trainium nodes") fs.BoolVarP(&options.InstallNvidiaDevicePlugin, "install-nvidia-plugin", "", true, "install Nvidia plugin for GPU nodes") + fs.IntVarP(&options.NodeGroupParallelism, "nodegroup-parallelism", "", 8, "Number of self-managed or managed nodegroups to create in parallel") } // AddInstanceSelectorOptions adds flags for EC2 instance selector diff --git a/pkg/ctl/create/cluster.go b/pkg/ctl/create/cluster.go index bab7d52f36..8cd2975dda 100644 --- a/pkg/ctl/create/cluster.go +++ b/pkg/ctl/create/cluster.go @@ -8,10 +8,9 @@ import ( "strings" "sync" + "github.com/aws/amazon-ec2-instance-selector/v2/pkg/selector" "github.com/aws/aws-sdk-go-v2/aws" ekstypes "github.com/aws/aws-sdk-go-v2/service/eks/types" - - "github.com/aws/amazon-ec2-instance-selector/v2/pkg/selector" "github.com/kris-nova/logger" "github.com/pkg/errors" "github.com/spf13/cobra" @@ -363,7 +362,7 @@ func doCreateCluster(cmd *cmdutils.Cmd, ngFilter *filter.NodeGroupFilter, params } postClusterCreationTasks := ctl.CreateExtraClusterConfigTasks(ctx, cfg, preNodegroupAddons, updateVPCCNITask) - taskTree := stackManager.NewTasksToCreateCluster(ctx, cfg.NodeGroups, cfg.ManagedNodeGroups, cfg.AccessConfig, makeAccessEntryCreator(cfg.Metadata.Name, stackManager), postClusterCreationTasks) + taskTree := stackManager.NewTasksToCreateCluster(ctx, cfg.NodeGroups, cfg.ManagedNodeGroups, cfg.AccessConfig, makeAccessEntryCreator(cfg.Metadata.Name, stackManager), params.NodeGroupParallelism, postClusterCreationTasks) logger.Info(taskTree.Describe()) if errs := taskTree.DoAllSync(); len(errs) > 0 { diff --git a/pkg/ctl/create/nodegroup.go b/pkg/ctl/create/nodegroup.go index c042478385..cf1c86f0ce 100644 --- a/pkg/ctl/create/nodegroup.go +++ b/pkg/ctl/create/nodegroup.go @@ -82,6 +82,7 @@ func createNodeGroupCmd(cmd *cmdutils.Cmd) { }, SkipOutdatedAddonsCheck: options.SkipOutdatedAddonsCheck, ConfigFileProvided: cmd.ClusterConfigFile != "", + Parallelism: options.NodeGroupParallelism, }, ngFilter) }) } diff --git a/pkg/utils/tasks/tasks.go b/pkg/utils/tasks/tasks.go index 6ccb0befde..a7fc346eb1 100644 --- a/pkg/utils/tasks/tasks.go +++ b/pkg/utils/tasks/tasks.go @@ -6,6 +6,7 @@ import ( "sync" "github.com/kris-nova/logger" + "golang.org/x/sync/errgroup" ) // Task is a common interface for the stack manager tasks. @@ -50,6 +51,7 @@ type TaskTree struct { Parallel bool PlanMode bool IsSubTask bool + Limit int } // Append new tasks to the set @@ -147,7 +149,11 @@ func (t *TaskTree) Do(allErrs chan error) error { errs := make(chan error) if t.Parallel { - go doParallelTasks(errs, t.Tasks) + if t.Limit > 0 { + go runInErrorGroup(t.Tasks, t.Limit, errs) + } else { + go doParallelTasks(errs, t.Tasks) + } } else { go doSequentialTasks(errs, t.Tasks) } @@ -173,7 +179,11 @@ func (t *TaskTree) DoAllSync() []error { errs := make(chan error) if t.Parallel { - go doParallelTasks(errs, t.Tasks) + if t.Limit > 0 { + go runInErrorGroup(t.Tasks, t.Limit, errs) + } else { + go doParallelTasks(errs, t.Tasks) + } } else { go doSequentialTasks(errs, t.Tasks) } @@ -217,6 +227,24 @@ func doParallelTasks(allErrs chan error, tasks []Task) { close(allErrs) } +func runInErrorGroup(tasks []Task, limit int, errs chan error) { + var eg errgroup.Group + eg.SetLimit(limit) + for _, t := range tasks { + t := t + eg.Go(func() error { + if ok := doSingleTask(errs, t); !ok { + logger.Debug("failed task: %s (will continue until other parallel tasks are completed)", t.Describe()) + } + return nil + }) + } + if err := eg.Wait(); err != nil { + logger.Debug("error running tasks: %v", err) + } + close(errs) +} + func doSequentialTasks(allErrs chan error, tasks []Task) { for t := range tasks { if ok := doSingleTask(allErrs, tasks[t]); !ok {