diff --git a/docs/workloads/async/containers.md b/docs/workloads/async/containers.md index d61d1ced57..8f557ad11c 100644 --- a/docs/workloads/async/containers.md +++ b/docs/workloads/async/containers.md @@ -27,6 +27,10 @@ Your API pod can contain multiple containers, only one of which can be listening The `/mnt` directory is mounted to each container's filesystem, and is shared across all containers. +## Resource requests + +Each container in the pod requests its own amount of CPU, memory, GPU, and Inferentia resources. In addition, Cortex's dequeuer sidecar container (which is automatically added to the pod) requests 100m CPU and 100Mi memory. + ## Observability See docs for [logging](../../clusters/observability/logging.md), [metrics](../../clusters/observability/metrics.md), and [alerting](../../clusters/observability/metrics.md). diff --git a/docs/workloads/batch/containers.md b/docs/workloads/batch/containers.md index eb3a971e79..67263f4b33 100644 --- a/docs/workloads/batch/containers.md +++ b/docs/workloads/batch/containers.md @@ -33,6 +33,10 @@ Your API pod can contain multiple containers, only one of which can be listening The `/mnt` directory is mounted to each container's filesystem, and is shared across all containers. +## Resource requests + +Each container in the pod requests its own amount of CPU, memory, GPU, and Inferentia resources. In addition, Cortex's dequeuer sidecar container (which is automatically added to the pod) requests 100m CPU and 100Mi memory. + ## Observability See docs for [logging](../../clusters/observability/logging.md), [metrics](../../clusters/observability/metrics.md), and [alerting](../../clusters/observability/metrics.md). diff --git a/docs/workloads/realtime/containers.md b/docs/workloads/realtime/containers.md index b3b502a135..f700b54cee 100644 --- a/docs/workloads/realtime/containers.md +++ b/docs/workloads/realtime/containers.md @@ -25,6 +25,10 @@ Your API pod can contain multiple containers, only one of which can be listening The `/mnt` directory is mounted to each container's file system, and is shared across all containers. +## Resource requests + +Each container in the pod requests its own amount of CPU, memory, GPU, and Inferentia resources. In addition, Cortex's proxy sidecar container (which is automatically added to the pod) requests 100m CPU and 100Mi memory. + ## Observability See docs for [logging](../../clusters/observability/logging.md), [metrics](../../clusters/observability/metrics.md), and [alerting](../../clusters/observability/metrics.md). diff --git a/pkg/config/config.go b/pkg/config/config.go index e347a8acac..c442ec6f19 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -40,8 +40,7 @@ import ( var ( OperatorMetadata *clusterconfig.OperatorMetadata - ClusterConfig *clusterconfig.Config - InstancesMetadata []aws.InstanceMetadata + ClusterConfig *clusterconfig.Config AWS *aws.Client K8s *k8s.Client @@ -92,10 +91,6 @@ func Init() error { ClusterConfig = clusterConfig - for _, instanceType := range clusterConfig.GetAllInstanceTypes() { - InstancesMetadata = append(InstancesMetadata, aws.InstanceMetadatas[clusterConfig.Region][instanceType]) - } - AWS, err = aws.NewForRegion(clusterConfig.Region) if err != nil { return err diff --git a/pkg/consts/consts.go b/pkg/consts/consts.go index b1989fd093..4c2af2d49d 100644 --- a/pkg/consts/consts.go +++ b/pkg/consts/consts.go @@ -18,6 +18,8 @@ package consts import ( "os" + + kresource "k8s.io/apimachinery/pkg/api/resource" ) var ( @@ -37,6 +39,11 @@ var ( AdminPortInt32 = int32(15000) AuthHeader = "X-Cortex-Authorization" + CortexProxyCPU = kresource.MustParse("100m") + CortexProxyMem = kresource.MustParse("100Mi") + CortexDequeuerCPU = kresource.MustParse("100m") + CortexDequeuerMem = kresource.MustParse("100Mi") + DefaultInClusterConfigPath = "/configs/cluster/cluster.yaml" MaxBucketLifecycleRules = 100 AsyncWorkloadsExpirationDays = int64(7) diff --git a/pkg/lib/k8s/quantity.go b/pkg/lib/k8s/quantity.go index 72285ff951..fc32cfe4fe 100644 --- a/pkg/lib/k8s/quantity.go +++ b/pkg/lib/k8s/quantity.go @@ -95,6 +95,22 @@ func NewMilliQuantity(milliValue int64) *Quantity { } } +// Returns nil if no quantities are passed in +func NewSummed(quantities ...kresource.Quantity) *Quantity { + if len(quantities) == 0 { + return nil + } + + k8sQuantity := kresource.Quantity{} + for _, q := range quantities { + k8sQuantity.Add(q) + } + + return &Quantity{ + Quantity: k8sQuantity, + } +} + func (quantity *Quantity) MilliString() string { return s.Int64(quantity.Quantity.MilliValue()) + "m" } @@ -103,32 +119,88 @@ func (quantity *Quantity) ToFloat32() float32 { return float32(quantity.Quantity.MilliValue()) / float32(1000) } -func (quantity *Quantity) ToKi() int64 { - kiFloat := float64(quantity.Quantity.Value()) / float64(1024) +func ToKiRounded(k8sQuantity kresource.Quantity) int64 { + kiFloat := float64(k8sQuantity.Value()) / float64(1024) return int64(math.Round(kiFloat)) } +func ToKiRoundedStr(k8sQuantity kresource.Quantity) string { + return s.Int64(ToKiRounded(k8sQuantity)) + "Ki" +} +func (quantity *Quantity) ToKiRounded() int64 { + return ToKiRounded(quantity.Quantity) +} +func (quantity *Quantity) ToKiRoundedStr() string { + return ToKiRoundedStr(quantity.Quantity) +} + +func ToKiCeil(k8sQuantity kresource.Quantity) int64 { + kiFloat := float64(k8sQuantity.Value()) / float64(1024) + return int64(math.Ceil(kiFloat)) +} +func ToKiCeilStr(k8sQuantity kresource.Quantity) string { + return s.Int64(ToKiCeil(k8sQuantity)) + "Ki" +} +func (quantity *Quantity) ToKiCeil() int64 { + return ToKiCeil(quantity.Quantity) +} +func (quantity *Quantity) ToKiCeilStr() string { + return ToKiCeilStr(quantity.Quantity) +} -// SplitInTwo divides the quantity in two and return both halves (ensuring they add up to the original value) -func (quantity *Quantity) SplitInTwo() (*kresource.Quantity, *kresource.Quantity) { - return SplitInTwo(&quantity.Quantity) +func ToKiFloor(k8sQuantity kresource.Quantity) int64 { + kiFloat := float64(k8sQuantity.Value()) / float64(1024) + return int64(math.Floor(kiFloat)) +} +func ToKiFloorStr(k8sQuantity kresource.Quantity) string { + return s.Int64(ToKiFloor(k8sQuantity)) + "Ki" +} +func (quantity *Quantity) ToKiFloor() int64 { + return ToKiFloor(quantity.Quantity) +} +func (quantity *Quantity) ToKiFloorStr() string { + return ToKiFloorStr(quantity.Quantity) } -// SplitInTwo divides the quantity in two and return both halves (ensuring they add up to the original value) -func SplitInTwo(quantity *kresource.Quantity) (*kresource.Quantity, *kresource.Quantity) { - milliValue := quantity.MilliValue() - halfMilliValue := milliValue / 2 - q1 := kresource.NewMilliQuantity(milliValue-halfMilliValue, kresource.DecimalSI) - q2 := kresource.NewMilliQuantity(halfMilliValue, kresource.DecimalSI) - return q1, q2 +func ToMiRounded(k8sQuantity kresource.Quantity) int64 { + miFloat := float64(k8sQuantity.Value()) / float64(1024*1024) + return int64(math.Round(miFloat)) +} +func ToMiRoundedStr(k8sQuantity kresource.Quantity) string { + return s.Int64(ToMiRounded(k8sQuantity)) + "Mi" +} +func (quantity *Quantity) ToMiRounded() int64 { + return ToMiRounded(quantity.Quantity) +} +func (quantity *Quantity) ToMiRoundedStr() string { + return ToMiRoundedStr(quantity.Quantity) } -func SplitInThree(quantity *kresource.Quantity) (*kresource.Quantity, *kresource.Quantity, *kresource.Quantity) { - milliValue := quantity.MilliValue() - thirdMilliValue := milliValue / 3 - q1 := kresource.NewMilliQuantity(milliValue-2*thirdMilliValue, kresource.DecimalSI) - q2 := kresource.NewMilliQuantity(thirdMilliValue, kresource.DecimalSI) - q3 := kresource.NewMilliQuantity(thirdMilliValue, kresource.DecimalSI) - return q1, q2, q3 +func ToMiCeil(k8sQuantity kresource.Quantity) int64 { + miFloat := float64(k8sQuantity.Value()) / float64(1024*1024) + return int64(math.Ceil(miFloat)) +} +func ToMiCeilStr(k8sQuantity kresource.Quantity) string { + return s.Int64(ToMiCeil(k8sQuantity)) + "Mi" +} +func (quantity *Quantity) ToMiCeil() int64 { + return ToMiCeil(quantity.Quantity) +} +func (quantity *Quantity) ToMiCeilStr() string { + return ToMiCeilStr(quantity.Quantity) +} + +func ToMiFloor(k8sQuantity kresource.Quantity) int64 { + miFloat := float64(k8sQuantity.Value()) / float64(1024*1024) + return int64(math.Floor(miFloat)) +} +func ToMiFloorStr(k8sQuantity kresource.Quantity) string { + return s.Int64(ToMiFloor(k8sQuantity)) + "Mi" +} +func (quantity *Quantity) ToMiFloor() int64 { + return ToMiFloor(quantity.Quantity) +} +func (quantity *Quantity) ToMiFloorStr() string { + return ToMiFloorStr(quantity.Quantity) } func (quantity *Quantity) Sub(q2 kresource.Quantity) { @@ -169,6 +241,13 @@ func (quantity *Quantity) ID() string { return s.Int64(quantity.MilliValue()) } +func (quantity *Quantity) DeepCopy() Quantity { + return Quantity{ + Quantity: quantity.Quantity.DeepCopy(), + UserString: quantity.UserString, + } +} + func QuantityPtr(k8sQuantity kresource.Quantity) *kresource.Quantity { return &k8sQuantity } diff --git a/pkg/operator/endpoints/info.go b/pkg/operator/endpoints/info.go index c0c04e0ce0..56e8ea1617 100644 --- a/pkg/operator/endpoints/info.go +++ b/pkg/operator/endpoints/info.go @@ -38,9 +38,8 @@ func Info(w http.ResponseWriter, r *http.Request) { } fullClusterConfig := clusterconfig.InternalConfig{ - Config: *config.ClusterConfig, - OperatorMetadata: *config.OperatorMetadata, - InstancesMetadata: config.InstancesMetadata, + Config: *config.ClusterConfig, + OperatorMetadata: *config.OperatorMetadata, } response := schema.InfoResponse{ diff --git a/pkg/operator/operator/memory_capacity.go b/pkg/operator/operator/memory_capacity.go index 18881f34f8..57f0bf64f0 100644 --- a/pkg/operator/operator/memory_capacity.go +++ b/pkg/operator/operator/memory_capacity.go @@ -18,6 +18,7 @@ package operator import ( "github.com/cortexlabs/cortex/pkg/config" + "github.com/cortexlabs/cortex/pkg/lib/aws" "github.com/cortexlabs/cortex/pkg/lib/k8s" "github.com/cortexlabs/cortex/pkg/lib/slices" kresource "k8s.io/apimachinery/pkg/api/resource" @@ -104,9 +105,10 @@ func UpdateMemoryCapacityConfigMap() (map[string]kresource.Quantity, error) { primaryInstances := []string{} minMemMap := map[string]kresource.Quantity{} - for _, instanceMetadata := range config.InstancesMetadata { - minMemMap[instanceMetadata.Type] = instanceMetadata.Memory - primaryInstances = append(primaryInstances, instanceMetadata.Type) + for _, ng := range config.ClusterConfig.NodeGroups { + instanceMetadata := aws.InstanceMetadatas[config.ClusterConfig.Region][ng.InstanceType] + minMemMap[ng.InstanceType] = instanceMetadata.Memory + primaryInstances = append(primaryInstances, ng.InstanceType) } nodeMemCapacityMap, err := getMemoryCapacityFromNodes(primaryInstances) diff --git a/pkg/operator/resources/errors.go b/pkg/operator/resources/errors.go index 6ad110ff44..a5e4748b5a 100644 --- a/pkg/operator/resources/errors.go +++ b/pkg/operator/resources/errors.go @@ -18,12 +18,20 @@ package resources import ( "fmt" + "strings" + "github.com/cortexlabs/cortex/pkg/config" + "github.com/cortexlabs/cortex/pkg/consts" + "github.com/cortexlabs/cortex/pkg/lib/console" "github.com/cortexlabs/cortex/pkg/lib/errors" - "github.com/cortexlabs/cortex/pkg/lib/strings" + "github.com/cortexlabs/cortex/pkg/lib/k8s" + "github.com/cortexlabs/cortex/pkg/lib/slices" s "github.com/cortexlabs/cortex/pkg/lib/strings" + "github.com/cortexlabs/cortex/pkg/lib/table" "github.com/cortexlabs/cortex/pkg/operator/operator" "github.com/cortexlabs/cortex/pkg/types/userconfig" + "github.com/cortexlabs/cortex/pkg/workloads" + kresource "k8s.io/apimachinery/pkg/api/resource" ) const ( @@ -73,26 +81,27 @@ func ErrorCannotChangeKindOfDeployedAPI(name string, newKind, prevKind userconfi }) } -func ErrorNoAvailableNodeComputeLimit(resource string, reqStr string, maxStr string) error { - message := fmt.Sprintf("no instances can satisfy the requested %s quantity - requested %s %s but instances only have %s %s available", resource, reqStr, resource, maxStr, resource) - if maxStr == "0" { - message = fmt.Sprintf("no instances can satisfy the requested %s quantity - requested %s %s but instances don't have any %s", resource, reqStr, resource, resource) - } +func ErrorNoAvailableNodeComputeLimit(api *userconfig.API, compute userconfig.Compute, maxMemMap map[string]kresource.Quantity) error { + msg := "no instance types in your cluster are large enough to satisfy the requested resources for your pod\n\n" + msg += console.Bold("requested pod resources\n") + msg += podResourceRequestsTable(api, compute) + msg += "\n" + s.TrimTrailingNewLines(nodeGroupResourcesTable(api, compute, maxMemMap)) + return errors.WithStack(&errors.Error{ Kind: ErrNoAvailableNodeComputeLimit, - Message: message, + Message: msg, }) } func ErrorAPIUsedByTrafficSplitter(trafficSplitters []string) error { return errors.WithStack(&errors.Error{ Kind: ErrRealtimeAPIUsedByTrafficSplitter, - Message: fmt.Sprintf("cannot delete api because it is used by the following %s: %s", strings.PluralS("TrafficSplitter", len(trafficSplitters)), strings.StrsSentence(trafficSplitters, "")), + Message: fmt.Sprintf("cannot delete api because it is used by the following %s: %s", s.PluralS("TrafficSplitter", len(trafficSplitters)), s.StrsSentence(trafficSplitters, "")), }) } func ErrorAPIsNotDeployed(notDeployedAPIs []string) error { - message := fmt.Sprintf("apis %s were either not found or are not RealtimeAPIs", strings.StrsAnd(notDeployedAPIs)) + message := fmt.Sprintf("apis %s were either not found or are not RealtimeAPIs", s.StrsAnd(notDeployedAPIs)) if len(notDeployedAPIs) == 1 { message = fmt.Sprintf("api %s was either not found or is not a RealtimeAPI", notDeployedAPIs[0]) } @@ -105,6 +114,83 @@ func ErrorAPIsNotDeployed(notDeployedAPIs []string) error { func ErrorInvalidNodeGroupSelector(selected string, availableNodeGroups []string) error { return errors.WithStack(&errors.Error{ Kind: ErrInvalidNodeGroupSelector, - Message: fmt.Sprintf("node group %s doesn't exist; remove the node group selector to let Cortex determine automatically where to place the API or specify a valid node group name (%s)", selected, s.StrsOr(availableNodeGroups)), + Message: fmt.Sprintf("node group \"%s\" doesn't exist; remove the node group selector to let Cortex determine automatically where to place the API, or specify a valid node group name (%s)", selected, s.StrsOr(availableNodeGroups)), }) } + +func podResourceRequestsTable(api *userconfig.API, compute userconfig.Compute) string { + sidecarCPUNote := "" + sidecarMemNote := "" + if api.Kind == userconfig.RealtimeAPIKind { + sidecarCPUNote = fmt.Sprintf(" (including %s for the %s sidecar container)", consts.CortexProxyCPU.String(), workloads.ProxyContainerName) + sidecarMemNote = fmt.Sprintf(" (including %s for the %s sidecar container)", k8s.ToMiCeilStr(consts.CortexProxyMem), workloads.ProxyContainerName) + } else if api.Kind == userconfig.AsyncAPIKind || api.Kind == userconfig.BatchAPIKind { + sidecarCPUNote = fmt.Sprintf(" (including %s for the %s sidecar container)", consts.CortexDequeuerCPU.String(), workloads.DequeuerContainerName) + sidecarMemNote = fmt.Sprintf(" (including %s for the %s sidecar container)", k8s.ToMiCeilStr(consts.CortexDequeuerMem), workloads.DequeuerContainerName) + } + + var items table.KeyValuePairs + if compute.CPU != nil { + items.Add("CPU", compute.CPU.String()+sidecarCPUNote) + } + if compute.Mem != nil { + items.Add("memory", compute.Mem.ToMiCeilStr()+sidecarMemNote) + } + if compute.GPU > 0 { + items.Add("GPU", compute.GPU) + } + if compute.Inf > 0 { + items.Add("Inf", compute.Inf) + } + + return items.String() +} + +func nodeGroupResourcesTable(api *userconfig.API, compute userconfig.Compute, maxMemMap map[string]kresource.Quantity) string { + var skippedNodeGroups []string + var nodeGroupResourceRows [][]interface{} + + showGPU := false + showInf := false + if compute.GPU > 0 { + showGPU = true + } + if compute.Inf > 0 { + showInf = true + } + + for _, ng := range config.ClusterConfig.NodeGroups { + nodeCPU, nodeMem, nodeGPU, nodeInf := getNodeCapacity(ng.InstanceType, maxMemMap) + if nodeGPU > 0 { + showGPU = true + } + if nodeInf > 0 { + showInf = true + } + + if api.NodeGroups != nil && !slices.HasString(api.NodeGroups, ng.Name) { + skippedNodeGroups = append(skippedNodeGroups, ng.Name) + } else { + nodeGroupResourceRows = append(nodeGroupResourceRows, []interface{}{ng.Name, ng.InstanceType, nodeCPU, k8s.ToMiFloorStr(nodeMem), nodeGPU, nodeInf}) + } + } + + nodeGroupResourceRowsTable := table.Table{ + Headers: []table.Header{ + {Title: "node group"}, + {Title: "instance type"}, + {Title: "CPU"}, + {Title: "memory"}, + {Title: "GPU", Hidden: !showGPU}, + {Title: "Inf", Hidden: !showInf}, + }, + Rows: nodeGroupResourceRows, + } + + out := nodeGroupResourceRowsTable.MustFormat() + if len(skippedNodeGroups) > 0 { + out += fmt.Sprintf("\nthe following %s skipped (based on the api configuration's %s field): %s", s.PluralCustom("node group was", "node groups were", len(skippedNodeGroups)), userconfig.NodeGroupsKey, strings.Join(skippedNodeGroups, ", ")) + } + + return out +} diff --git a/pkg/operator/resources/validations.go b/pkg/operator/resources/validations.go index ebbc5514f6..3c240ad6d2 100644 --- a/pkg/operator/resources/validations.go +++ b/pkg/operator/resources/validations.go @@ -17,12 +17,12 @@ limitations under the License. package resources import ( - "fmt" - "github.com/cortexlabs/cortex/pkg/config" + "github.com/cortexlabs/cortex/pkg/lib/aws" "github.com/cortexlabs/cortex/pkg/lib/errors" "github.com/cortexlabs/cortex/pkg/lib/k8s" "github.com/cortexlabs/cortex/pkg/lib/sets/strset" + "github.com/cortexlabs/cortex/pkg/lib/slices" s "github.com/cortexlabs/cortex/pkg/lib/strings" "github.com/cortexlabs/cortex/pkg/operator/operator" "github.com/cortexlabs/cortex/pkg/types/spec" @@ -85,7 +85,7 @@ func ValidateClusterAPIs(apis []userconfig.API) error { api := &apis[i] if api.Kind != userconfig.TrafficSplitterKind { if err := validateK8sCompute(api, maxMemMap); err != nil { - return err + return errors.Wrap(err, api.Identify()) } } } @@ -132,83 +132,67 @@ var _inferentiaCPUReserve = kresource.MustParse("100m") var _inferentiaMemReserve = kresource.MustParse("100Mi") func validateK8sCompute(api *userconfig.API, maxMemMap map[string]kresource.Quantity) error { - allErrors := []error{} - successfulLoops := 0 - clusterNodeGroupNames := strset.New(config.ClusterConfig.GetNodeGroupNames()...) - apiNodeGroupNames := api.NodeGroups - - if apiNodeGroupNames != nil { - for _, ngName := range apiNodeGroupNames { - if !clusterNodeGroupNames.Has(ngName) { - return ErrorInvalidNodeGroupSelector(ngName, config.ClusterConfig.GetNodeGroupNames()) - } + for _, ngName := range api.NodeGroups { + if !clusterNodeGroupNames.Has(ngName) { + return errors.Wrap(ErrorInvalidNodeGroupSelector(ngName, config.ClusterConfig.GetNodeGroupNames()), userconfig.NodeGroupsKey) } } - compute := userconfig.GetTotalComputeFromContainers(api.Pod.Containers) + compute := userconfig.GetPodComputeRequest(api) - for _, instanceMetadata := range config.InstancesMetadata { - if apiNodeGroupNames != nil { - matchedNodeGroups := 0 - for _, ngName := range apiNodeGroupNames { - if config.ClusterConfig.GetNodeGroupByName(ngName).InstanceType == instanceMetadata.Type { - matchedNodeGroups++ - } - } - if matchedNodeGroups == 0 { - continue - } + for _, ng := range config.ClusterConfig.NodeGroups { + if api.NodeGroups != nil && !slices.HasString(api.NodeGroups, ng.Name) { + continue } - maxMemLoop := maxMemMap[instanceMetadata.Type] - maxMemLoop.Sub(_cortexMemReserve) + nodeCPU, nodeMem, nodeGPU, nodeInf := getNodeCapacity(ng.InstanceType, maxMemMap) - maxCPU := instanceMetadata.CPU - maxCPU.Sub(_cortexCPUReserve) - - maxGPU := instanceMetadata.GPU - if maxGPU > 0 { - // Reserve resources for nvidia device plugin daemonset - maxCPU.Sub(_nvidiaCPUReserve) - maxMemLoop.Sub(_nvidiaMemReserve) - // Reserve resources for nvidia dcgm prometheus exporter - maxCPU.Sub(_nvidiaDCGMExporterCPUReserve) - maxMemLoop.Sub(_nvidiaDCGMExporterMemReserve) + if compute.CPU != nil && nodeCPU.Cmp(compute.CPU.Quantity) < 0 { + continue + } else if compute.Mem != nil && nodeMem.Cmp(compute.Mem.Quantity) < 0 { + continue + } else if compute.GPU > nodeGPU { + continue + } else if compute.Inf > nodeInf { + continue } - maxInf := instanceMetadata.Inf - if maxInf > 0 { - // Reserve resources for inferentia device plugin daemonset - maxCPU.Sub(_inferentiaCPUReserve) - maxMemLoop.Sub(_inferentiaMemReserve) - } + // we found a node group that has capacity + return nil + } - loopErrors := []error{} - if compute.CPU != nil && maxCPU.Cmp(compute.CPU.Quantity) < 0 { - loopErrors = append(loopErrors, ErrorNoAvailableNodeComputeLimit("CPU", compute.CPU.String(), maxCPU.String())) - } - if compute.Mem != nil && maxMemLoop.Cmp(compute.Mem.Quantity) < 0 { - loopErrors = append(loopErrors, ErrorNoAvailableNodeComputeLimit("memory", compute.Mem.String(), maxMemLoop.String())) - } - if compute.GPU > maxGPU { - loopErrors = append(loopErrors, ErrorNoAvailableNodeComputeLimit("GPU", fmt.Sprintf("%d", compute.GPU), fmt.Sprintf("%d", maxGPU))) - } - if compute.Inf > maxInf { - loopErrors = append(loopErrors, ErrorNoAvailableNodeComputeLimit("Inf", fmt.Sprintf("%d", compute.Inf), fmt.Sprintf("%d", maxInf))) - } - if errors.HasError(loopErrors) { - allErrors = append(allErrors, errors.FirstError(loopErrors...)) - } else { - successfulLoops++ - } + // no nodegroups have capacity + return ErrorNoAvailableNodeComputeLimit(api, compute, maxMemMap) +} + +func getNodeCapacity(instanceType string, maxMemMap map[string]kresource.Quantity) (kresource.Quantity, kresource.Quantity, int64, int64) { + instanceMetadata := aws.InstanceMetadatas[config.ClusterConfig.Region][instanceType] + + cpu := instanceMetadata.CPU.DeepCopy() + cpu.Sub(_cortexCPUReserve) + + mem := maxMemMap[instanceType].DeepCopy() + mem.Sub(_cortexMemReserve) + + gpu := instanceMetadata.GPU + if gpu > 0 { + // Reserve resources for nvidia device plugin daemonset + cpu.Sub(_nvidiaCPUReserve) + mem.Sub(_nvidiaMemReserve) + // Reserve resources for nvidia dcgm prometheus exporter + cpu.Sub(_nvidiaDCGMExporterCPUReserve) + mem.Sub(_nvidiaDCGMExporterMemReserve) } - if successfulLoops == 0 { - return errors.FirstError(allErrors...) + inf := instanceMetadata.Inf + if inf > 0 { + // Reserve resources for inferentia device plugin daemonset + cpu.Sub(_inferentiaCPUReserve) + mem.Sub(_inferentiaMemReserve) } - return nil + return cpu, mem, gpu, inf } func validateEndpointCollisions(api *userconfig.API, virtualServices []istioclientnetworking.VirtualService) error { diff --git a/pkg/types/clusterconfig/cluster_config.go b/pkg/types/clusterconfig/cluster_config.go index ea9d59910a..1c0686f4bc 100644 --- a/pkg/types/clusterconfig/cluster_config.go +++ b/pkg/types/clusterconfig/cluster_config.go @@ -176,8 +176,6 @@ type InternalConfig struct { // Populated by operator OperatorMetadata - - InstancesMetadata []aws.InstanceMetadata `json:"instance_metadata"` } // The bare minimum to identify a cluster @@ -1798,15 +1796,6 @@ func (mc *ManagedConfig) TelemetryEvent() map[string]interface{} { return event } -func (mc *ManagedConfig) GetAllInstanceTypes() []string { - allInstanceTypes := strset.New() - for _, ng := range mc.NodeGroups { - allInstanceTypes.Add(ng.InstanceType) - } - - return allInstanceTypes.Slice() -} - func (mc *ManagedConfig) GetNodeGroupByName(name string) *NodeGroup { for _, ng := range mc.NodeGroups { if ng.Name == name { diff --git a/pkg/types/spec/errors.go b/pkg/types/spec/errors.go index fcb6643fa9..24d3a7af5e 100644 --- a/pkg/types/spec/errors.go +++ b/pkg/types/spec/errors.go @@ -55,7 +55,6 @@ const ( ErrCortexPrefixedEnvVarNotAllowed = "spec.cortex_prefixed_env_var_not_allowed" ErrDisallowedEnvVars = "spec.disallowed_env_vars" ErrComputeResourceConflict = "spec.compute_resource_conflict" - ErrInvalidNumberOfInfs = "spec.invalid_number_of_infs" ErrIncorrectTrafficSplitterWeight = "spec.incorrect_traffic_splitter_weight" ErrTrafficSplitterAPIsNotUnique = "spec.traffic_splitter_apis_not_unique" ErrOneShadowPerTrafficSplitter = "spec.one_shadow_per_traffic_splitter" @@ -250,13 +249,6 @@ func ErrorComputeResourceConflict(resourceA, resourceB string) error { }) } -func ErrorInvalidNumberOfInfs(requestedInfs int64) error { - return errors.WithStack(&errors.Error{ - Kind: ErrInvalidNumberOfInfs, - Message: fmt.Sprintf("cannot request %d Infs (currently only 1 Inf can be used per API replica, due to AWS's bug: https://github.com/aws/aws-neuron-sdk/issues/110)", requestedInfs), - }) -} - func ErrorIncorrectTrafficSplitterWeightTotal(totalWeight int32) error { return errors.WithStack(&errors.Error{ Kind: ErrIncorrectTrafficSplitterWeight, diff --git a/pkg/types/spec/validations.go b/pkg/types/spec/validations.go index 3cc2385287..d203db77c0 100644 --- a/pkg/types/spec/validations.go +++ b/pkg/types/spec/validations.go @@ -711,8 +711,6 @@ func validatePod( awsClient *aws.Client, k8sClient *k8s.Client, ) error { - containers := api.Pod.Containers - totalCompute := userconfig.GetTotalComputeFromContainers(containers) if api.Pod.Port != nil && api.Kind == userconfig.TaskAPIKind { return ErrorFieldIsNotSupportedForKind(userconfig.PortKey, api.Kind) @@ -721,11 +719,11 @@ func validatePod( api.Pod.Port = pointer.Int32(consts.DefaultUserPodPortInt32) } - if err := validateCompute(totalCompute); err != nil { + if err := validateCompute(api); err != nil { return errors.Wrap(err, userconfig.ComputeKey) } - if err := validateContainers(containers, api.Kind, awsClient, k8sClient); err != nil { + if err := validateContainers(api.Pod.Containers, api.Kind, awsClient, k8sClient); err != nil { return errors.Wrap(err, userconfig.ContainersKey) } @@ -844,15 +842,13 @@ func validateAutoscaling(api *userconfig.API) error { return nil } -func validateCompute(compute userconfig.Compute) error { +func validateCompute(api *userconfig.API) error { + compute := userconfig.GetPodComputeRequest(api) + if compute.GPU > 0 && compute.Inf > 0 { return ErrorComputeResourceConflict(userconfig.GPUKey, userconfig.InfKey) } - if compute.Inf > 1 { - return ErrorInvalidNumberOfInfs(compute.Inf) - } - return nil } diff --git a/pkg/types/userconfig/api.go b/pkg/types/userconfig/api.go index d601112e20..c2f8585941 100644 --- a/pkg/types/userconfig/api.go +++ b/pkg/types/userconfig/api.go @@ -21,12 +21,14 @@ import ( "strings" "time" + "github.com/cortexlabs/cortex/pkg/consts" "github.com/cortexlabs/cortex/pkg/lib/k8s" "github.com/cortexlabs/cortex/pkg/lib/pointer" "github.com/cortexlabs/cortex/pkg/lib/sets/strset" s "github.com/cortexlabs/cortex/pkg/lib/strings" "github.com/cortexlabs/cortex/pkg/lib/urls" "github.com/cortexlabs/yaml" + kresource "k8s.io/apimachinery/pkg/api/resource" kmeta "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -463,46 +465,45 @@ func ZeroCompute() Compute { } } -func GetTotalComputeFromContainers(containers []*Container) Compute { - compute := Compute{} +func GetPodComputeRequest(api *API) Compute { + var cpuQtys []kresource.Quantity + var memQtys []kresource.Quantity + var shmQtys []kresource.Quantity + var totalGPU int64 + var totalInf int64 - for _, container := range containers { + for _, container := range api.Pod.Containers { if container == nil || container.Compute == nil { continue } - if container.Compute.CPU != nil { - newCPUQuantity := k8s.NewMilliQuantity(container.Compute.CPU.ToDec().MilliValue()) - if compute.CPU == nil { - compute.CPU = newCPUQuantity - } else if newCPUQuantity != nil { - compute.CPU.AddQty(*newCPUQuantity) - } + cpuQtys = append(cpuQtys, container.Compute.CPU.Quantity) } - if container.Compute.Mem != nil { - newMemQuantity := k8s.NewMilliQuantity(container.Compute.Mem.ToDec().MilliValue()) - if compute.Mem == nil { - compute.Mem = newMemQuantity - } else if newMemQuantity != nil { - compute.Mem.AddQty(*newMemQuantity) - } + memQtys = append(memQtys, container.Compute.Mem.Quantity) } - if container.Compute.Shm != nil { - newShmQuantity := k8s.NewMilliQuantity(container.Compute.Shm.ToDec().MilliValue()) - if compute.Shm == nil { - compute.Shm = newShmQuantity - } else if newShmQuantity != nil { - compute.Shm.AddQty(*newShmQuantity) - } + shmQtys = append(shmQtys, container.Compute.Shm.Quantity) } + totalGPU += container.Compute.GPU + totalInf += container.Compute.Inf + } - compute.GPU += container.Compute.GPU - compute.Inf += container.Compute.Inf + if api.Kind == RealtimeAPIKind { + cpuQtys = append(cpuQtys, consts.CortexProxyCPU) + memQtys = append(memQtys, consts.CortexProxyMem) + } else if api.Kind == AsyncAPIKind || api.Kind == BatchAPIKind { + cpuQtys = append(cpuQtys, consts.CortexDequeuerCPU) + memQtys = append(memQtys, consts.CortexDequeuerMem) } - return compute + return Compute{ + CPU: k8s.NewSummed(cpuQtys...), + Mem: k8s.NewSummed(memQtys...), + Shm: k8s.NewSummed(shmQtys...), + GPU: totalGPU, + Inf: totalInf, + } } func GetContainerNames(containers []*Container) strset.Set { @@ -558,7 +559,7 @@ func (api *API) TelemetryEvent() map[string]interface{} { event["pod.containers._num_readiness_probes"] = numReadinessProbes event["pod.containers._num_liveness_probes"] = numLivenessProbes - totalCompute := GetTotalComputeFromContainers(api.Pod.Containers) + totalCompute := GetPodComputeRequest(api) if totalCompute.CPU != nil { event["pod.containers.compute.cpu._is_defined"] = true event["pod.containers.compute.cpu"] = float64(totalCompute.CPU.MilliValue()) / 1000 diff --git a/pkg/workloads/k8s.go b/pkg/workloads/k8s.go index 590eac0e03..eff75b624e 100644 --- a/pkg/workloads/k8s.go +++ b/pkg/workloads/k8s.go @@ -45,11 +45,9 @@ const ( _emptyDirVolumeName = "mnt" _emptyDirMountPath = "/mnt" - _gatewayContainerName = "gateway" - - _proxyContainerName = "proxy" - - _dequeuerContainerName = "dequeuer" + ProxyContainerName = "proxy" + DequeuerContainerName = "dequeuer" + GatewayContainerName = "gateway" _kubexitGraveyardName = "graveyard" _kubexitGraveyardMountPath = "/graveyard" @@ -76,7 +74,7 @@ var ( func AsyncGatewayContainer(api spec.API, queueURL string, volumeMounts []kcore.VolumeMount) kcore.Container { return kcore.Container{ - Name: _gatewayContainerName, + Name: GatewayContainerName, Image: config.ClusterConfig.ImageAsyncGateway, ImagePullPolicy: kcore.PullAlways, Args: []string{ @@ -117,7 +115,7 @@ func AsyncGatewayContainer(api spec.API, queueURL string, volumeMounts []kcore.V func asyncDequeuerProxyContainer(api spec.API, queueURL string) (kcore.Container, kcore.Volume) { return kcore.Container{ - Name: _dequeuerContainerName, + Name: DequeuerContainerName, Image: config.ClusterConfig.ImageDequeuer, ImagePullPolicy: kcore.PullAlways, Command: []string{ @@ -141,6 +139,12 @@ func asyncDequeuerProxyContainer(api spec.API, queueURL string) (kcore.Container ContainerPort: consts.AdminPortInt32, }, }, + Resources: kcore.ResourceRequirements{ + Requests: kcore.ResourceList{ + kcore.ResourceCPU: consts.CortexDequeuerCPU, + kcore.ResourceMemory: consts.CortexDequeuerMem, + }, + }, ReadinessProbe: &kcore.Probe{ Handler: kcore.Handler{ HTTPGet: &kcore.HTTPGetAction{ @@ -162,7 +166,7 @@ func asyncDequeuerProxyContainer(api spec.API, queueURL string) (kcore.Container func batchDequeuerProxyContainer(api spec.API, jobID, queueURL string) (kcore.Container, kcore.Volume) { return kcore.Container{ - Name: _dequeuerContainerName, + Name: DequeuerContainerName, Image: config.ClusterConfig.ImageDequeuer, ImagePullPolicy: kcore.PullAlways, Command: []string{ @@ -181,6 +185,12 @@ func batchDequeuerProxyContainer(api spec.API, jobID, queueURL string) (kcore.Co "--admin-port", consts.AdminPortStr, }, Env: baseEnvVars, + Resources: kcore.ResourceRequirements{ + Requests: kcore.ResourceList{ + kcore.ResourceCPU: consts.CortexDequeuerCPU, + kcore.ResourceMemory: consts.CortexDequeuerMem, + }, + }, ReadinessProbe: &kcore.Probe{ Handler: kcore.Handler{ HTTPGet: &kcore.HTTPGetAction{ @@ -203,7 +213,7 @@ func batchDequeuerProxyContainer(api spec.API, jobID, queueURL string) (kcore.Co func realtimeProxyContainer(api spec.API) (kcore.Container, kcore.Volume) { return kcore.Container{ - Name: _proxyContainerName, + Name: ProxyContainerName, Image: config.ClusterConfig.ImageProxy, ImagePullPolicy: kcore.PullAlways, Args: []string{ @@ -229,6 +239,12 @@ func realtimeProxyContainer(api spec.API) (kcore.Container, kcore.Volume) { VolumeMounts: []kcore.VolumeMount{ ClusterConfigMount(), }, + Resources: kcore.ResourceRequirements{ + Requests: kcore.ResourceList{ + kcore.ResourceCPU: consts.CortexProxyCPU, + kcore.ResourceMemory: consts.CortexProxyMem, + }, + }, ReadinessProbe: &kcore.Probe{ Handler: kcore.Handler{ HTTPGet: &kcore.HTTPGetAction{ diff --git a/test/apis/async/hello-world/cortex_cpu.yaml b/test/apis/async/hello-world/cortex_cpu.yaml index 4a1143af7b..9348aeff99 100644 --- a/test/apis/async/hello-world/cortex_cpu.yaml +++ b/test/apis/async/hello-world/cortex_cpu.yaml @@ -11,4 +11,4 @@ port: 8080 compute: cpu: 200m - mem: 128M + mem: 128Mi diff --git a/test/apis/async/text-generator/cortex_cpu.yaml b/test/apis/async/text-generator/cortex_cpu.yaml index d78afacdcb..29f632f6ba 100644 --- a/test/apis/async/text-generator/cortex_cpu.yaml +++ b/test/apis/async/text-generator/cortex_cpu.yaml @@ -11,4 +11,4 @@ port: 8080 compute: cpu: 1 - mem: 2.5G + mem: 2.5Gi diff --git a/test/apis/async/text-generator/cortex_gpu.yaml b/test/apis/async/text-generator/cortex_gpu.yaml index a18a8bc612..fa72ac6708 100644 --- a/test/apis/async/text-generator/cortex_gpu.yaml +++ b/test/apis/async/text-generator/cortex_gpu.yaml @@ -14,4 +14,4 @@ compute: cpu: 1 gpu: 1 - mem: 512M + mem: 512Mi diff --git a/test/apis/batch/image-classifier-alexnet/cortex_cpu.yaml b/test/apis/batch/image-classifier-alexnet/cortex_cpu.yaml index a96c29b861..0e23c3653c 100644 --- a/test/apis/batch/image-classifier-alexnet/cortex_cpu.yaml +++ b/test/apis/batch/image-classifier-alexnet/cortex_cpu.yaml @@ -11,4 +11,4 @@ port: 8080 compute: cpu: 1 - mem: 2G + mem: 2Gi diff --git a/test/apis/realtime/hello-world/cortex_cpu.yaml b/test/apis/realtime/hello-world/cortex_cpu.yaml index 3edc98541d..fd36e6e3d1 100644 --- a/test/apis/realtime/hello-world/cortex_cpu.yaml +++ b/test/apis/realtime/hello-world/cortex_cpu.yaml @@ -12,4 +12,4 @@ port: 8080 compute: cpu: 200m - mem: 128M + mem: 128Mi diff --git a/test/apis/realtime/image-classifier-resnet50/cortex_cpu.yaml b/test/apis/realtime/image-classifier-resnet50/cortex_cpu.yaml index 8e74412630..c687163840 100644 --- a/test/apis/realtime/image-classifier-resnet50/cortex_cpu.yaml +++ b/test/apis/realtime/image-classifier-resnet50/cortex_cpu.yaml @@ -11,4 +11,4 @@ command: ["tfs_model_status_probe", "-addr", "localhost:8500", "-model-name", "resnet50"] compute: cpu: 1 - mem: 2G + mem: 2Gi diff --git a/test/apis/realtime/prime-generator/cortex_cpu.yaml b/test/apis/realtime/prime-generator/cortex_cpu.yaml index 146c78267a..70f3ae3ab2 100644 --- a/test/apis/realtime/prime-generator/cortex_cpu.yaml +++ b/test/apis/realtime/prime-generator/cortex_cpu.yaml @@ -12,4 +12,4 @@ port: 8080 compute: cpu: 200m - mem: 128M + mem: 128Mi diff --git a/test/apis/realtime/sleep/cortex_cpu.yaml b/test/apis/realtime/sleep/cortex_cpu.yaml index 8b27f7e92d..029c847e46 100644 --- a/test/apis/realtime/sleep/cortex_cpu.yaml +++ b/test/apis/realtime/sleep/cortex_cpu.yaml @@ -13,6 +13,6 @@ port: 8080 compute: cpu: 200m - mem: 128M + mem: 128Mi autoscaling: target_in_flight: 1 diff --git a/test/apis/realtime/text-generator/cortex_cpu.yaml b/test/apis/realtime/text-generator/cortex_cpu.yaml index c31bb959b0..3b1fba338b 100644 --- a/test/apis/realtime/text-generator/cortex_cpu.yaml +++ b/test/apis/realtime/text-generator/cortex_cpu.yaml @@ -12,4 +12,4 @@ port: 8080 compute: cpu: 1 - mem: 2.5G + mem: 2.5Gi diff --git a/test/apis/realtime/text-generator/cortex_gpu.yaml b/test/apis/realtime/text-generator/cortex_gpu.yaml index a0659ff234..cc694a372f 100644 --- a/test/apis/realtime/text-generator/cortex_gpu.yaml +++ b/test/apis/realtime/text-generator/cortex_gpu.yaml @@ -15,4 +15,4 @@ compute: cpu: 1 gpu: 1 - mem: 512M + mem: 512Mi diff --git a/test/apis/trafficsplitter/hello-world/cortex_cpu.yaml b/test/apis/trafficsplitter/hello-world/cortex_cpu.yaml index 20292f1fc6..f445b6bda2 100644 --- a/test/apis/trafficsplitter/hello-world/cortex_cpu.yaml +++ b/test/apis/trafficsplitter/hello-world/cortex_cpu.yaml @@ -14,7 +14,7 @@ port: 8080 compute: cpu: 200m - mem: 128M + mem: 128Mi - name: hello-world-b kind: RealtimeAPI @@ -32,7 +32,7 @@ port: 8080 compute: cpu: 200m - mem: 128M + mem: 128Mi - name: hello-world-shadow kind: RealtimeAPI @@ -50,7 +50,7 @@ port: 8080 compute: cpu: 200m - mem: 128M + mem: 128Mi - name: hello-world kind: TrafficSplitter