fix: calculate the overhead with the maximum value #155

Merged · 1 commit merged on Dec 2, 2024
charts/karpenter/values.yaml — 2 changes: 1 addition & 1 deletion

@@ -64,7 +64,7 @@ controller:
  # -- The external kubernetes cluster id for new nodes to connect with.
  clusterID: ""
  # -- The VM memory overhead as a percent that will be subtracted from the total memory for all instance types. The value of `0.075` equals to 7.5%.
- vmMemoryOverheadPercent: 0.075
+ vmMemoryOverheadPercent: 0.065
  # -- The maximum length of a batch window. The longer this is, the more pods we can consider for provisioning at one
  # time which usually results in fewer but larger nodes.
  batchMaxDuration: 10s
pkg/operator/operator.go — 2 changes: 1 addition & 1 deletion

@@ -108,7 +108,7 @@ func NewOperator(ctx context.Context, operator *operator.Operator) (context.Cont

	unavailableOfferingsCache := alicache.NewUnavailableOfferings()
	instanceTypeProvider := instancetype.NewDefaultProvider(
-		*ecsClient.RegionId, ecsClient,
+		*ecsClient.RegionId, operator.GetClient(), ecsClient,
		cache.New(alicache.InstanceTypesAndZonesTTL, alicache.DefaultCleanupInterval),
		unavailableOfferingsCache,
		pricingProvider, ackProvider)
pkg/operator/options/options.go — 2 changes: 1 addition & 1 deletion

@@ -44,7 +44,7 @@ type Options struct {
func (o *Options) AddFlags(fs *coreoptions.FlagSet) {
	fs.StringVar(&o.ClusterID, "cluster-id", env.WithDefaultString("CLUSTER_ID", ""), "The external kubernetes cluster id for new nodes to connect with.")
	// TODO: for different OS, the overhead is different, find a way to fix this.
-	fs.Float64Var(&o.VMMemoryOverheadPercent, "vm-memory-overhead-percent", utils.WithDefaultFloat64("VM_MEMORY_OVERHEAD_PERCENT", 0.075), "The VM memory overhead as a percent that will be subtracted from the total memory for all instance types.")
+	fs.Float64Var(&o.VMMemoryOverheadPercent, "vm-memory-overhead-percent", utils.WithDefaultFloat64("VM_MEMORY_OVERHEAD_PERCENT", 0.065), "The VM memory overhead as a percent that will be subtracted from the total memory for all instance types.")
	fs.BoolVar(&o.Interruption, "interruption", env.WithDefaultBool("INTERRUPTION", true), "Enable interruption handling.")
	fs.BoolVar(&o.TelemetryShare, "telemetry-share", env.WithDefaultBool("TELEMETRY_SHARE", true), "Enable telemetry sharing.")
	fs.IntVar(&o.APGCreationQPS, "apg-qps", int(env.WithDefaultInt64("APG_CREATION_QPS", 100)), "The QPS limit for creating AutoProvisionGroup.")
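For context (not part of this diff): the vm-memory-overhead-percent value is conventionally applied by subtracting that fraction from the memory an instance type reports before node capacity is computed. Below is a minimal sketch of that arithmetic, assuming the usual Karpenter-style formula; adjustMemory and the figures in main are illustrative, not code from this repository.

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

// adjustMemory subtracts a fixed percentage from the memory an instance type
// reports, to account for memory consumed by the hypervisor/VM image.
func adjustMemory(totalMiB int64, overheadPercent float64) *resource.Quantity {
	usableMiB := totalMiB - int64(float64(totalMiB)*overheadPercent)
	return resource.NewQuantity(usableMiB*1024*1024, resource.BinarySI)
}

func main() {
	// With the new default of 0.065, a 16 GiB instance loses 1064 MiB to overhead
	// instead of 1228 MiB at the old 0.075 default.
	fmt.Println(adjustMemory(16*1024, 0.065)) // 15320Mi
	fmt.Println(adjustMemory(16*1024, 0.075)) // 15156Mi
}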
pkg/providers/instancetype/instancetype.go — 44 changes: 42 additions & 2 deletions

@@ -29,7 +29,9 @@ import (
	"github.com/patrickmn/go-cache"
	"github.com/samber/lo"
	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
	"k8s.io/apimachinery/pkg/util/sets"
+	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/log"
	karpv1 "sigs.k8s.io/karpenter/pkg/apis/v1"
	"sigs.k8s.io/karpenter/pkg/cloudprovider"

@@ -52,6 +54,7 @@

type DefaultProvider struct {
	region string
+	kubeClient client.Client
	ecsClient *ecsclient.Client
	pricingProvider pricing.Provider
	ackProvider ack.Provider

@@ -77,10 +80,11 @@
	instanceTypesOfferingsSeqNum uint64
}

-func NewDefaultProvider(region string, ecsClient *ecsclient.Client,
+func NewDefaultProvider(region string, kubeClient client.Client, ecsClient *ecsclient.Client,
	instanceTypesCache *cache.Cache, unavailableOfferingsCache *kcache.UnavailableOfferings,
	pricingProvider pricing.Provider, ackProvider ack.Provider) *DefaultProvider {
	return &DefaultProvider{
+		kubeClient: kubeClient,
		ecsClient: ecsClient,
		region: region,
		pricingProvider: pricingProvider,

@@ -175,6 +179,11 @@ func (p *DefaultProvider) List(ctx context.Context, kc *v1alpha1.KubeletConfigur
		return nil, fmt.Errorf("failed to get cluster CNI: %w", err)
	}

+	nodeResourceOverhead, err := p.nodeOverhead(ctx)
+	if err != nil {
+		return nil, fmt.Errorf("failed to get node resource overhead: %w", err)
+	}
+
	result := lo.Map(p.instanceTypesInfo, func(i *ecsclient.DescribeInstanceTypesResponseBodyInstanceTypesInstanceType, _ int) *cloudprovider.InstanceType {
		zoneData := lo.Map(allZones.UnsortedList(), func(zoneID string, _ int) ZoneData {
			if !p.instanceTypesOfferings[lo.FromPtr(i.InstanceTypeId)].Has(zoneID) || !vSwitchsZones.Has(zoneID) {

@@ -194,7 +203,7 @@
		// so that Karpenter is able to cache the set of InstanceTypes based on values that alter the set of instance types
		// !!! Important !!!
		offers := p.createOfferings(ctx, *i.InstanceTypeId, zoneData)
-		return NewInstanceType(ctx, i, kc, p.region, nodeClass.Spec.SystemDisk, offers, clusterCNI)
+		return NewInstanceType(ctx, nodeResourceOverhead, i, kc, p.region, nodeClass.Spec.SystemDisk, offers, clusterCNI)
	})

	// Filter out nil values

@@ -204,6 +213,37 @@
	return result, nil
}

+func (p *DefaultProvider) nodeOverhead(ctx context.Context) (corev1.ResourceList, error) {
+	var nodes corev1.NodeList
+	if err := p.kubeClient.List(ctx, &nodes); err != nil {
+		return corev1.ResourceList{}, err
+	}
+
+	// We are not sure how to calculate a node's overhead exactly, so use the maximum
+	// value observed across existing nodes to avoid node-creation loops.
+	maxCPUOverHead := int64(0)
+	maxMemoryOverHead := int64(0)
+	for _, node := range nodes.Items {
+		capacity := node.Status.Capacity
+		allocatable := node.Status.Allocatable
+
+		cpuOverHead := capacity.Cpu().MilliValue() - allocatable.Cpu().MilliValue()
+		memoryOverHead := capacity.Memory().Value() - allocatable.Memory().Value()
+
+		if cpuOverHead > maxCPUOverHead {
+			maxCPUOverHead = cpuOverHead
+		}
+		if memoryOverHead > maxMemoryOverHead {
+			maxMemoryOverHead = memoryOverHead
+		}
+	}
+
+	return corev1.ResourceList{
+		corev1.ResourceCPU:    *resource.NewMilliQuantity(maxCPUOverHead, resource.DecimalSI),
+		corev1.ResourceMemory: *resource.NewQuantity(maxMemoryOverHead, resource.DecimalSI),
+	}, nil
+}
+
func (p *DefaultProvider) UpdateInstanceTypes(ctx context.Context) error {
	// DO NOT REMOVE THIS LOCK ----------------------------------------------------------------------------
	// We lock here so that multiple callers to getInstanceTypesOfferings do not result in cache misses and multiple
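To see what nodeOverhead does with concrete numbers, here is a small standalone sketch that runs the same "take the maximum observed overhead" logic over two hand-written nodes; the node values are made up for illustration and the maxOverhead helper is not part of this PR.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// maxOverhead mirrors nodeOverhead above: per node, overhead = capacity - allocatable,
// and the largest CPU and memory overheads seen across all nodes win.
func maxOverhead(nodes []corev1.Node) corev1.ResourceList {
	maxCPU, maxMem := int64(0), int64(0)
	for i := range nodes {
		st := nodes[i].Status
		if cpu := st.Capacity.Cpu().MilliValue() - st.Allocatable.Cpu().MilliValue(); cpu > maxCPU {
			maxCPU = cpu
		}
		if mem := st.Capacity.Memory().Value() - st.Allocatable.Memory().Value(); mem > maxMem {
			maxMem = mem
		}
	}
	return corev1.ResourceList{
		corev1.ResourceCPU:    *resource.NewMilliQuantity(maxCPU, resource.DecimalSI),
		corev1.ResourceMemory: *resource.NewQuantity(maxMem, resource.DecimalSI),
	}
}

func main() {
	nodes := []corev1.Node{
		{Status: corev1.NodeStatus{
			Capacity:    corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("4"), corev1.ResourceMemory: resource.MustParse("8Gi")},
			Allocatable: corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("3940m"), corev1.ResourceMemory: resource.MustParse("7Gi")},
		}},
		{Status: corev1.NodeStatus{
			Capacity:    corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("8"), corev1.ResourceMemory: resource.MustParse("16Gi")},
			Allocatable: corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("7910m"), corev1.ResourceMemory: resource.MustParse("14.5Gi")},
		}},
	}
	// Overheads are 60m/1Gi and 90m/1.5Gi; the pessimistic 90m CPU and 1.5Gi memory are kept,
	// so newly provisioned nodes are sized against the worst case observed in the cluster.
	fmt.Println(maxOverhead(nodes))
}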
pkg/providers/instancetype/types.go — 68 changes: 5 additions & 63 deletions

@@ -42,9 +42,6 @@
)

const (
-	MemoryAvailable = "memory.available"
-	NodeFSAvailable = "nodefs.available"
-
	GiBMiBRatio = 1024
	MiBByteRatio = 1024 * 1024
	TerwayMinENIRequirements = 11

@@ -60,7 +57,7 @@
	Available bool
}

-func NewInstanceType(ctx context.Context,
+func NewInstanceType(ctx context.Context, overhead corev1.ResourceList,
	info *ecsclient.DescribeInstanceTypesResponseBodyInstanceTypesInstanceType,
	kc *v1alpha1.KubeletConfiguration, region string, systemDisk *v1alpha1.SystemDisk,
	offerings cloudprovider.Offerings, clusterCNI string) *cloudprovider.InstanceType {
@@ -74,9 +71,10 @@
		Offerings: offerings,
		Capacity: computeCapacity(ctx, info, kc.MaxPods, kc.PodsPerCore, systemDisk, clusterCNI),
		Overhead: &cloudprovider.InstanceTypeOverhead{
-			KubeReserved: kubeReservedResources(kc.KubeReserved),
-			SystemReserved: systemReservedResources(kc.SystemReserved),
-			EvictionThreshold: evictionThreshold(memory(ctx, info), ephemeralStorage(systemDisk), kc.EvictionHard, kc.EvictionSoft),
+			// The overhead fields below are merged (summed) when applied, so the whole overhead can be set in a single field.
+			KubeReserved: overhead,
+			SystemReserved: corev1.ResourceList{},
+			EvictionThreshold: corev1.ResourceList{},
		},
	}
	if it.Requirements.Compatible(scheduling.NewRequirements(scheduling.NewRequirement(corev1.LabelOSStable, corev1.NodeSelectorOpIn, string(corev1.Windows)))) == nil {
@@ -178,62 +176,6 @@ func computeCapacity(ctx context.Context,
	return resourceList
}

-func kubeReservedResources(kubeReserved map[string]string) corev1.ResourceList {
-	resources := corev1.ResourceList{
-		// TODO: Following data is extract from real env
-		// Please check it more
-		corev1.ResourceMemory: resource.MustParse("447Mi"),
-		corev1.ResourceCPU: resource.MustParse("35m"),
-	}
-
-	return lo.Assign(resources, lo.MapEntries(kubeReserved, func(k string, v string) (corev1.ResourceName, resource.Quantity) {
-		return corev1.ResourceName(k), resource.MustParse(v)
-	}))
-}
-
-func systemReservedResources(systemReserved map[string]string) corev1.ResourceList {
-	resources := corev1.ResourceList{
-		// TODO: Following data is extract from real env
-		// Please check it more
-		corev1.ResourceMemory: resource.MustParse("447Mi"),
-		corev1.ResourceCPU: resource.MustParse("35m"),
-	}
-
-	return lo.Assign(resources, lo.MapEntries(systemReserved, func(k string, v string) (corev1.ResourceName, resource.Quantity) {
-		return corev1.ResourceName(k), resource.MustParse(v)
-	}))
-}
-
-func evictionThreshold(memory *resource.Quantity, storage *resource.Quantity, evictionHard map[string]string, evictionSoft map[string]string) corev1.ResourceList {
-	overhead := corev1.ResourceList{
-		// TODO: Following data is extract from real env
-		// Please check it more
-		corev1.ResourceMemory: resource.MustParse("300Mi"),
-	}
-
-	override := corev1.ResourceList{}
-	var evictionSignals []map[string]string
-	if evictionHard != nil {
-		evictionSignals = append(evictionSignals, evictionHard)
-	}
-	if evictionSoft != nil {
-		evictionSignals = append(evictionSignals, evictionSoft)
-	}
-
-	for _, m := range evictionSignals {
-		temp := corev1.ResourceList{}
-		if v, ok := m[MemoryAvailable]; ok {
-			temp[corev1.ResourceMemory] = computeEvictionSignal(*memory, v)
-		}
-		if v, ok := m[NodeFSAvailable]; ok {
-			temp[corev1.ResourceEphemeralStorage] = computeEvictionSignal(*storage, v)
-		}
-		override = resources.MaxResources(override, temp)
-	}
-	// Assign merges maps from left to right so overrides will always be taken last
-	return lo.Assign(overhead, override)
-}
-
// computeEvictionSignal computes the resource quantity value for an eviction signal value, computed off the
// base capacity value if the signal value is a percentage or as a resource quantity if the signal value isn't a percentage
func computeEvictionSignal(capacity resource.Quantity, signalValue string) resource.Quantity {
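The "will be merged" comment in types.go relies on the three overhead buckets only ever being consumed as a sum (allocatable is capacity minus the total overhead). Below is a rough standalone sketch of that summation, written for illustration under that assumption rather than quoting upstream Karpenter code.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// totalOverhead adds the three overhead buckets together; because consumers only
// look at this sum, carrying the whole measured overhead in KubeReserved alone
// produces the same result as splitting it across the buckets.
func totalOverhead(kubeReserved, systemReserved, evictionThreshold corev1.ResourceList) corev1.ResourceList {
	out := corev1.ResourceList{}
	for _, rl := range []corev1.ResourceList{kubeReserved, systemReserved, evictionThreshold} {
		for name, q := range rl {
			sum := out[name]
			sum.Add(q)
			out[name] = sum
		}
	}
	return out
}

func main() {
	measured := corev1.ResourceList{
		corev1.ResourceCPU:    resource.MustParse("90m"),
		corev1.ResourceMemory: resource.MustParse("1536Mi"),
	}
	// Everything in KubeReserved, the other two buckets empty: the total equals the measured overhead.
	fmt.Println(totalOverhead(measured, corev1.ResourceList{}, corev1.ResourceList{}))
}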