From 61f438e09cbbff2c4ce410e218769947e211f23d Mon Sep 17 00:00:00 2001 From: jwcesign Date: Wed, 6 Nov 2024 22:32:53 +0800 Subject: [PATCH] chore: use ack provider to resolve the cluster cni Signed-off-by: jwcesign --- pkg/operator/operator.go | 3 +- pkg/operator/options/options.go | 2 - pkg/operator/options/options_validation.go | 3 -- pkg/providers/ack/ack.go | 36 ++++++++++++--- pkg/providers/instancetype/instancetype.go | 53 ++++++++++++++-------- 5 files changed, 66 insertions(+), 31 deletions(-) diff --git a/pkg/operator/operator.go b/pkg/operator/operator.go index 0aaf0fef..a7c23caa 100644 --- a/pkg/operator/operator.go +++ b/pkg/operator/operator.go @@ -77,7 +77,6 @@ func NewOperator(ctx context.Context, operator *operator.Operator) (context.Cont os.Exit(1) } clusterID := options.FromContext(ctx).ClusterID - clusterCNI := options.FromContext(ctx).ClusterCNI region := *ecsClient.RegionId pricingProvider, err := pricing.NewDefaultProvider(ctx, region) @@ -103,7 +102,7 @@ func NewOperator(ctx context.Context, operator *operator.Operator) (context.Cont unavailableOfferingsCache := alicache.NewUnavailableOfferings() instanceTypeProvider := instancetype.NewDefaultProvider( - *ecsClient.RegionId, clusterCNI, ecsClient, + *ecsClient.RegionId, ecsClient, cache.New(alicache.InstanceTypesAndZonesTTL, alicache.DefaultCleanupInterval), unavailableOfferingsCache, pricingProvider, nil) diff --git a/pkg/operator/options/options.go b/pkg/operator/options/options.go index 53e84c15..0c7bc42b 100644 --- a/pkg/operator/options/options.go +++ b/pkg/operator/options/options.go @@ -37,14 +37,12 @@ type optionsKey struct{} type Options struct { ClusterID string - ClusterCNI string VMMemoryOverheadPercent float64 Interruption bool } func (o *Options) AddFlags(fs *coreoptions.FlagSet) { fs.StringVar(&o.ClusterID, "cluster-id", env.WithDefaultString("CLUSTER_ID", ""), "The external kubernetes cluster id for new nodes to connect with.") - fs.StringVar(&o.ClusterCNI, "cluster-cni", env.WithDefaultString("CLUSTER_CNI", "terway-eniip"), "The network cni used by the cluster.") // TODO: for different OS, the overhead is different, find a way to fix this. fs.Float64Var(&o.VMMemoryOverheadPercent, "vm-memory-overhead-percent", utils.WithDefaultFloat64("VM_MEMORY_OVERHEAD_PERCENT", 0.065), "The VM memory overhead as a percent that will be subtracted from the total memory for all instance types.") fs.BoolVar(&o.Interruption, "interruption", env.WithDefaultBool("INTERRUPTION", true), "Enable interruption handling.") diff --git a/pkg/operator/options/options_validation.go b/pkg/operator/options/options_validation.go index ae453786..c4339370 100644 --- a/pkg/operator/options/options_validation.go +++ b/pkg/operator/options/options_validation.go @@ -32,8 +32,5 @@ func (o Options) validateRequiredFields() error { if o.ClusterID == "" { return fmt.Errorf("missing field, cluster-id") } - if o.ClusterCNI == "" { - return fmt.Errorf("missing field, cluster-network") - } return nil } diff --git a/pkg/providers/ack/ack.go b/pkg/providers/ack/ack.go index 4cba904c..d1ac3958 100644 --- a/pkg/providers/ack/ack.go +++ b/pkg/providers/ack/ack.go @@ -22,8 +22,10 @@ import ( "encoding/json" "errors" "fmt" + "net/http" "regexp" "strings" + "sync" ackclient "github.com/alibabacloud-go/cs-20151215/v5/client" "github.com/alibabacloud-go/tea/tea" @@ -35,11 +37,16 @@ import ( type Provider interface { GetNodeRegisterScript(context.Context, map[string]string, *v1alpha1.KubeletConfiguration) (string, error) + GetClusterCNI(context.Context) (string, error) + LivenessProbe(*http.Request) error } type DefaultProvider struct { clusterID string ackClient *ackclient.Client + + muClusterCNI sync.RWMutex + clusterCNI string } func NewDefaultProvider(clusterID string, ackClient *ackclient.Client) *DefaultProvider { @@ -49,17 +56,29 @@ func NewDefaultProvider(clusterID string, ackClient *ackclient.Client) *DefaultP } } -func (p *DefaultProvider) GetClusterCNI(ctx context.Context) (string, error) { +func (p *DefaultProvider) LivenessProbe(_ *http.Request) error { + p.muClusterCNI.Lock() + //nolint: staticcheck + p.muClusterCNI.Unlock() + return nil +} + +func (p *DefaultProvider) GetClusterCNI(_ context.Context) (string, error) { + p.muClusterCNI.RLock() + clusterCNI := p.clusterCNI + p.muClusterCNI.RUnlock() + + if clusterCNI != "" { + return clusterCNI, nil + } + response, err := p.ackClient.DescribeClusterDetail(tea.String(p.clusterID)) if err != nil { return "", fmt.Errorf("failed to describe cluster: %w", err) } - if response.Body == nil { + if response.Body == nil || response.Body.MetaData == nil { return "", fmt.Errorf("empty cluster response") } - if response.Body.MetaData == nil { - return "", fmt.Errorf("empty cluster metadata") - } // Parse metadata JSON string // clusterMetaData represents the metadata structure in cluster response type clusterMetaData struct { @@ -71,7 +90,12 @@ func (p *DefaultProvider) GetClusterCNI(ctx context.Context) (string, error) { if err := json.Unmarshal([]byte(*response.Body.MetaData), &metadata); err != nil { return "", fmt.Errorf("failed to unmarshal cluster metadata: %w", err) } - return metadata.Capabilities.Network, nil + + p.muClusterCNI.Lock() + p.clusterCNI = metadata.Capabilities.Network + p.muClusterCNI.Unlock() + + return p.clusterCNI, nil } func (p *DefaultProvider) GetNodeRegisterScript(ctx context.Context, diff --git a/pkg/providers/instancetype/instancetype.go b/pkg/providers/instancetype/instancetype.go index 2caf0778..b14f2675 100644 --- a/pkg/providers/instancetype/instancetype.go +++ b/pkg/providers/instancetype/instancetype.go @@ -40,8 +40,8 @@ import ( "github.com/cloudpilot-ai/karpenter-provider-alibabacloud/pkg/apis/v1alpha1" kcache "github.com/cloudpilot-ai/karpenter-provider-alibabacloud/pkg/cache" + "github.com/cloudpilot-ai/karpenter-provider-alibabacloud/pkg/providers/ack" "github.com/cloudpilot-ai/karpenter-provider-alibabacloud/pkg/providers/pricing" - "github.com/cloudpilot-ai/karpenter-provider-alibabacloud/pkg/providers/vswitch" ) type Provider interface { @@ -53,10 +53,9 @@ type Provider interface { type DefaultProvider struct { region string - clusterCNI string ecsClient *ecsclient.Client - vSwitchProvider vswitch.Provider pricingProvider pricing.Provider + ackProvider ack.Provider // Values stored *before* considering insufficient capacity errors from the unavailableOfferings cache. // Fully initialized Instance Types are also cached based on the set of all instance types, zones, unavailableOfferings cache, @@ -79,15 +78,14 @@ type DefaultProvider struct { instanceTypesOfferingsSeqNum uint64 } -func NewDefaultProvider(region, clusterCNI string, ecsClient *ecsclient.Client, +func NewDefaultProvider(region string, ecsClient *ecsclient.Client, instanceTypesCache *cache.Cache, unavailableOfferingsCache *kcache.UnavailableOfferings, - pricingProvider pricing.Provider, vSwitchProvider vswitch.Provider) *DefaultProvider { + pricingProvider pricing.Provider, ackProvider ack.Provider) *DefaultProvider { return &DefaultProvider{ ecsClient: ecsClient, region: region, - clusterCNI: clusterCNI, - vSwitchProvider: vSwitchProvider, pricingProvider: pricingProvider, + ackProvider: ackProvider, instanceTypesInfo: []*ecsclient.DescribeInstanceTypesResponseBodyInstanceTypesInstanceType{}, instanceTypesOfferings: map[string]sets.Set[string]{}, instanceTypesCache: instanceTypesCache, @@ -98,12 +96,26 @@ func NewDefaultProvider(region, clusterCNI string, ecsClient *ecsclient.Client, } func (p *DefaultProvider) LivenessProbe(req *http.Request) error { - if err := p.vSwitchProvider.LivenessProbe(req); err != nil { + if err := p.ackProvider.LivenessProbe(req); err != nil { return err } return p.pricingProvider.LivenessProbe(req) } +func (p *DefaultProvider) validateState(nodeClass *v1alpha1.ECSNodeClass) error { + if len(p.instanceTypesInfo) == 0 { + return errors.New("no instance types found") + } + if len(p.instanceTypesOfferings) == 0 { + return errors.New("no instance types offerings found") + } + if len(nodeClass.Status.VSwitches) == 0 { + return errors.New("no vswitches found") + } + + return nil +} + func (p *DefaultProvider) List(ctx context.Context, kc *v1alpha1.KubeletConfiguration, nodeClass *v1alpha1.ECSNodeClass) ([]*cloudprovider.InstanceType, error) { p.muInstanceTypeInfo.RLock() p.muInstanceTypesOfferings.RLock() @@ -113,14 +125,8 @@ func (p *DefaultProvider) List(ctx context.Context, kc *v1alpha1.KubeletConfigur if kc == nil { kc = &v1alpha1.KubeletConfiguration{} } - if len(p.instanceTypesInfo) == 0 { - return nil, errors.New("no instance types found") - } - if len(p.instanceTypesOfferings) == 0 { - return nil, errors.New("no instance types offerings found") - } - if len(nodeClass.Status.VSwitches) == 0 { - return nil, errors.New("no vswitches found") + if err := p.validateState(nodeClass); err != nil { + return nil, err } vSwitchsZones := sets.New(lo.Map(nodeClass.Status.VSwitches, func(s v1alpha1.VSwitch, _ int) string { @@ -157,6 +163,11 @@ func (p *DefaultProvider) List(ctx context.Context, kc *v1alpha1.KubeletConfigur log.FromContext(ctx).WithValues("zones", allZones.UnsortedList()).V(1).Info("discovered zones") } + clusterCNI, err := p.ackProvider.GetClusterCNI(ctx) + if err != nil { + return nil, fmt.Errorf("failed to get cluster CNI: %w", err) + } + result := lo.Map(p.instanceTypesInfo, func(i *ecsclient.DescribeInstanceTypesResponseBodyInstanceTypesInstanceType, _ int) *cloudprovider.InstanceType { zoneData := lo.Map(allZones.UnsortedList(), func(zoneID string, _ int) ZoneData { if !p.instanceTypesOfferings[lo.FromPtr(i.InstanceTypeId)].Has(zoneID) || !vSwitchsZones.Has(zoneID) { @@ -176,7 +187,7 @@ func (p *DefaultProvider) List(ctx context.Context, kc *v1alpha1.KubeletConfigur // so that Karpenter is able to cache the set of InstanceTypes based on values that alter the set of instance types // !!! Important !!! offers := p.createOfferings(ctx, *i.InstanceTypeId, zoneData) - return NewInstanceType(ctx, i, kc, p.region, nodeClass.Spec.SystemDisk, offers, p.clusterCNI) + return NewInstanceType(ctx, i, kc, p.region, nodeClass.Spec.SystemDisk, offers, clusterCNI) }) // Filter out nil values @@ -199,9 +210,15 @@ func (p *DefaultProvider) UpdateInstanceTypes(ctx context.Context) error { log.FromContext(ctx).Error(err, "failed to get instance types") return err } + + clusterCNI, err := p.ackProvider.GetClusterCNI(ctx) + if err != nil { + return fmt.Errorf("failed to get cluster CNI: %w", err) + } + instanceTypes = lo.Filter(instanceTypes, func(item *ecsclient.DescribeInstanceTypesResponseBodyInstanceTypesInstanceType, index int) bool { - switch p.clusterCNI { + switch clusterCNI { // TODO: support other network type, please check https://help.aliyun.com/zh/ack/ack-managed-and-ack-dedicated/user-guide/container-network/?spm=a2c4g.11186623.help-menu-85222.d_2_4_3.6d501109uQI315&scm=20140722.H_195424._.OR_help-V_1 case ClusterCNITypeTerway: maxENIPods := (tea.Int32Value(item.EniQuantity) - 1) * tea.Int32Value(item.EniPrivateIpAddressQuantity)