From 59138b725af294e1850a5f5316512cdd74cd4dc3 Mon Sep 17 00:00:00 2001 From: Robert Lucian Chiriac Date: Tue, 18 May 2021 16:18:45 +0300 Subject: [PATCH 01/15] Install proxy for realtime + add port field --- cmd/proxy/main.go | 10 +- pkg/consts/consts.go | 12 ++- pkg/lib/strings/operations.go | 4 + pkg/operator/resources/asyncapi/k8s_specs.go | 7 +- .../resources/job/batchapi/k8s_specs.go | 3 +- .../resources/job/taskapi/k8s_specs.go | 3 +- .../resources/realtimeapi/k8s_specs.go | 9 +- pkg/operator/resources/trafficsplitter/api.go | 2 +- pkg/types/spec/errors.go | 32 ++++--- pkg/types/spec/validations.go | 25 ++++- pkg/types/userconfig/api.go | 9 +- pkg/types/userconfig/config_key.go | 1 + pkg/workloads/k8s.go | 95 ++++++++++--------- test/apis/realtime/Dockerfile | 3 +- test/apis/realtime/cortex.yaml | 1 + test/apis/realtime/main.py | 2 +- 16 files changed, 136 insertions(+), 82 deletions(-) diff --git a/cmd/proxy/main.go b/cmd/proxy/main.go index 0b0beea153..5ebb766f91 100644 --- a/cmd/proxy/main.go +++ b/cmd/proxy/main.go @@ -44,9 +44,9 @@ func main() { maxQueueLength int ) - flag.IntVar(&port, "port", 8000, "port where the proxy will be served") - flag.IntVar(&metricsPort, "metrics-port", 8001, "port where the proxy will be served") - flag.IntVar(&userContainerPort, "user-port", 8080, "port where the proxy will redirect to the traffic to") + flag.IntVar(&port, "port", 8888, "port where the proxy is served") + flag.IntVar(&metricsPort, "metrics-port", 15000, "metrics port for prometheus") + flag.IntVar(&userContainerPort, "user-port", 8080, "port where the proxy redirects to the traffic to") flag.IntVar(&maxConcurrency, "max-concurrency", 0, "max concurrency allowed for user container") flag.IntVar(&maxQueueLength, "max-queue-length", 0, "max request queue length for user container") flag.Parse() @@ -63,7 +63,7 @@ func main() { maxQueueLength = maxConcurrency * 10 } - target := "http://127.0.0.1:" + strconv.Itoa(port) + target := "http://127.0.0.1:" + strconv.Itoa(userContainerPort) httpProxy := proxy.NewReverseProxy(target, maxQueueLength, maxQueueLength) requestCounterStats := &proxy.RequestStats{} @@ -101,7 +101,7 @@ func main() { servers := map[string]*http.Server{ "proxy": { - Addr: ":" + strconv.Itoa(userContainerPort), + Addr: ":" + strconv.Itoa(port), Handler: proxy.Handler(breaker, httpProxy), }, "metrics": { diff --git a/pkg/consts/consts.go b/pkg/consts/consts.go index 0c6ee0d22a..6286c7a827 100644 --- a/pkg/consts/consts.go +++ b/pkg/consts/consts.go @@ -24,12 +24,18 @@ var ( CortexVersion = "master" // CORTEX_VERSION CortexVersionMinor = "master" // CORTEX_VERSION_MINOR - ProxyListeningPort = int64(8888) - ProxyListeningPortStr = "8888" DefaultMaxReplicaQueueLength = int64(1024) DefaultMaxReplicaConcurrency = int64(1024) DefaultTargetReplicaConcurrency = float64(8) - NeuronCoresPerInf = int64(4) + + DefaultUserPodPortStr = "8080" + DefaultUserPodPortInt32 = int32(8080) + + ProxyListeningPortStr = "8888" + ProxyListeningPortInt32 = int32(8888) + + MetricsPortStr = "15000" + MetricsPortInt32 = int32(15000) AuthHeader = "X-Cortex-Authorization" diff --git a/pkg/lib/strings/operations.go b/pkg/lib/strings/operations.go index aedb528c99..09700173e1 100644 --- a/pkg/lib/strings/operations.go +++ b/pkg/lib/strings/operations.go @@ -228,6 +228,10 @@ func PluralCustom(singular string, plural string, count interface{}) string { return plural } +func PluralToBe(count interface{}) string { + return PluralCustom("is", "are", count) +} + // RemoveDuplicates returns a filtered string slice without repeated entries. // The ignoreRegex parameter can optionally be used to ignore repeated patterns in each slice entry. func RemoveDuplicates(strs []string, ignoreRegex *regexp.Regexp) []string { diff --git a/pkg/operator/resources/asyncapi/k8s_specs.go b/pkg/operator/resources/asyncapi/k8s_specs.go index 50d62c61ed..fa27daf885 100644 --- a/pkg/operator/resources/asyncapi/k8s_specs.go +++ b/pkg/operator/resources/asyncapi/k8s_specs.go @@ -17,6 +17,7 @@ limitations under the License. package asyncapi import ( + "github.com/cortexlabs/cortex/pkg/consts" "github.com/cortexlabs/cortex/pkg/lib/k8s" "github.com/cortexlabs/cortex/pkg/lib/pointer" "github.com/cortexlabs/cortex/pkg/types/spec" @@ -128,8 +129,8 @@ func gatewayServiceSpec(api spec.API) kcore.Service { return *k8s.Service(&k8s.ServiceSpec{ Name: workloads.K8sName(api.Name), PortName: "http", - Port: workloads.DefaultPortInt32, - TargetPort: workloads.DefaultPortInt32, + Port: consts.ProxyListeningPortInt32, + TargetPort: consts.ProxyListeningPortInt32, Annotations: api.ToK8sAnnotations(), Labels: map[string]string{ "apiName": api.Name, @@ -152,7 +153,7 @@ func gatewayVirtualServiceSpec(api spec.API) v1beta1.VirtualService { Destinations: []k8s.Destination{{ ServiceName: workloads.K8sName(api.Name), Weight: 100, - Port: uint32(workloads.DefaultPortInt32), + Port: uint32(consts.ProxyListeningPortInt32), }}, PrefixPath: api.Networking.Endpoint, Rewrite: pointer.String("/"), diff --git a/pkg/operator/resources/job/batchapi/k8s_specs.go b/pkg/operator/resources/job/batchapi/k8s_specs.go index 7499e891db..5ae8b5aff8 100644 --- a/pkg/operator/resources/job/batchapi/k8s_specs.go +++ b/pkg/operator/resources/job/batchapi/k8s_specs.go @@ -21,6 +21,7 @@ import ( "path" "github.com/cortexlabs/cortex/pkg/config" + "github.com/cortexlabs/cortex/pkg/consts" batch "github.com/cortexlabs/cortex/pkg/crds/apis/batch/v1alpha1" "github.com/cortexlabs/cortex/pkg/lib/k8s" "github.com/cortexlabs/cortex/pkg/lib/parallel" @@ -40,7 +41,7 @@ func virtualServiceSpec(api *spec.API) *istioclientnetworking.VirtualService { Destinations: []k8s.Destination{{ ServiceName: _operatorService, Weight: 100, - Port: uint32(workloads.DefaultPortInt32), + Port: uint32(consts.ProxyListeningPortInt32), }}, PrefixPath: api.Networking.Endpoint, Rewrite: pointer.String(path.Join("batch", api.Name)), diff --git a/pkg/operator/resources/job/taskapi/k8s_specs.go b/pkg/operator/resources/job/taskapi/k8s_specs.go index a3c58f9795..8ccd530c85 100644 --- a/pkg/operator/resources/job/taskapi/k8s_specs.go +++ b/pkg/operator/resources/job/taskapi/k8s_specs.go @@ -20,6 +20,7 @@ import ( "path" "github.com/cortexlabs/cortex/pkg/config" + "github.com/cortexlabs/cortex/pkg/consts" "github.com/cortexlabs/cortex/pkg/lib/k8s" "github.com/cortexlabs/cortex/pkg/lib/parallel" "github.com/cortexlabs/cortex/pkg/lib/pointer" @@ -42,7 +43,7 @@ func virtualServiceSpec(api *spec.API) *istioclientnetworking.VirtualService { Destinations: []k8s.Destination{{ ServiceName: _operatorService, Weight: 100, - Port: uint32(workloads.DefaultPortInt32), + Port: uint32(consts.ProxyListeningPortInt32), }}, PrefixPath: api.Networking.Endpoint, Rewrite: pointer.String(path.Join("tasks", api.Name)), diff --git a/pkg/operator/resources/realtimeapi/k8s_specs.go b/pkg/operator/resources/realtimeapi/k8s_specs.go index 831f43ae32..03dfafc22d 100644 --- a/pkg/operator/resources/realtimeapi/k8s_specs.go +++ b/pkg/operator/resources/realtimeapi/k8s_specs.go @@ -17,6 +17,7 @@ limitations under the License. package realtimeapi import ( + "github.com/cortexlabs/cortex/pkg/consts" "github.com/cortexlabs/cortex/pkg/lib/k8s" "github.com/cortexlabs/cortex/pkg/lib/pointer" "github.com/cortexlabs/cortex/pkg/types/spec" @@ -30,7 +31,7 @@ var _terminationGracePeriodSeconds int64 = 60 // seconds func deploymentSpec(api *spec.API, prevDeployment *kapps.Deployment) *kapps.Deployment { containers, volumes := workloads.UserPodContainers(*api) - // TODO add the proxy as well + containers = append(containers, workloads.RealtimeProxyContainer(*api)) return k8s.Deployment(&k8s.DeploymentSpec{ Name: workloads.K8sName(api.Name), @@ -80,8 +81,8 @@ func serviceSpec(api *spec.API) *kcore.Service { return k8s.Service(&k8s.ServiceSpec{ Name: workloads.K8sName(api.Name), PortName: "http", - Port: workloads.DefaultPortInt32, - TargetPort: workloads.DefaultPortInt32, + Port: consts.ProxyListeningPortInt32, + TargetPort: consts.ProxyListeningPortInt32, Annotations: api.ToK8sAnnotations(), Labels: map[string]string{ "apiName": api.Name, @@ -102,7 +103,7 @@ func virtualServiceSpec(api *spec.API) *istioclientnetworking.VirtualService { Destinations: []k8s.Destination{{ ServiceName: workloads.K8sName(api.Name), Weight: 100, - Port: uint32(workloads.DefaultPortInt32), + Port: uint32(consts.ProxyListeningPortInt32), }}, PrefixPath: api.Networking.Endpoint, Rewrite: pointer.String("/"), diff --git a/pkg/operator/resources/trafficsplitter/api.go b/pkg/operator/resources/trafficsplitter/api.go index 1ce3844b0d..e2b908d3fd 100644 --- a/pkg/operator/resources/trafficsplitter/api.go +++ b/pkg/operator/resources/trafficsplitter/api.go @@ -113,7 +113,7 @@ func getTrafficSplitterDestinations(trafficSplitter *spec.API) []k8s.Destination destinations[i] = k8s.Destination{ ServiceName: workloads.K8sName(api.Name), Weight: api.Weight, - Port: uint32(consts.ProxyListeningPort), + Port: uint32(consts.ProxyListeningPortInt32), Shadow: api.Shadow, } } diff --git a/pkg/types/spec/errors.go b/pkg/types/spec/errors.go index 9d628c5c2f..e2a241adc8 100644 --- a/pkg/types/spec/errors.go +++ b/pkg/types/spec/errors.go @@ -51,11 +51,12 @@ const ( ErrShmSizeCannotExceedMem = "spec.shm_size_cannot_exceed_mem" - ErrFieldCannotBeEmptyForKind = "spec.field_cannot_be_empty_for_kind" + ErrFieldMustBeSpecifiedForKind = "spec.field_must_be_specified_for_kind" + ErrFieldIsNotSupportedForKind = "spec.field_is_not_supported_for_kind" ErrCortexPrefixedEnvVarNotAllowed = "spec.cortex_prefixed_env_var_not_allowed" + ErrDisallowedEnvVars = "spec.disallowed_env_vars" ErrRegistryInDifferentRegion = "spec.registry_in_different_region" ErrRegistryAccountIDMismatch = "spec.registry_account_id_mismatch" - ErrKeyIsNotSupportedForKind = "spec.key_is_not_supported_for_kind" ErrComputeResourceConflict = "spec.compute_resource_conflict" ErrInvalidNumberOfInfs = "spec.invalid_number_of_infs" ErrIncorrectTrafficSplitterWeight = "spec.incorrect_traffic_splitter_weight" @@ -209,10 +210,17 @@ func ErrorShmSizeCannotExceedMem(shmSize k8s.Quantity, mem k8s.Quantity) error { }) } -func ErrorFieldCannotBeEmptyForKind(field string, kind userconfig.Kind) error { +func ErrorFieldMustBeSpecifiedForKind(field string, kind userconfig.Kind) error { return errors.WithStack(&errors.Error{ - Kind: ErrFieldCannotBeEmptyForKind, - Message: fmt.Sprintf("field %s cannot be empty for %s kind", field, kind.String()), + Kind: ErrFieldMustBeSpecifiedForKind, + Message: fmt.Sprintf("field %s must be specified for %s kind", field, kind.String()), + }) +} + +func ErrorFieldIsNotSupportedForKind(field string, kind userconfig.Kind) error { + return errors.WithStack(&errors.Error{ + Kind: ErrFieldIsNotSupportedForKind, + Message: fmt.Sprintf("%s field is not supported for %s kind", field, kind.String()), }) } @@ -223,6 +231,13 @@ func ErrorCortexPrefixedEnvVarNotAllowed(prefixes ...string) error { }) } +func ErrorDisallowedEnvVars(disallowedValues ...string) error { + return errors.WithStack(&errors.Error{ + Kind: ErrDisallowedEnvVars, + Message: fmt.Sprintf("environment %s %s %s disallowed", s.PluralS("variables", len(disallowedValues)), s.StrsAnd(disallowedValues), s.PluralToBe(len(disallowedValues))), + }) +} + func ErrorRegistryInDifferentRegion(registryRegion string, awsClientRegion string) error { return errors.WithStack(&errors.Error{ Kind: ErrRegistryInDifferentRegion, @@ -237,13 +252,6 @@ func ErrorRegistryAccountIDMismatch(regID, opID string) error { }) } -func ErrorKeyIsNotSupportedForKind(key string, kind userconfig.Kind) error { - return errors.WithStack(&errors.Error{ - Kind: ErrKeyIsNotSupportedForKind, - Message: fmt.Sprintf("%s key is not supported for %s kind", key, kind.String()), - }) -} - func ErrorComputeResourceConflict(resourceA, resourceB string) error { return errors.WithStack(&errors.Error{ Kind: ErrComputeResourceConflict, diff --git a/pkg/types/spec/validations.go b/pkg/types/spec/validations.go index fa687c487e..dc82827a0b 100644 --- a/pkg/types/spec/validations.go +++ b/pkg/types/spec/validations.go @@ -148,6 +148,7 @@ func podValidation() *cr.StructFieldValidation { { StructField: "ShmSize", StringPtrValidation: &cr.StringPtrValidation{ + Required: false, Default: nil, AllowExplicitNull: true, }, @@ -165,6 +166,18 @@ func podValidation() *cr.StructFieldValidation { }, }, }, + { + StructField: "Port", + Int32PtrValidation: &cr.Int32PtrValidation{ + Required: false, + Default: nil, + AllowExplicitNull: true, + DisallowedValues: []int32{ + consts.ProxyListeningPortInt32, + consts.MetricsPortInt32, + }, + }, + }, containersValidation(), }, }, @@ -555,6 +568,13 @@ func validatePod( } } + if api.Pod.Port != nil && api.Kind == userconfig.TaskAPIKind { + return ErrorFieldIsNotSupportedForKind(userconfig.PortKey, api.Kind) + } + if api.Pod.Port == nil && api.Kind != userconfig.TaskAPIKind { + api.Pod.Port = pointer.Int32(consts.DefaultUserPodPortInt32) + } + if err := validateCompute(totalCompute); err != nil { return errors.Wrap(err, userconfig.ComputeKey) } @@ -581,7 +601,7 @@ func validateContainers( containerNames = append(containerNames, container.Name) if container.Command == nil && (kind == userconfig.BatchAPIKind || kind == userconfig.TaskAPIKind) { - return errors.Wrap(ErrorFieldCannotBeEmptyForKind(userconfig.CommandKey, kind), strconv.FormatInt(int64(i), 10), userconfig.CommandKey) + return errors.Wrap(ErrorFieldMustBeSpecifiedForKind(userconfig.CommandKey, kind), strconv.FormatInt(int64(i), 10), userconfig.CommandKey) } if err := validateDockerImagePath(container.Image, awsClient, k8sClient); err != nil { @@ -592,6 +612,9 @@ func validateContainers( if strings.HasPrefix(key, "CORTEX_") || strings.HasPrefix(key, "KUBEXIT_") { return errors.Wrap(ErrorCortexPrefixedEnvVarNotAllowed("CORTEX_", "KUBEXIT_"), strconv.FormatInt(int64(i), 10), userconfig.EnvKey, key) } + if key == "HOST_IP" { + return errors.Wrap(ErrorDisallowedEnvVars(key), strconv.FormatInt(int64(i), 10), userconfig.EnvKey, key) + } } } diff --git a/pkg/types/userconfig/api.go b/pkg/types/userconfig/api.go index 156c0c8d03..5d19ca97bb 100644 --- a/pkg/types/userconfig/api.go +++ b/pkg/types/userconfig/api.go @@ -45,6 +45,7 @@ type API struct { type Pod struct { NodeGroups []string `json:"node_groups" yaml:"node_groups"` ShmSize *k8s.Quantity `json:"shm_size" yaml:"shm_size"` + Port *int32 `json:"port" yaml:"port"` Containers []*Container `json:"containers" yaml:"containers"` } @@ -276,6 +277,9 @@ func (pod *Pod) UserStr() string { } else { sb.WriteString(fmt.Sprintf("%s: %s\n", NodeGroupsKey, s.ObjFlatNoQuotes(pod.NodeGroups))) } + if pod.Port != nil { + sb.WriteString(fmt.Sprintf("%s: %d\n", PortKey, *pod.Port)) + } sb.WriteString(fmt.Sprintf("%s:\n", ContainersKey)) for _, container := range pod.Containers { @@ -478,8 +482,11 @@ func (api *API) TelemetryEvent() map[string]interface{} { } event["pod.node_groups._is_defined"] = len(api.Pod.NodeGroups) > 0 event["pod.node_groups._len"] = len(api.Pod.NodeGroups) - event["pod.containers._len"] = len(api.Pod.Containers) + if api.Pod.Port != nil { + event["pod.port"] = *api.Pod.Port + } + event["pod.containers._len"] = len(api.Pod.Containers) totalCompute := GetTotalComputeFromContainers(api.Pod.Containers) event["pod.containers.compute._is_defined"] = true if totalCompute.CPU != nil { diff --git a/pkg/types/userconfig/config_key.go b/pkg/types/userconfig/config_key.go index 598db9f97f..81861f5161 100644 --- a/pkg/types/userconfig/config_key.go +++ b/pkg/types/userconfig/config_key.go @@ -34,6 +34,7 @@ const ( PodKey = "pod" NodeGroupsKey = "node_groups" ShmSizeKey = "shm_size" + PortKey = "port" ContainersKey = "containers" // Containers diff --git a/pkg/workloads/k8s.go b/pkg/workloads/k8s.go index b0f3d6fcf4..cea4d78582 100644 --- a/pkg/workloads/k8s.go +++ b/pkg/workloads/k8s.go @@ -33,8 +33,6 @@ import ( ) const ( - DefaultPortInt32 = int32(8888) - DefaultPortStr = "8888" ServiceAccountName = "default" ) @@ -43,6 +41,8 @@ const ( _emptyDirMountPath = "/mnt" _emptyDirVolumeName = "mnt" + _proxyContainerName = "proxy" + _gatewayContainerName = "gateway" _neuronRTDContainerName = "neuron-rtd" @@ -69,13 +69,13 @@ func AsyncGatewayContainer(api spec.API, queueURL string, volumeMounts []kcore.V Image: config.ClusterConfig.ImageAsyncGateway, ImagePullPolicy: kcore.PullAlways, Args: []string{ - "-port", s.Int32(DefaultPortInt32), + "-port", s.Int32(consts.ProxyListeningPortInt32), "-queue", queueURL, "-cluster-config", consts.DefaultInClusterConfigPath, api.Name, }, Ports: []kcore.ContainerPort{ - {ContainerPort: DefaultPortInt32}, + {ContainerPort: consts.ProxyListeningPortInt32}, }, Env: []kcore.EnvVar{ { @@ -178,7 +178,20 @@ func UserPodContainers(api spec.API) ([]kcore.Container, []kcore.Volume) { podHasInf = true } - var containerEnvVars []kcore.EnvVar + containerEnvVars := []kcore.EnvVar{ + { + Name: "HOST_IP", + ValueFrom: &kcore.EnvVarSource{ + FieldRef: &kcore.ObjectFieldSelector{ + FieldPath: "status.hostIP", + }, + }, + }, + { + Name: "CORTEX_PORT", + Value: s.Int32(*api.Pod.Port), + }, + } if requiresKubexit { containerDeathDependencies := containerNames.Copy() containerDeathDependencies.Remove(container.Name) @@ -195,14 +208,6 @@ func UserPodContainers(api spec.API) ([]kcore.Container, []kcore.Volume) { Value: v, }) } - containerEnvVars = append(containerEnvVars, kcore.EnvVar{ - Name: "HOST_IP", - ValueFrom: &kcore.EnvVarSource{ - FieldRef: &kcore.ObjectFieldSelector{ - FieldPath: "status.hostIP", - }, - }, - }) var containerCmd []string if requiresKubexit && container.Command[0] != "/mnt/kubexit" { @@ -220,11 +225,6 @@ func UserPodContainers(api spec.API) ([]kcore.Container, []kcore.Volume) { Requests: containerResourceList, Limits: containerResourceLimitsList, }, - Ports: []kcore.ContainerPort{ - { - ContainerPort: int32(8888), - }, - }, ImagePullPolicy: kcore.PullAlways, SecurityContext: &kcore.SecurityContext{ Privileged: pointer.Bool(true), @@ -363,6 +363,36 @@ func neuronRuntimeDaemonContainer(computeInf int64, volumeMounts []kcore.VolumeM } } +func RealtimeProxyContainer(api spec.API) kcore.Container { + return kcore.Container{ + Name: _proxyContainerName, + Image: config.ClusterConfig.ImageProxy, + ImagePullPolicy: kcore.PullAlways, + Args: []string{ + "-port", + consts.ProxyListeningPortStr, + "-metrics-port", + consts.MetricsPortStr, + "-user-port", + s.Int32(*api.Pod.Port), + "-max-concurrency", + "1", + "-max-queue-length", + "1", + }, + Ports: []kcore.ContainerPort{ + {Name: "metrics", ContainerPort: consts.MetricsPortInt32}, + {ContainerPort: consts.ProxyListeningPortInt32}, + }, + Env: []kcore.EnvVar{ + { + Name: "CORTEX_LOG_LEVEL", + Value: strings.ToUpper(userconfig.InfoLogLevel.String()), + }, + }, + } +} + // func getAsyncAPIEnvVars(api spec.API, queueURL string) []kcore.EnvVar { // envVars := apiContainerEnvVars(&api) @@ -379,32 +409,3 @@ func neuronRuntimeDaemonContainer(computeInf int64, volumeMounts []kcore.VolumeM // return envVars // } - -// func RequestMonitorContainer(api *spec.API) kcore.Container { -// requests := kcore.ResourceList{} -// if api.Compute != nil { -// if api.Compute.CPU != nil { -// requests[kcore.ResourceCPU] = _requestMonitorCPURequest -// } -// if api.Compute.Mem != nil { -// requests[kcore.ResourceMemory] = _requestMonitorMemRequest -// } -// } - -// return kcore.Container{ -// Name: _requestMonitorContainerName, -// Image: config.ClusterConfig.ImageRequestMonitor, -// ImagePullPolicy: kcore.PullAlways, -// Args: []string{"-p", DefaultRequestMonitorPortStr}, -// Ports: []kcore.ContainerPort{ -// {Name: "metrics", ContainerPort: DefaultRequestMonitorPortInt32}, -// }, -// Env: requestMonitorEnvVars(api), -// EnvFrom: baseEnvVars(), -// VolumeMounts: defaultVolumeMounts(), -// ReadinessProbe: FileExistsProbe(_requestMonitorReadinessFile), -// Resources: kcore.ResourceRequirements{ -// Requests: requests, -// }, -// } -// } diff --git a/test/apis/realtime/Dockerfile b/test/apis/realtime/Dockerfile index 05845435a4..68cf9cfa29 100644 --- a/test/apis/realtime/Dockerfile +++ b/test/apis/realtime/Dockerfile @@ -17,5 +17,4 @@ RUN pip install Flask gunicorn # webserver, with one worker process and 8 threads. # For environments with multiple CPU cores, increase the number of workers # to be equal to the cores available. -# Timeout is set to 0 to disable the timeouts of the workers to allow Cloud Run to handle instance scaling. -CMD exec gunicorn --bind :8888 --workers 1 --threads 8 --timeout 0 main:app +CMD exec gunicorn --bind :$CORTEX_PORT --workers 1 --threads 8 --timeout 0 main:app diff --git a/test/apis/realtime/cortex.yaml b/test/apis/realtime/cortex.yaml index 4770d44709..01b404425d 100644 --- a/test/apis/realtime/cortex.yaml +++ b/test/apis/realtime/cortex.yaml @@ -2,6 +2,7 @@ kind: RealtimeAPI pod: node_groups: [cpu] + port: 1234 containers: - name: api image: 499593605069.dkr.ecr.us-west-2.amazonaws.com/sample/realtime-caas:latest diff --git a/test/apis/realtime/main.py b/test/apis/realtime/main.py index 6ea9ba744f..bf2c96dc8b 100644 --- a/test/apis/realtime/main.py +++ b/test/apis/realtime/main.py @@ -12,4 +12,4 @@ def hello_world(): if __name__ == "__main__": - app.run(debug=True, host="0.0.0.0", port=8888) + app.run(debug=True, host="0.0.0.0", port=int(os.getenv("CORTEX_PORT", "8000"))) From 169ae28b435314d738ee3720c76b89a7aee47533 Mon Sep 17 00:00:00 2001 From: Robert Lucian Chiriac Date: Tue, 18 May 2021 16:25:39 +0300 Subject: [PATCH 02/15] Don't show cmd/args when they are null --- pkg/types/userconfig/api.go | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/pkg/types/userconfig/api.go b/pkg/types/userconfig/api.go index 5d19ca97bb..b001b2e76f 100644 --- a/pkg/types/userconfig/api.go +++ b/pkg/types/userconfig/api.go @@ -303,15 +303,11 @@ func (container *Container) UserStr() string { sb.WriteString(s.Indent(string(d), " ")) } - if container.Command == nil { - sb.WriteString(fmt.Sprintf("%s: null\n", CommandKey)) - } else { + if container.Command != nil { sb.WriteString(fmt.Sprintf("%s: %s\n", CommandKey, s.ObjFlatNoQuotes(container.Command))) } - if container.Args == nil { - sb.WriteString(fmt.Sprintf("%s: null\n", ArgsKey)) - } else { + if container.Args != nil { sb.WriteString(fmt.Sprintf("%s: %s\n", ArgsKey, s.ObjFlatNoQuotes(container.Args))) } From 0cbca6c664264ca29774cd6039ba2980ae12838b Mon Sep 17 00:00:00 2001 From: Robert Lucian Chiriac Date: Tue, 18 May 2021 16:25:49 +0300 Subject: [PATCH 03/15] Use default port instead for realtime test --- test/apis/realtime/cortex.yaml | 1 - 1 file changed, 1 deletion(-) diff --git a/test/apis/realtime/cortex.yaml b/test/apis/realtime/cortex.yaml index 01b404425d..4770d44709 100644 --- a/test/apis/realtime/cortex.yaml +++ b/test/apis/realtime/cortex.yaml @@ -2,7 +2,6 @@ kind: RealtimeAPI pod: node_groups: [cpu] - port: 1234 containers: - name: api image: 499593605069.dkr.ecr.us-west-2.amazonaws.com/sample/realtime-caas:latest From 98ae35e907a17cce8dc753bdcd139cada804b9be Mon Sep 17 00:00:00 2001 From: Robert Lucian Chiriac Date: Tue, 18 May 2021 18:19:53 +0300 Subject: [PATCH 04/15] Volume/mount fixes --- pkg/workloads/helpers.go | 75 ++++++++++++++++++++++++++-------------- pkg/workloads/init.go | 16 ++++++--- pkg/workloads/k8s.go | 49 ++++++++++++++++---------- 3 files changed, 92 insertions(+), 48 deletions(-) diff --git a/pkg/workloads/helpers.go b/pkg/workloads/helpers.go index bfb06dcf55..61cb6f4153 100644 --- a/pkg/workloads/helpers.go +++ b/pkg/workloads/helpers.go @@ -23,6 +23,7 @@ import ( "github.com/cortexlabs/cortex/pkg/lib/k8s" kcore "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" ) func K8sName(apiName string) string { @@ -129,39 +130,63 @@ func getKubexitEnvVars(containerName string, deathDeps []string, birthDeps []str return envVars } -func defaultVolumes(requiresKubexit bool) []kcore.Volume { - volumes := []kcore.Volume{ - k8s.EmptyDirVolume(_emptyDirVolumeName), - { - Name: "client-config", - VolumeSource: kcore.VolumeSource{ - ConfigMap: &kcore.ConfigMapVolumeSource{ - LocalObjectReference: kcore.LocalObjectReference{ - Name: "client-config", - }, +func MntVolume() kcore.Volume { + return k8s.EmptyDirVolume(_emptyDirVolumeName) +} + +func CortexVolume() kcore.Volume { + return k8s.EmptyDirVolume(_cortexDirVolumeName) +} + +func ClientConfigVolume() kcore.Volume { + return kcore.Volume{ + Name: _clientConfigDirVolume, + VolumeSource: kcore.VolumeSource{ + ConfigMap: &kcore.ConfigMapVolumeSource{ + LocalObjectReference: kcore.LocalObjectReference{ + Name: _clientConfigConfigMap, }, }, }, } - - if requiresKubexit { - return append(volumes, k8s.EmptyDirVolume(_kubexitGraveyardName)) - } - return volumes } -func defaultVolumeMounts(requiresKubexit bool) []kcore.VolumeMount { - volumeMounts := []kcore.VolumeMount{ - k8s.EmptyDirVolumeMount(_emptyDirVolumeName, _emptyDirMountPath), - { - Name: "client-config", - MountPath: path.Join(_clientConfigDir, "cli.yaml"), - SubPath: "cli.yaml", +func ShmVolume(q resource.Quantity) kcore.Volume { + return kcore.Volume{ + Name: _shmDirVolumeName, + VolumeSource: kcore.VolumeSource{ + EmptyDir: &kcore.EmptyDirVolumeSource{ + Medium: kcore.StorageMediumMemory, + SizeLimit: k8s.QuantityPtr(q), + }, }, } +} - if requiresKubexit { - return append(volumeMounts, k8s.EmptyDirVolumeMount(_kubexitGraveyardName, _kubexitGraveyardMountPath)) +func KubexitVolume() kcore.Volume { + return k8s.EmptyDirVolume(_kubexitGraveyardName) +} + +func MntMount() kcore.VolumeMount { + return k8s.EmptyDirVolumeMount(_emptyDirVolumeName, _emptyDirMountPath) +} + +func CortexMount() kcore.VolumeMount { + return k8s.EmptyDirVolumeMount(_cortexDirVolumeName, _cortexDirMountPath) +} + +func ClientConfigMount() kcore.VolumeMount { + return kcore.VolumeMount{ + Name: _clientConfigDirVolume, + MountPath: path.Join(_clientConfigDir, "cli.yaml"), + SubPath: "cli.yaml", } - return volumeMounts +} + +func ShmMount() kcore.VolumeMount { + return k8s.EmptyDirVolumeMount(_shmDirVolumeName, _shmDirMountPath) +} + +func KubexitMount() kcore.VolumeMount { + return k8s.EmptyDirVolumeMount(_kubexitGraveyardName, _kubexitGraveyardMountPath) } diff --git a/pkg/workloads/init.go b/pkg/workloads/init.go index ef84d13aba..a5ea4ca0de 100644 --- a/pkg/workloads/init.go +++ b/pkg/workloads/init.go @@ -29,7 +29,7 @@ import ( ) const ( - JobSpecPath = "/mnt/job_spec.json" + JobSpecPath = "/cortex/job_spec.json" ) const ( @@ -43,8 +43,10 @@ func KubexitInitContainer() kcore.Container { Name: _kubexitInitContainerName, Image: config.ClusterConfig.ImageKubexit, ImagePullPolicy: kcore.PullAlways, - Command: []string{"cp", "/bin/kubexit", "/mnt/kubexit"}, - VolumeMounts: defaultVolumeMounts(true), + Command: []string{"cp", "/bin/kubexit", "/cortex/kubexit"}, + VolumeMounts: []kcore.VolumeMount{ + CortexMount(), + }, } } @@ -79,7 +81,9 @@ func TaskInitContainer(job *spec.TaskJob) kcore.Container { Value: strings.ToUpper(userconfig.InfoLogLevel.String()), }, }, - VolumeMounts: defaultVolumeMounts(true), + VolumeMounts: []kcore.VolumeMount{ + CortexMount(), + }, } } @@ -114,6 +118,8 @@ func BatchInitContainer(job *spec.BatchJob) kcore.Container { Value: strings.ToUpper(userconfig.InfoLogLevel.String()), }, }, - VolumeMounts: defaultVolumeMounts(true), + VolumeMounts: []kcore.VolumeMount{ + CortexMount(), + }, } } diff --git a/pkg/workloads/k8s.go b/pkg/workloads/k8s.go index cea4d78582..dfb415b082 100644 --- a/pkg/workloads/k8s.go +++ b/pkg/workloads/k8s.go @@ -37,9 +37,12 @@ const ( ) const ( - _clientConfigDir = "/mnt/client" - _emptyDirMountPath = "/mnt" + _cortexDirVolumeName = "cortex" + _cortexDirMountPath = "/cortex" + _clientConfigDir = "/cortex/client" + _emptyDirVolumeName = "mnt" + _emptyDirMountPath = "/mnt" _proxyContainerName = "proxy" @@ -50,6 +53,12 @@ const ( _kubexitGraveyardName = "graveyard" _kubexitGraveyardMountPath = "/graveyard" + + _shmDirVolumeName = "dshm" + _shmDirMountPath = "/dev/shm" + + _clientConfigDirVolume = "client-config" + _clientConfigConfigMap = "client-config" ) var ( @@ -111,23 +120,25 @@ func AsyncGatewayContainer(api spec.API, queueURL string, volumeMounts []kcore.V func UserPodContainers(api spec.API) ([]kcore.Container, []kcore.Volume) { requiresKubexit := api.Kind == userconfig.BatchAPIKind || api.Kind == userconfig.TaskAPIKind - volumes := defaultVolumes(requiresKubexit) - containerMounts := []kcore.VolumeMount{} + volumes := []kcore.Volume{ + MntVolume(), + CortexVolume(), + ClientConfigVolume(), + } + containerMounts := []kcore.VolumeMount{ + MntMount(), + CortexMount(), + ClientConfigMount(), + } + + if requiresKubexit { + volumes = append(volumes, KubexitVolume()) + containerMounts = append(containerMounts, KubexitMount()) + } if api.Pod.ShmSize != nil { - volumes = append(volumes, kcore.Volume{ - Name: "dshm", - VolumeSource: kcore.VolumeSource{ - EmptyDir: &kcore.EmptyDirVolumeSource{ - Medium: kcore.StorageMediumMemory, - SizeLimit: k8s.QuantityPtr(api.Pod.ShmSize.Quantity), - }, - }, - }) - containerMounts = append(containerMounts, kcore.VolumeMount{ - Name: "dshm", - MountPath: "/dev/shm", - }) + volumes = append(volumes, ShmVolume(api.Pod.ShmSize.Quantity)) + containerMounts = append(containerMounts, ShmMount()) } var containers []kcore.Container @@ -150,7 +161,8 @@ func UserPodContainers(api spec.API) ([]kcore.Container, []kcore.Volume) { containerResourceLimitsList["nvidia.com/gpu"] = *kresource.NewQuantity(container.Compute.GPU, kresource.DecimalSI) } - containerVolumeMounts := append(defaultVolumeMounts(requiresKubexit), containerMounts...) + containerVolumeMounts := containerMounts + if container.Compute.Inf > 0 { volumes = append(volumes, kcore.Volume{ Name: "neuron-sock", @@ -390,6 +402,7 @@ func RealtimeProxyContainer(api spec.API) kcore.Container { Value: strings.ToUpper(userconfig.InfoLogLevel.String()), }, }, + EnvFrom: baseClusterEnvVars(), } } From 2c8d9aabacbee94c89df99336b352944bd3adc4a Mon Sep 17 00:00:00 2001 From: Robert Lucian Chiriac Date: Tue, 18 May 2021 21:51:17 +0300 Subject: [PATCH 05/15] Autoscaling, Prometheus changes, refresh fix --- dev/registry.sh | 6 ++-- .../manifests/prometheus-monitoring.yaml.j2 | 10 +++--- pkg/consts/consts.go | 7 ++-- pkg/operator/lib/autoscaler/autoscaler.go | 4 +-- pkg/operator/resources/realtimeapi/api.go | 8 +++-- pkg/types/spec/errors.go | 8 +++++ pkg/types/spec/validations.go | 16 ++++----- pkg/types/userconfig/api.go | 36 +++++++++---------- pkg/types/userconfig/config_key.go | 12 +++---- pkg/workloads/k8s.go | 4 +-- test/apis/realtime/Dockerfile | 2 +- test/apis/realtime/cortex.yaml | 7 +++- test/apis/realtime/main.py | 8 +++-- .../image-classifier-resnet50/cortex_inf.yaml | 2 +- .../cortex_inf_server_side_batching.yaml | 2 +- 15 files changed, 77 insertions(+), 55 deletions(-) diff --git a/dev/registry.sh b/dev/registry.sh index eda792d95c..3b05e85bb3 100755 --- a/dev/registry.sh +++ b/dev/registry.sh @@ -222,7 +222,7 @@ elif [ "$cmd" = "create" ]; then # usage: registry.sh update-single IMAGE elif [ "$cmd" = "update-single" ]; then image=$sub_cmd - if [ "$image" = "operator" ] || [ "$image" = "request-monitor" ]; then + if [ "$image" = "operator" ] || [ "$image" = "proxy" ]; then cache_builder $image fi build_and_push $image @@ -245,8 +245,8 @@ elif [ "$cmd" = "update" ]; then if [[ " ${images_to_build[@]} " =~ " operator " ]]; then cache_builder operator fi - if [[ " ${images_to_build[@]} " =~ " request-monitor " ]]; then - cache_builder request-monitor + if [[ " ${images_to_build[@]} " =~ " proxy " ]]; then + cache_builder proxy fi if [[ " ${images_to_build[@]} " =~ " async-gateway " ]]; then cache_builder async-gateway diff --git a/manager/manifests/prometheus-monitoring.yaml.j2 b/manager/manifests/prometheus-monitoring.yaml.j2 index a56f7664e3..8b2982dd9f 100644 --- a/manager/manifests/prometheus-monitoring.yaml.j2 +++ b/manager/manifests/prometheus-monitoring.yaml.j2 @@ -34,7 +34,7 @@ spec: matchExpressions: - key: "monitoring.cortex.dev" operator: "In" - values: [ "istio", "request-monitor", "statsd-exporter", "dcgm-exporter", "kube-state-metrics" ] + values: [ "istio", "proxy-metrics-monitor", "statsd-exporter", "dcgm-exporter", "kube-state-metrics" ] serviceMonitorSelector: matchExpressions: - key: "monitoring.cortex.dev" @@ -168,9 +168,9 @@ spec: apiVersion: monitoring.coreos.com/v1 kind: PodMonitor metadata: - name: request-monitor-stats + name: proxy-metrics-monitor-stats labels: - monitoring.cortex.dev: "request-monitor" + monitoring.cortex.dev: "proxy-metrics-monitor" spec: selector: matchLabels: @@ -179,7 +179,7 @@ spec: - { key: prometheus-ignore, operator: DoesNotExist } namespaceSelector: any: true - jobLabel: request-monitor-stats + jobLabel: proxy-metrics-monitor-stats podMetricsEndpoints: - path: /metrics scheme: http @@ -188,7 +188,7 @@ spec: relabelings: - action: keep sourceLabels: [ __meta_kubernetes_pod_container_name ] - regex: "request-monitor" + regex: "proxy" - sourceLabels: [ __meta_kubernetes_pod_label_apiName ] action: replace targetLabel: api_name diff --git a/pkg/consts/consts.go b/pkg/consts/consts.go index 6286c7a827..57d575a309 100644 --- a/pkg/consts/consts.go +++ b/pkg/consts/consts.go @@ -24,9 +24,9 @@ var ( CortexVersion = "master" // CORTEX_VERSION CortexVersionMinor = "master" // CORTEX_VERSION_MINOR - DefaultMaxReplicaQueueLength = int64(1024) - DefaultMaxReplicaConcurrency = int64(1024) - DefaultTargetReplicaConcurrency = float64(8) + DefaultMaxQueueLength = int64(1024) + DefaultMaxConcurrency = int64(16) + DefaultTargetInFlight = float64(16) DefaultUserPodPortStr = "8080" DefaultUserPodPortInt32 = int32(8080) @@ -44,6 +44,7 @@ var ( AsyncWorkloadsExpirationDays = int64(7) ReservedContainerNames = []string{ + "proxy", "neuron-rtd", } ) diff --git a/pkg/operator/lib/autoscaler/autoscaler.go b/pkg/operator/lib/autoscaler/autoscaler.go index 0fc7c7197d..79ba7ac385 100644 --- a/pkg/operator/lib/autoscaler/autoscaler.go +++ b/pkg/operator/lib/autoscaler/autoscaler.go @@ -133,7 +133,7 @@ func AutoscaleFn(initialDeployment *kapps.Deployment, apiSpec *spec.API, getInFl return nil } - rawRecommendation := *avgInFlight / autoscalingSpec.TargetReplicaConcurrency + rawRecommendation := *avgInFlight / autoscalingSpec.TargetInFlight recommendation := int32(math.Ceil(rawRecommendation)) if rawRecommendation < float64(currentReplicas) && rawRecommendation > float64(currentReplicas)*(1-autoscalingSpec.DownscaleTolerance) { @@ -199,7 +199,7 @@ func AutoscaleFn(initialDeployment *kapps.Deployment, apiSpec *spec.API, getInFl apiLogger.Debugw(fmt.Sprintf("%s autoscaler tick", apiName), "autoscaling", map[string]interface{}{ "avg_in_flight": *avgInFlight, - "target_replica_concurrency": autoscalingSpec.TargetReplicaConcurrency, + "target_in_flight": autoscalingSpec.TargetInFlight, "raw_recommendation": rawRecommendation, "current_replicas": currentReplicas, "downscale_tolerance": autoscalingSpec.DownscaleTolerance, diff --git a/pkg/operator/resources/realtimeapi/api.go b/pkg/operator/resources/realtimeapi/api.go index 7f01aa957c..0cc58c8dd1 100644 --- a/pkg/operator/resources/realtimeapi/api.go +++ b/pkg/operator/resources/realtimeapi/api.go @@ -116,7 +116,11 @@ func UpdateAPI(apiConfig *userconfig.API, projectID string, force bool) (*spec.A } func RefreshAPI(apiName string, force bool) (string, error) { - prevDeployment, err := config.K8s.GetDeployment(workloads.K8sName(apiName)) + prevDeployment, prevService, prevVirtualService, err := getK8sResources(&userconfig.API{ + Resource: userconfig.Resource{ + Name: apiName, + }, + }) if err != nil { return "", err } else if prevDeployment == nil { @@ -153,7 +157,7 @@ func RefreshAPI(apiName string, force bool) (string, error) { return "", errors.Wrap(err, "upload handler spec") } - if err := applyK8sDeployment(api, prevDeployment); err != nil { + if err := applyK8sResources(api, prevDeployment, prevService, prevVirtualService); err != nil { return "", err } diff --git a/pkg/types/spec/errors.go b/pkg/types/spec/errors.go index e2a241adc8..a4b0cddeac 100644 --- a/pkg/types/spec/errors.go +++ b/pkg/types/spec/errors.go @@ -45,6 +45,7 @@ const ( ErrMinReplicasGreaterThanMax = "spec.min_replicas_greater_than_max" ErrInitReplicasGreaterThanMax = "spec.init_replicas_greater_than_max" ErrInitReplicasLessThanMin = "spec.init_replicas_less_than_min" + ErrTargetInFlightLimitReached = "spec.target_in_flight_limit_reached" ErrInvalidSurgeOrUnavailable = "spec.invalid_surge_or_unavailable" ErrSurgeAndUnavailableBothZero = "spec.surge_and_unavailable_both_zero" @@ -189,6 +190,13 @@ func ErrorInitReplicasLessThanMin(init int32, min int32) error { }) } +func ErrorTargetInFlightLimitReached(targetInFlight float64, maxConcurrency, maxQueueLength int64) error { + return errors.WithStack(&errors.Error{ + Kind: ErrTargetInFlightLimitReached, + Message: fmt.Sprintf("%s cannot be greater than %s + %s (%f > %d + %d)", userconfig.TargetInFlightKey, userconfig.MaxConcurrencyKey, userconfig.MaxQueueLengthKey, targetInFlight, maxConcurrency, maxQueueLength), + }) +} + func ErrorInvalidSurgeOrUnavailable(val string) error { return errors.WithStack(&errors.Error{ Kind: ErrInvalidSurgeOrUnavailable, diff --git a/pkg/types/spec/validations.go b/pkg/types/spec/validations.go index dc82827a0b..afb301d055 100644 --- a/pkg/types/spec/validations.go +++ b/pkg/types/spec/validations.go @@ -331,9 +331,9 @@ func autoscalingValidation() *cr.StructFieldValidation { }, }, { - StructField: "MaxReplicaQueueLength", + StructField: "MaxQueueLength", Int64Validation: &cr.Int64Validation{ - Default: consts.DefaultMaxReplicaQueueLength, + Default: consts.DefaultMaxQueueLength, GreaterThan: pointer.Int64(0), // our configured nginx can theoretically accept up to 32768 connections, but during testing, // it has been observed that the number is just slightly lower, so it has been offset by 2678 @@ -341,9 +341,9 @@ func autoscalingValidation() *cr.StructFieldValidation { }, }, { - StructField: "MaxReplicaConcurrency", + StructField: "MaxConcurrency", Int64Validation: &cr.Int64Validation{ - Default: consts.DefaultMaxReplicaConcurrency, + Default: consts.DefaultMaxConcurrency, GreaterThan: pointer.Int64(0), // our configured nginx can theoretically accept up to 32768 connections, but during testing, // it has been observed that the number is just slightly lower, so it has been offset by 2678 @@ -351,9 +351,9 @@ func autoscalingValidation() *cr.StructFieldValidation { }, }, { - StructField: "TargetReplicaConcurrency", + StructField: "TargetInFlight", Float64Validation: &cr.Float64Validation{ - Default: consts.DefaultTargetReplicaConcurrency, + Default: consts.DefaultTargetInFlight, GreaterThan: pointer.Float64(0), }, }, @@ -624,8 +624,8 @@ func validateContainers( func validateAutoscaling(api *userconfig.API) error { autoscaling := api.Autoscaling - if autoscaling.TargetReplicaConcurrency > float64(autoscaling.MaxReplicaConcurrency) { - return ErrorConfigGreaterThanOtherConfig(userconfig.TargetReplicaConcurrencyKey, autoscaling.TargetReplicaConcurrency, userconfig.MaxReplicaConcurrencyKey, autoscaling.MaxReplicaConcurrency) + if autoscaling.TargetInFlight > float64(autoscaling.MaxConcurrency)+float64(autoscaling.MaxQueueLength) { + return ErrorTargetInFlightLimitReached(autoscaling.TargetInFlight, autoscaling.MaxConcurrency, autoscaling.MaxQueueLength) } if autoscaling.MinReplicas > autoscaling.MaxReplicas { diff --git a/pkg/types/userconfig/api.go b/pkg/types/userconfig/api.go index b001b2e76f..a5f4a6c89a 100644 --- a/pkg/types/userconfig/api.go +++ b/pkg/types/userconfig/api.go @@ -81,9 +81,9 @@ type Autoscaling struct { MinReplicas int32 `json:"min_replicas" yaml:"min_replicas"` MaxReplicas int32 `json:"max_replicas" yaml:"max_replicas"` InitReplicas int32 `json:"init_replicas" yaml:"init_replicas"` - MaxReplicaQueueLength int64 `json:"max_replica_queue_length" yaml:"max_replica_queue_length"` - MaxReplicaConcurrency int64 `json:"max_replica_concurrency" yaml:"max_replica_concurrency"` - TargetReplicaConcurrency float64 `json:"target_replica_concurrency" yaml:"target_replica_concurrency"` + MaxQueueLength int64 `json:"max_queue_length" yaml:"max_queue_length"` + MaxConcurrency int64 `json:"max_concurrency" yaml:"max_concurrency"` + TargetInFlight float64 `json:"target_in_flight" yaml:"target_in_flight"` Window time.Duration `json:"window" yaml:"window"` DownscaleStabilizationPeriod time.Duration `json:"downscale_stabilization_period" yaml:"downscale_stabilization_period"` UpscaleStabilizationPeriod time.Duration `json:"upscale_stabilization_period" yaml:"upscale_stabilization_period"` @@ -131,9 +131,9 @@ func (api *API) ToK8sAnnotations() map[string]string { if api.Autoscaling != nil { annotations[MinReplicasAnnotationKey] = s.Int32(api.Autoscaling.MinReplicas) annotations[MaxReplicasAnnotationKey] = s.Int32(api.Autoscaling.MaxReplicas) - annotations[MaxReplicaQueueLengthAnnotationKey] = s.Int64(api.Autoscaling.MaxReplicaQueueLength) - annotations[TargetReplicaConcurrencyAnnotationKey] = s.Float64(api.Autoscaling.TargetReplicaConcurrency) - annotations[MaxReplicaConcurrencyAnnotationKey] = s.Int64(api.Autoscaling.MaxReplicaConcurrency) + annotations[MaxQueueLengthAnnotationKey] = s.Int64(api.Autoscaling.MaxQueueLength) + annotations[TargetInFlightAnnotationKey] = s.Float64(api.Autoscaling.TargetInFlight) + annotations[MaxConcurrencyAnnotationKey] = s.Int64(api.Autoscaling.MaxConcurrency) annotations[WindowAnnotationKey] = api.Autoscaling.Window.String() annotations[DownscaleStabilizationPeriodAnnotationKey] = api.Autoscaling.DownscaleStabilizationPeriod.String() annotations[UpscaleStabilizationPeriodAnnotationKey] = api.Autoscaling.UpscaleStabilizationPeriod.String() @@ -160,23 +160,23 @@ func AutoscalingFromAnnotations(k8sObj kmeta.Object) (*Autoscaling, error) { } a.MaxReplicas = maxReplicas - maxReplicaQueueLength, err := k8s.ParseInt64Annotation(k8sObj, MaxReplicaQueueLengthAnnotationKey) + maxQueueLength, err := k8s.ParseInt64Annotation(k8sObj, MaxQueueLengthAnnotationKey) if err != nil { return nil, err } - a.MaxReplicaQueueLength = maxReplicaQueueLength + a.MaxQueueLength = maxQueueLength - maxReplicaConcurrency, err := k8s.ParseInt64Annotation(k8sObj, MaxReplicaConcurrencyAnnotationKey) + maxConcurrency, err := k8s.ParseInt64Annotation(k8sObj, MaxConcurrencyAnnotationKey) if err != nil { return nil, err } - a.MaxReplicaConcurrency = maxReplicaConcurrency + a.MaxConcurrency = maxConcurrency - targetReplicaConcurrency, err := k8s.ParseFloat64Annotation(k8sObj, TargetReplicaConcurrencyAnnotationKey) + targetInFlight, err := k8s.ParseFloat64Annotation(k8sObj, TargetInFlightAnnotationKey) if err != nil { return nil, err } - a.TargetReplicaConcurrency = targetReplicaConcurrency + a.TargetInFlight = targetInFlight window, err := k8s.ParseDurationAnnotation(k8sObj, WindowAnnotationKey) if err != nil { @@ -381,9 +381,9 @@ func (autoscaling *Autoscaling) UserStr() string { sb.WriteString(fmt.Sprintf("%s: %s\n", MinReplicasKey, s.Int32(autoscaling.MinReplicas))) sb.WriteString(fmt.Sprintf("%s: %s\n", MaxReplicasKey, s.Int32(autoscaling.MaxReplicas))) sb.WriteString(fmt.Sprintf("%s: %s\n", InitReplicasKey, s.Int32(autoscaling.InitReplicas))) - sb.WriteString(fmt.Sprintf("%s: %s\n", MaxReplicaQueueLengthKey, s.Int64(autoscaling.MaxReplicaQueueLength))) - sb.WriteString(fmt.Sprintf("%s: %s\n", MaxReplicaConcurrencyKey, s.Int64(autoscaling.MaxReplicaConcurrency))) - sb.WriteString(fmt.Sprintf("%s: %s\n", TargetReplicaConcurrencyKey, s.Float64(autoscaling.TargetReplicaConcurrency))) + sb.WriteString(fmt.Sprintf("%s: %s\n", MaxQueueLengthKey, s.Int64(autoscaling.MaxQueueLength))) + sb.WriteString(fmt.Sprintf("%s: %s\n", MaxConcurrencyKey, s.Int64(autoscaling.MaxConcurrency))) + sb.WriteString(fmt.Sprintf("%s: %s\n", TargetInFlightKey, s.Float64(autoscaling.TargetInFlight))) sb.WriteString(fmt.Sprintf("%s: %s\n", WindowKey, autoscaling.Window.String())) sb.WriteString(fmt.Sprintf("%s: %s\n", DownscaleStabilizationPeriodKey, autoscaling.DownscaleStabilizationPeriod.String())) sb.WriteString(fmt.Sprintf("%s: %s\n", UpscaleStabilizationPeriodKey, autoscaling.UpscaleStabilizationPeriod.String())) @@ -508,9 +508,9 @@ func (api *API) TelemetryEvent() map[string]interface{} { event["autoscaling.min_replicas"] = api.Autoscaling.MinReplicas event["autoscaling.max_replicas"] = api.Autoscaling.MaxReplicas event["autoscaling.init_replicas"] = api.Autoscaling.InitReplicas - event["autoscaling.max_replica_queue_length"] = api.Autoscaling.MaxReplicaQueueLength - event["autoscaling.max_replica_concurrency"] = api.Autoscaling.MaxReplicaConcurrency - event["autoscaling.target_replica_concurrency"] = api.Autoscaling.TargetReplicaConcurrency + event["autoscaling.max_queue_length"] = api.Autoscaling.MaxQueueLength + event["autoscaling.max_concurrency"] = api.Autoscaling.MaxConcurrency + event["autoscaling.target_in_flight"] = api.Autoscaling.TargetInFlight event["autoscaling.window"] = api.Autoscaling.Window.Seconds() event["autoscaling.downscale_stabilization_period"] = api.Autoscaling.DownscaleStabilizationPeriod.Seconds() event["autoscaling.upscale_stabilization_period"] = api.Autoscaling.UpscaleStabilizationPeriod.Seconds() diff --git a/pkg/types/userconfig/config_key.go b/pkg/types/userconfig/config_key.go index 81861f5161..63bbd1bc7f 100644 --- a/pkg/types/userconfig/config_key.go +++ b/pkg/types/userconfig/config_key.go @@ -57,9 +57,9 @@ const ( MinReplicasKey = "min_replicas" MaxReplicasKey = "max_replicas" InitReplicasKey = "init_replicas" - MaxReplicaQueueLengthKey = "max_replica_queue_length" - MaxReplicaConcurrencyKey = "max_replica_concurrency" - TargetReplicaConcurrencyKey = "target_replica_concurrency" + MaxQueueLengthKey = "max_queue_length" + MaxConcurrencyKey = "max_concurrency" + TargetInFlightKey = "target_in_flight" WindowKey = "window" DownscaleStabilizationPeriodKey = "downscale_stabilization_period" UpscaleStabilizationPeriodKey = "upscale_stabilization_period" @@ -76,9 +76,9 @@ const ( EndpointAnnotationKey = "networking.cortex.dev/endpoint" MinReplicasAnnotationKey = "autoscaling.cortex.dev/min-replicas" MaxReplicasAnnotationKey = "autoscaling.cortex.dev/max-replicas" - MaxReplicaQueueLengthAnnotationKey = "autoscaling.cortex.dev/max-replica-queue-length" - MaxReplicaConcurrencyAnnotationKey = "autoscaling.cortex.dev/max-replica-concurrency" - TargetReplicaConcurrencyAnnotationKey = "autoscaling.cortex.dev/target-replica-concurrency" + MaxQueueLengthAnnotationKey = "autoscaling.cortex.dev/max-replica-queue-length" + MaxConcurrencyAnnotationKey = "autoscaling.cortex.dev/max-replica-concurrency" + TargetInFlightAnnotationKey = "autoscaling.cortex.dev/target-replica-concurrency" WindowAnnotationKey = "autoscaling.cortex.dev/window" DownscaleStabilizationPeriodAnnotationKey = "autoscaling.cortex.dev/downscale-stabilization-period" UpscaleStabilizationPeriodAnnotationKey = "autoscaling.cortex.dev/upscale-stabilization-period" diff --git a/pkg/workloads/k8s.go b/pkg/workloads/k8s.go index dfb415b082..af3cd97387 100644 --- a/pkg/workloads/k8s.go +++ b/pkg/workloads/k8s.go @@ -388,9 +388,9 @@ func RealtimeProxyContainer(api spec.API) kcore.Container { "-user-port", s.Int32(*api.Pod.Port), "-max-concurrency", - "1", + s.Int32(int32(api.Autoscaling.MaxConcurrency)), "-max-queue-length", - "1", + s.Int32(int32(api.Autoscaling.MaxQueueLength)), }, Ports: []kcore.ContainerPort{ {Name: "metrics", ContainerPort: consts.MetricsPortInt32}, diff --git a/test/apis/realtime/Dockerfile b/test/apis/realtime/Dockerfile index 68cf9cfa29..2955a09120 100644 --- a/test/apis/realtime/Dockerfile +++ b/test/apis/realtime/Dockerfile @@ -17,4 +17,4 @@ RUN pip install Flask gunicorn # webserver, with one worker process and 8 threads. # For environments with multiple CPU cores, increase the number of workers # to be equal to the cores available. -CMD exec gunicorn --bind :$CORTEX_PORT --workers 1 --threads 8 --timeout 0 main:app +CMD exec gunicorn --bind :$CORTEX_PORT --workers 1 --threads $NUM_THREADS --timeout 0 main:app diff --git a/test/apis/realtime/cortex.yaml b/test/apis/realtime/cortex.yaml index 4770d44709..9f4e231cea 100644 --- a/test/apis/realtime/cortex.yaml +++ b/test/apis/realtime/cortex.yaml @@ -1,10 +1,15 @@ - name: realtime kind: RealtimeAPI pod: - node_groups: [cpu] containers: - name: api image: 499593605069.dkr.ecr.us-west-2.amazonaws.com/sample/realtime-caas:latest + env: + NUM_THREADS: "8" compute: cpu: 200m mem: 512Mi + autoscaling: + max_queue_length: 16 + max_concurrency: 8 + target_in_flight: 10 diff --git a/test/apis/realtime/main.py b/test/apis/realtime/main.py index bf2c96dc8b..a164b76d2f 100644 --- a/test/apis/realtime/main.py +++ b/test/apis/realtime/main.py @@ -1,4 +1,6 @@ import os +import time +import threading as td from flask import Flask @@ -7,8 +9,10 @@ @app.route("/") def hello_world(): - name = os.environ.get("NAME", "World") - return "Hello {}!".format(name) + time.sleep(1) + msg = f"Hello World! (TID={td.get_ident()}" + print(msg) + return msg if __name__ == "__main__": diff --git a/test/apis/tensorflow/image-classifier-resnet50/cortex_inf.yaml b/test/apis/tensorflow/image-classifier-resnet50/cortex_inf.yaml index 77d9adde9f..5ba87a891d 100644 --- a/test/apis/tensorflow/image-classifier-resnet50/cortex_inf.yaml +++ b/test/apis/tensorflow/image-classifier-resnet50/cortex_inf.yaml @@ -17,4 +17,4 @@ cpu: 3 mem: 4G autoscaling: - max_replica_concurrency: 16384 + max_concurrency: 16384 diff --git a/test/apis/tensorflow/image-classifier-resnet50/cortex_inf_server_side_batching.yaml b/test/apis/tensorflow/image-classifier-resnet50/cortex_inf_server_side_batching.yaml index 615a2ab84f..9d587cecfb 100644 --- a/test/apis/tensorflow/image-classifier-resnet50/cortex_inf_server_side_batching.yaml +++ b/test/apis/tensorflow/image-classifier-resnet50/cortex_inf_server_side_batching.yaml @@ -20,4 +20,4 @@ cpu: 3 mem: 4G autoscaling: - max_replica_concurrency: 16384 + max_concurrency: 16384 From 1014344669863ee8c30d2a00829c738d669f0362 Mon Sep 17 00:00:00 2001 From: Robert Lucian Chiriac Date: Tue, 18 May 2021 23:09:49 +0300 Subject: [PATCH 06/15] Add sentry reporting to proxy --- cmd/proxy/main.go | 71 +++++++++++++++++-- .../resources/realtimeapi/k8s_specs.go | 5 +- pkg/workloads/helpers.go | 21 ++++++ pkg/workloads/k8s.go | 13 +++- 4 files changed, 102 insertions(+), 8 deletions(-) diff --git a/cmd/proxy/main.go b/cmd/proxy/main.go index 5ebb766f91..b0950330af 100644 --- a/cmd/proxy/main.go +++ b/cmd/proxy/main.go @@ -25,9 +25,13 @@ import ( "strconv" "time" + "github.com/cortexlabs/cortex/pkg/lib/aws" + "github.com/cortexlabs/cortex/pkg/lib/errors" "github.com/cortexlabs/cortex/pkg/lib/logging" + "github.com/cortexlabs/cortex/pkg/lib/telemetry" "github.com/cortexlabs/cortex/pkg/proxy" - "go.uber.org/zap" + "github.com/cortexlabs/cortex/pkg/types/clusterconfig" + "github.com/cortexlabs/cortex/pkg/types/userconfig" ) const ( @@ -35,6 +39,28 @@ const ( _requestSampleInterval = 1 * time.Second ) +var ( + proxyLogger = logging.GetLogger() +) + +func Exit(err error, wrapStrs ...string) { + for _, str := range wrapStrs { + err = errors.Wrap(err, str) + } + + if err != nil && !errors.IsNoTelemetry(err) { + telemetry.Error(err) + } + + if err != nil && !errors.IsNoPrint(err) { + proxyLogger.Error(err) + } + + telemetry.Close() + + os.Exit(1) +} + func main() { var ( port int @@ -42,6 +68,7 @@ func main() { userContainerPort int maxConcurrency int maxQueueLength int + clusterConfigPath string ) flag.IntVar(&port, "port", 8888, "port where the proxy is served") @@ -49,6 +76,8 @@ func main() { flag.IntVar(&userContainerPort, "user-port", 8080, "port where the proxy redirects to the traffic to") flag.IntVar(&maxConcurrency, "max-concurrency", 0, "max concurrency allowed for user container") flag.IntVar(&maxQueueLength, "max-queue-length", 0, "max request queue length for user container") + flag.StringVar(&clusterConfigPath, "cluster-config", "", "cluster config path") + flag.Parse() log := logging.GetLogger() @@ -58,9 +87,41 @@ func main() { switch { case maxConcurrency == 0: - log.Fatal("--max-concurrency flag is required") + log.Fatal("-max-concurrency flag is required") case maxQueueLength == 0: - maxQueueLength = maxConcurrency * 10 + log.Fatal("-max-queue-length flag is required") + case clusterConfigPath == "": + log.Fatal("-cluster-config flag is required") + } + + clusterConfig, err := clusterconfig.NewForFile(clusterConfigPath) + if err != nil { + Exit(err) + } + + awsClient, err := aws.NewForRegion(clusterConfig.Region) + if err != nil { + Exit(err) + } + + _, userID, err := awsClient.CheckCredentials() + if err != nil { + Exit(err) + } + + err = telemetry.Init(telemetry.Config{ + Enabled: clusterConfig.Telemetry, + UserID: userID, + Properties: map[string]string{ + "kind": userconfig.RealtimeAPIKind.String(), + "image_type": "proxy", + }, + Environment: "api", + LogErrors: true, + BackoffMode: telemetry.BackoffDuplicateMessages, + }) + if err != nil { + Exit(err) } target := "http://127.0.0.1:" + strconv.Itoa(userContainerPort) @@ -123,7 +184,7 @@ func main() { select { case err := <-errCh: - log.Fatal("failed to start proxy server", zap.Error(err)) + Exit(errors.Wrap(err, "failed to start proxy server")) case <-sigint: // We received an interrupt signal, shut down. log.Info("Received TERM signal, handling a graceful shutdown...") @@ -132,7 +193,7 @@ func main() { log.Infof("Shutting down %s server", name) if err := server.Shutdown(context.Background()); err != nil { // Error from closing listeners, or context timeout: - log.Warn("HTTP server Shutdown Error", zap.Error(err)) + Exit(errors.Wrap(err, "HTTP server Shutdown Error")) } } log.Info("Shutdown complete, exiting...") diff --git a/pkg/operator/resources/realtimeapi/k8s_specs.go b/pkg/operator/resources/realtimeapi/k8s_specs.go index 03dfafc22d..49368f7db2 100644 --- a/pkg/operator/resources/realtimeapi/k8s_specs.go +++ b/pkg/operator/resources/realtimeapi/k8s_specs.go @@ -31,7 +31,10 @@ var _terminationGracePeriodSeconds int64 = 60 // seconds func deploymentSpec(api *spec.API, prevDeployment *kapps.Deployment) *kapps.Deployment { containers, volumes := workloads.UserPodContainers(*api) - containers = append(containers, workloads.RealtimeProxyContainer(*api)) + proxyContainer, proxyVolume := workloads.RealtimeProxyContainer(*api) + + containers = append(containers, proxyContainer) + volumes = append(volumes, proxyVolume) return k8s.Deployment(&k8s.DeploymentSpec{ Name: workloads.K8sName(api.Name), diff --git a/pkg/workloads/helpers.go b/pkg/workloads/helpers.go index 61cb6f4153..ca13be7f72 100644 --- a/pkg/workloads/helpers.go +++ b/pkg/workloads/helpers.go @@ -151,6 +151,19 @@ func ClientConfigVolume() kcore.Volume { } } +func ClusterConfigVolume() kcore.Volume { + return kcore.Volume{ + Name: _clusterConfigDirVolume, + VolumeSource: kcore.VolumeSource{ + ConfigMap: &kcore.ConfigMapVolumeSource{ + LocalObjectReference: kcore.LocalObjectReference{ + Name: _clusterConfigConfigMap, + }, + }, + }, + } +} + func ShmVolume(q resource.Quantity) kcore.Volume { return kcore.Volume{ Name: _shmDirVolumeName, @@ -183,6 +196,14 @@ func ClientConfigMount() kcore.VolumeMount { } } +func ClusterConfigMount() kcore.VolumeMount { + return kcore.VolumeMount{ + Name: _clusterConfigDirVolume, + MountPath: path.Join(_clusterConfigDir, "cluster.yaml"), + SubPath: "cluster.yaml", + } +} + func ShmMount() kcore.VolumeMount { return k8s.EmptyDirVolumeMount(_shmDirVolumeName, _shmDirMountPath) } diff --git a/pkg/workloads/k8s.go b/pkg/workloads/k8s.go index af3cd97387..39025d1282 100644 --- a/pkg/workloads/k8s.go +++ b/pkg/workloads/k8s.go @@ -59,6 +59,10 @@ const ( _clientConfigDirVolume = "client-config" _clientConfigConfigMap = "client-config" + + _clusterConfigDirVolume = "cluster-config" + _clusterConfigConfigMap = "cluster-config" + _clusterConfigDir = "/configs/cluster" ) var ( @@ -375,7 +379,7 @@ func neuronRuntimeDaemonContainer(computeInf int64, volumeMounts []kcore.VolumeM } } -func RealtimeProxyContainer(api spec.API) kcore.Container { +func RealtimeProxyContainer(api spec.API) (kcore.Container, kcore.Volume) { return kcore.Container{ Name: _proxyContainerName, Image: config.ClusterConfig.ImageProxy, @@ -391,6 +395,8 @@ func RealtimeProxyContainer(api spec.API) kcore.Container { s.Int32(int32(api.Autoscaling.MaxConcurrency)), "-max-queue-length", s.Int32(int32(api.Autoscaling.MaxQueueLength)), + "-cluster-config", + consts.DefaultInClusterConfigPath, }, Ports: []kcore.ContainerPort{ {Name: "metrics", ContainerPort: consts.MetricsPortInt32}, @@ -403,7 +409,10 @@ func RealtimeProxyContainer(api spec.API) kcore.Container { }, }, EnvFrom: baseClusterEnvVars(), - } + VolumeMounts: []kcore.VolumeMount{ + ClusterConfigMount(), + }, + }, ClusterConfigVolume() } // func getAsyncAPIEnvVars(api spec.API, queueURL string) []kcore.EnvVar { From 7d7e464b204cb7db16988f71d96ce4e238b61cfe Mon Sep 17 00:00:00 2001 From: Robert Lucian Chiriac Date: Tue, 18 May 2021 23:22:28 +0300 Subject: [PATCH 07/15] Fix batch test --- pkg/crds/controllers/batch/batchjob_controller_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/crds/controllers/batch/batchjob_controller_test.go b/pkg/crds/controllers/batch/batchjob_controller_test.go index 112067ea3a..005691cb3e 100644 --- a/pkg/crds/controllers/batch/batchjob_controller_test.go +++ b/pkg/crds/controllers/batch/batchjob_controller_test.go @@ -22,6 +22,7 @@ import ( "time" batch "github.com/cortexlabs/cortex/pkg/crds/apis/batch/v1alpha1" + "github.com/cortexlabs/cortex/pkg/lib/pointer" "github.com/cortexlabs/cortex/pkg/lib/random" "github.com/cortexlabs/cortex/pkg/types/spec" "github.com/cortexlabs/cortex/pkg/types/status" @@ -43,7 +44,7 @@ func uploadTestAPISpec(apiName string, apiID string) error { Kind: userconfig.BatchAPIKind, }, Pod: &userconfig.Pod{ - // TODO use a real image + Port: pointer.Int32(8080), Containers: []*userconfig.Container{ { Name: "api", From f80ba842dd7525abfa7fbb078bb4a10664919f90 Mon Sep 17 00:00:00 2001 From: Robert Lucian Chiriac Date: Tue, 18 May 2021 23:56:02 +0300 Subject: [PATCH 08/15] Change pod monitor labels --- manager/manifests/prometheus-monitoring.yaml.j2 | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/manager/manifests/prometheus-monitoring.yaml.j2 b/manager/manifests/prometheus-monitoring.yaml.j2 index 8b2982dd9f..cee1eae033 100644 --- a/manager/manifests/prometheus-monitoring.yaml.j2 +++ b/manager/manifests/prometheus-monitoring.yaml.j2 @@ -34,7 +34,7 @@ spec: matchExpressions: - key: "monitoring.cortex.dev" operator: "In" - values: [ "istio", "proxy-metrics-monitor", "statsd-exporter", "dcgm-exporter", "kube-state-metrics" ] + values: [ "istio", "proxy", "statsd-exporter", "dcgm-exporter", "kube-state-metrics" ] serviceMonitorSelector: matchExpressions: - key: "monitoring.cortex.dev" @@ -168,9 +168,9 @@ spec: apiVersion: monitoring.coreos.com/v1 kind: PodMonitor metadata: - name: proxy-metrics-monitor-stats + name: proxy-stats labels: - monitoring.cortex.dev: "proxy-metrics-monitor" + monitoring.cortex.dev: "proxy" spec: selector: matchLabels: @@ -179,7 +179,7 @@ spec: - { key: prometheus-ignore, operator: DoesNotExist } namespaceSelector: any: true - jobLabel: proxy-metrics-monitor-stats + jobLabel: proxy-stats podMetricsEndpoints: - path: /metrics scheme: http From cb42a3b432c8bfe2aed1b6a4a09b11995090b21f Mon Sep 17 00:00:00 2001 From: Robert Lucian Chiriac Date: Wed, 19 May 2021 00:01:04 +0300 Subject: [PATCH 09/15] Rename PluralToBe -> PluralIs --- pkg/lib/strings/operations.go | 8 ++++---- pkg/types/spec/errors.go | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/lib/strings/operations.go b/pkg/lib/strings/operations.go index 09700173e1..f97cc42558 100644 --- a/pkg/lib/strings/operations.go +++ b/pkg/lib/strings/operations.go @@ -220,6 +220,10 @@ func PluralEs(str string, count interface{}) string { return PluralCustom(str, str+"es", count) } +func PluralIs(count interface{}) string { + return PluralCustom("is", "are", count) +} + func PluralCustom(singular string, plural string, count interface{}) string { countInt, _ := cast.InterfaceToInt64(count) if countInt == 1 { @@ -228,10 +232,6 @@ func PluralCustom(singular string, plural string, count interface{}) string { return plural } -func PluralToBe(count interface{}) string { - return PluralCustom("is", "are", count) -} - // RemoveDuplicates returns a filtered string slice without repeated entries. // The ignoreRegex parameter can optionally be used to ignore repeated patterns in each slice entry. func RemoveDuplicates(strs []string, ignoreRegex *regexp.Regexp) []string { diff --git a/pkg/types/spec/errors.go b/pkg/types/spec/errors.go index a4b0cddeac..da75366321 100644 --- a/pkg/types/spec/errors.go +++ b/pkg/types/spec/errors.go @@ -242,7 +242,7 @@ func ErrorCortexPrefixedEnvVarNotAllowed(prefixes ...string) error { func ErrorDisallowedEnvVars(disallowedValues ...string) error { return errors.WithStack(&errors.Error{ Kind: ErrDisallowedEnvVars, - Message: fmt.Sprintf("environment %s %s %s disallowed", s.PluralS("variables", len(disallowedValues)), s.StrsAnd(disallowedValues), s.PluralToBe(len(disallowedValues))), + Message: fmt.Sprintf("environment %s %s %s disallowed", s.PluralS("variables", len(disallowedValues)), s.StrsAnd(disallowedValues), s.PluralIs(len(disallowedValues))), }) } From 0f93da3f5ca126f9b9136bcd95961539cb89868e Mon Sep 17 00:00:00 2001 From: Robert Lucian Chiriac Date: Wed, 19 May 2021 00:17:03 +0300 Subject: [PATCH 10/15] Simplifying errors --- pkg/types/spec/errors.go | 36 ++++++++++-------------------------- 1 file changed, 10 insertions(+), 26 deletions(-) diff --git a/pkg/types/spec/errors.go b/pkg/types/spec/errors.go index da75366321..3b44ac5fa0 100644 --- a/pkg/types/spec/errors.go +++ b/pkg/types/spec/errors.go @@ -35,9 +35,9 @@ const ( ErrDuplicateEndpointInOneDeploy = "spec.duplicate_endpoint_in_one_deploy" ErrDuplicateEndpoint = "spec.duplicate_endpoint" ErrDuplicateContainerName = "spec.duplicate_container_name" - ErrConflictingFields = "spec.conflicting_fields" + ErrCantSpecifyBoth = "spec.cant_specify_both" ErrSpecifyOnlyOneField = "spec.specify_only_one_field" - ErrSpecifyOneOrTheOther = "spec.specify_one_or_the_other" + ErrNoneSpecified = "spec.none_specified" ErrSpecifyAllOrNone = "spec.specify_all_or_none" ErrOneOfPrerequisitesNotDefined = "spec.one_of_prerequisites_not_defined" ErrConfigGreaterThanOtherConfig = "spec.config_greater_than_other_config" @@ -56,8 +56,6 @@ const ( ErrFieldIsNotSupportedForKind = "spec.field_is_not_supported_for_kind" ErrCortexPrefixedEnvVarNotAllowed = "spec.cortex_prefixed_env_var_not_allowed" ErrDisallowedEnvVars = "spec.disallowed_env_vars" - ErrRegistryInDifferentRegion = "spec.registry_in_different_region" - ErrRegistryAccountIDMismatch = "spec.registry_account_id_mismatch" ErrComputeResourceConflict = "spec.compute_resource_conflict" ErrInvalidNumberOfInfs = "spec.invalid_number_of_infs" ErrIncorrectTrafficSplitterWeight = "spec.incorrect_traffic_splitter_weight" @@ -118,10 +116,10 @@ func ErrorDuplicateContainerName(containerName string) error { }) } -func ErrorConflictingFields(fieldKeyA, fieldKeyB string) error { +func ErrorCantSpecifyBoth(fieldKeyA, fieldKeyB string) error { return errors.WithStack(&errors.Error{ - Kind: ErrConflictingFields, - Message: fmt.Sprintf("please specify either the %s or %s field (both cannot be specified at the same time)", fieldKeyA, fieldKeyB), + Kind: ErrCantSpecifyBoth, + Message: fmt.Sprintf("please specify either %s or %s (both cannot be specified at the same time)", fieldKeyA, fieldKeyB), }) } @@ -132,10 +130,10 @@ func ErrorSpecifyOnlyOneField(fields ...string) error { }) } -func ErrorSpecifyOneOrTheOther(fieldKeyA, fieldKeyB string) error { +func ErrorNoneSpecified(fieldKeyA, fieldKeyB string) error { return errors.WithStack(&errors.Error{ - Kind: ErrSpecifyOneOrTheOther, - Message: fmt.Sprintf("please specify either the %s field or %s field (cannot be both empty at the same time)", fieldKeyA, fieldKeyB), + Kind: ErrNoneSpecified, + Message: fmt.Sprintf("please specify either %s or %s (cannot be both empty at the same time)", fieldKeyA, fieldKeyB), }) } @@ -221,14 +219,14 @@ func ErrorShmSizeCannotExceedMem(shmSize k8s.Quantity, mem k8s.Quantity) error { func ErrorFieldMustBeSpecifiedForKind(field string, kind userconfig.Kind) error { return errors.WithStack(&errors.Error{ Kind: ErrFieldMustBeSpecifiedForKind, - Message: fmt.Sprintf("field %s must be specified for %s kind", field, kind.String()), + Message: fmt.Sprintf("%s must be specified for %s kind", field, kind.String()), }) } func ErrorFieldIsNotSupportedForKind(field string, kind userconfig.Kind) error { return errors.WithStack(&errors.Error{ Kind: ErrFieldIsNotSupportedForKind, - Message: fmt.Sprintf("%s field is not supported for %s kind", field, kind.String()), + Message: fmt.Sprintf("%s is not supported for %s kind", field, kind.String()), }) } @@ -246,20 +244,6 @@ func ErrorDisallowedEnvVars(disallowedValues ...string) error { }) } -func ErrorRegistryInDifferentRegion(registryRegion string, awsClientRegion string) error { - return errors.WithStack(&errors.Error{ - Kind: ErrRegistryInDifferentRegion, - Message: fmt.Sprintf("registry region (%s) does not match cortex's region (%s); images can only be pulled from repositories in the same region as cortex", registryRegion, awsClientRegion), - }) -} - -func ErrorRegistryAccountIDMismatch(regID, opID string) error { - return errors.WithStack(&errors.Error{ - Kind: ErrRegistryAccountIDMismatch, - Message: fmt.Sprintf("registry account ID (%s) doesn't match your AWS account ID (%s), and using an ECR registry in a different AWS account is not supported", regID, opID), - }) -} - func ErrorComputeResourceConflict(resourceA, resourceB string) error { return errors.WithStack(&errors.Error{ Kind: ErrComputeResourceConflict, From 58fbf51e919c39315b153ad0b9490df9b1801de2 Mon Sep 17 00:00:00 2001 From: Robert Lucian Chiriac Date: Wed, 19 May 2021 17:08:18 +0300 Subject: [PATCH 11/15] Address PR comments --- pkg/consts/consts.go | 2 - pkg/operator/lib/autoscaler/autoscaler.go | 4 +- pkg/types/spec/validations.go | 20 ++++-- pkg/types/userconfig/api.go | 11 +-- pkg/types/userconfig/config_key.go | 6 +- pkg/workloads/k8s.go | 81 +++++------------------ test/apis/realtime/main.py | 2 +- 7 files changed, 41 insertions(+), 85 deletions(-) diff --git a/pkg/consts/consts.go b/pkg/consts/consts.go index 57d575a309..50fca29b6c 100644 --- a/pkg/consts/consts.go +++ b/pkg/consts/consts.go @@ -26,7 +26,6 @@ var ( DefaultMaxQueueLength = int64(1024) DefaultMaxConcurrency = int64(16) - DefaultTargetInFlight = float64(16) DefaultUserPodPortStr = "8080" DefaultUserPodPortInt32 = int32(8080) @@ -45,7 +44,6 @@ var ( ReservedContainerNames = []string{ "proxy", - "neuron-rtd", } ) diff --git a/pkg/operator/lib/autoscaler/autoscaler.go b/pkg/operator/lib/autoscaler/autoscaler.go index 79ba7ac385..e3d13cd920 100644 --- a/pkg/operator/lib/autoscaler/autoscaler.go +++ b/pkg/operator/lib/autoscaler/autoscaler.go @@ -133,7 +133,7 @@ func AutoscaleFn(initialDeployment *kapps.Deployment, apiSpec *spec.API, getInFl return nil } - rawRecommendation := *avgInFlight / autoscalingSpec.TargetInFlight + rawRecommendation := *avgInFlight / *autoscalingSpec.TargetInFlight recommendation := int32(math.Ceil(rawRecommendation)) if rawRecommendation < float64(currentReplicas) && rawRecommendation > float64(currentReplicas)*(1-autoscalingSpec.DownscaleTolerance) { @@ -199,7 +199,7 @@ func AutoscaleFn(initialDeployment *kapps.Deployment, apiSpec *spec.API, getInFl apiLogger.Debugw(fmt.Sprintf("%s autoscaler tick", apiName), "autoscaling", map[string]interface{}{ "avg_in_flight": *avgInFlight, - "target_in_flight": autoscalingSpec.TargetInFlight, + "target_in_flight": *autoscalingSpec.TargetInFlight, "raw_recommendation": rawRecommendation, "current_replicas": currentReplicas, "downscale_tolerance": autoscalingSpec.DownscaleTolerance, diff --git a/pkg/types/spec/validations.go b/pkg/types/spec/validations.go index afb301d055..462012b2b8 100644 --- a/pkg/types/spec/validations.go +++ b/pkg/types/spec/validations.go @@ -169,7 +169,9 @@ func podValidation() *cr.StructFieldValidation { { StructField: "Port", Int32PtrValidation: &cr.Int32PtrValidation{ - Required: false, + Required: false, + // it's a pointer because it's not required for the task API + // but it is for the realtime/async/batch APIs Default: nil, AllowExplicitNull: true, DisallowedValues: []int32{ @@ -335,7 +337,7 @@ func autoscalingValidation() *cr.StructFieldValidation { Int64Validation: &cr.Int64Validation{ Default: consts.DefaultMaxQueueLength, GreaterThan: pointer.Int64(0), - // our configured nginx can theoretically accept up to 32768 connections, but during testing, + // the proxy can theoretically accept up to 32768 connections, but during testing, // it has been observed that the number is just slightly lower, so it has been offset by 2678 LessThanOrEqualTo: pointer.Int64(30000), }, @@ -345,15 +347,15 @@ func autoscalingValidation() *cr.StructFieldValidation { Int64Validation: &cr.Int64Validation{ Default: consts.DefaultMaxConcurrency, GreaterThan: pointer.Int64(0), - // our configured nginx can theoretically accept up to 32768 connections, but during testing, + // the proxy can theoretically accept up to 32768 connections, but during testing, // it has been observed that the number is just slightly lower, so it has been offset by 2678 LessThanOrEqualTo: pointer.Int64(30000), }, }, { StructField: "TargetInFlight", - Float64Validation: &cr.Float64Validation{ - Default: consts.DefaultTargetInFlight, + Float64PtrValidation: &cr.Float64PtrValidation{ + Default: nil, GreaterThan: pointer.Float64(0), }, }, @@ -624,8 +626,12 @@ func validateContainers( func validateAutoscaling(api *userconfig.API) error { autoscaling := api.Autoscaling - if autoscaling.TargetInFlight > float64(autoscaling.MaxConcurrency)+float64(autoscaling.MaxQueueLength) { - return ErrorTargetInFlightLimitReached(autoscaling.TargetInFlight, autoscaling.MaxConcurrency, autoscaling.MaxQueueLength) + if autoscaling.TargetInFlight == nil { + autoscaling.TargetInFlight = pointer.Float64(float64(autoscaling.MaxConcurrency)) + } + + if *autoscaling.TargetInFlight > float64(autoscaling.MaxConcurrency)+float64(autoscaling.MaxQueueLength) { + return ErrorTargetInFlightLimitReached(*autoscaling.TargetInFlight, autoscaling.MaxConcurrency, autoscaling.MaxQueueLength) } if autoscaling.MinReplicas > autoscaling.MaxReplicas { diff --git a/pkg/types/userconfig/api.go b/pkg/types/userconfig/api.go index a5f4a6c89a..8516aadfc7 100644 --- a/pkg/types/userconfig/api.go +++ b/pkg/types/userconfig/api.go @@ -22,6 +22,7 @@ import ( "time" "github.com/cortexlabs/cortex/pkg/lib/k8s" + "github.com/cortexlabs/cortex/pkg/lib/pointer" "github.com/cortexlabs/cortex/pkg/lib/sets/strset" s "github.com/cortexlabs/cortex/pkg/lib/strings" "github.com/cortexlabs/cortex/pkg/lib/urls" @@ -83,7 +84,7 @@ type Autoscaling struct { InitReplicas int32 `json:"init_replicas" yaml:"init_replicas"` MaxQueueLength int64 `json:"max_queue_length" yaml:"max_queue_length"` MaxConcurrency int64 `json:"max_concurrency" yaml:"max_concurrency"` - TargetInFlight float64 `json:"target_in_flight" yaml:"target_in_flight"` + TargetInFlight *float64 `json:"target_in_flight" yaml:"target_in_flight"` Window time.Duration `json:"window" yaml:"window"` DownscaleStabilizationPeriod time.Duration `json:"downscale_stabilization_period" yaml:"downscale_stabilization_period"` UpscaleStabilizationPeriod time.Duration `json:"upscale_stabilization_period" yaml:"upscale_stabilization_period"` @@ -132,7 +133,7 @@ func (api *API) ToK8sAnnotations() map[string]string { annotations[MinReplicasAnnotationKey] = s.Int32(api.Autoscaling.MinReplicas) annotations[MaxReplicasAnnotationKey] = s.Int32(api.Autoscaling.MaxReplicas) annotations[MaxQueueLengthAnnotationKey] = s.Int64(api.Autoscaling.MaxQueueLength) - annotations[TargetInFlightAnnotationKey] = s.Float64(api.Autoscaling.TargetInFlight) + annotations[TargetInFlightAnnotationKey] = s.Float64(*api.Autoscaling.TargetInFlight) annotations[MaxConcurrencyAnnotationKey] = s.Int64(api.Autoscaling.MaxConcurrency) annotations[WindowAnnotationKey] = api.Autoscaling.Window.String() annotations[DownscaleStabilizationPeriodAnnotationKey] = api.Autoscaling.DownscaleStabilizationPeriod.String() @@ -176,7 +177,7 @@ func AutoscalingFromAnnotations(k8sObj kmeta.Object) (*Autoscaling, error) { if err != nil { return nil, err } - a.TargetInFlight = targetInFlight + a.TargetInFlight = pointer.Float64(targetInFlight) window, err := k8s.ParseDurationAnnotation(k8sObj, WindowAnnotationKey) if err != nil { @@ -383,7 +384,7 @@ func (autoscaling *Autoscaling) UserStr() string { sb.WriteString(fmt.Sprintf("%s: %s\n", InitReplicasKey, s.Int32(autoscaling.InitReplicas))) sb.WriteString(fmt.Sprintf("%s: %s\n", MaxQueueLengthKey, s.Int64(autoscaling.MaxQueueLength))) sb.WriteString(fmt.Sprintf("%s: %s\n", MaxConcurrencyKey, s.Int64(autoscaling.MaxConcurrency))) - sb.WriteString(fmt.Sprintf("%s: %s\n", TargetInFlightKey, s.Float64(autoscaling.TargetInFlight))) + sb.WriteString(fmt.Sprintf("%s: %s\n", TargetInFlightKey, s.Float64(*autoscaling.TargetInFlight))) sb.WriteString(fmt.Sprintf("%s: %s\n", WindowKey, autoscaling.Window.String())) sb.WriteString(fmt.Sprintf("%s: %s\n", DownscaleStabilizationPeriodKey, autoscaling.DownscaleStabilizationPeriod.String())) sb.WriteString(fmt.Sprintf("%s: %s\n", UpscaleStabilizationPeriodKey, autoscaling.UpscaleStabilizationPeriod.String())) @@ -510,7 +511,7 @@ func (api *API) TelemetryEvent() map[string]interface{} { event["autoscaling.init_replicas"] = api.Autoscaling.InitReplicas event["autoscaling.max_queue_length"] = api.Autoscaling.MaxQueueLength event["autoscaling.max_concurrency"] = api.Autoscaling.MaxConcurrency - event["autoscaling.target_in_flight"] = api.Autoscaling.TargetInFlight + event["autoscaling.target_in_flight"] = *api.Autoscaling.TargetInFlight event["autoscaling.window"] = api.Autoscaling.Window.Seconds() event["autoscaling.downscale_stabilization_period"] = api.Autoscaling.DownscaleStabilizationPeriod.Seconds() event["autoscaling.upscale_stabilization_period"] = api.Autoscaling.UpscaleStabilizationPeriod.Seconds() diff --git a/pkg/types/userconfig/config_key.go b/pkg/types/userconfig/config_key.go index 63bbd1bc7f..4a303c9a46 100644 --- a/pkg/types/userconfig/config_key.go +++ b/pkg/types/userconfig/config_key.go @@ -76,9 +76,9 @@ const ( EndpointAnnotationKey = "networking.cortex.dev/endpoint" MinReplicasAnnotationKey = "autoscaling.cortex.dev/min-replicas" MaxReplicasAnnotationKey = "autoscaling.cortex.dev/max-replicas" - MaxQueueLengthAnnotationKey = "autoscaling.cortex.dev/max-replica-queue-length" - MaxConcurrencyAnnotationKey = "autoscaling.cortex.dev/max-replica-concurrency" - TargetInFlightAnnotationKey = "autoscaling.cortex.dev/target-replica-concurrency" + MaxQueueLengthAnnotationKey = "autoscaling.cortex.dev/max-queue-length" + MaxConcurrencyAnnotationKey = "autoscaling.cortex.dev/max-concurrency" + TargetInFlightAnnotationKey = "autoscaling.cortex.dev/target-in-flight" WindowAnnotationKey = "autoscaling.cortex.dev/window" DownscaleStabilizationPeriodAnnotationKey = "autoscaling.cortex.dev/downscale-stabilization-period" UpscaleStabilizationPeriodAnnotationKey = "autoscaling.cortex.dev/upscale-stabilization-period" diff --git a/pkg/workloads/k8s.go b/pkg/workloads/k8s.go index 39025d1282..918128dca9 100644 --- a/pkg/workloads/k8s.go +++ b/pkg/workloads/k8s.go @@ -48,9 +48,6 @@ const ( _gatewayContainerName = "gateway" - _neuronRTDContainerName = "neuron-rtd" - _neuronRTDSocket = "/sock/neuron.sock" - _kubexitGraveyardName = "graveyard" _kubexitGraveyardMountPath = "/graveyard" @@ -146,11 +143,13 @@ func UserPodContainers(api spec.API) ([]kcore.Container, []kcore.Volume) { } var containers []kcore.Container - var podHasInf bool containerNames := userconfig.GetContainerNames(api.Pod.Containers) for _, container := range api.Pod.Containers { containerResourceList := kcore.ResourceList{} containerResourceLimitsList := kcore.ResourceList{} + securityContext := kcore.SecurityContext{ + Privileged: pointer.Bool(true), + } if container.Compute.CPU != nil { containerResourceList[kcore.ResourceCPU] = *k8s.QuantityPtr(container.Compute.CPU.Quantity.DeepCopy()) @@ -168,30 +167,18 @@ func UserPodContainers(api spec.API) ([]kcore.Container, []kcore.Volume) { containerVolumeMounts := containerMounts if container.Compute.Inf > 0 { - volumes = append(volumes, kcore.Volume{ - Name: "neuron-sock", - }) - rtdVolumeMounts := []kcore.VolumeMount{ - { - Name: "neuron-sock", - MountPath: "/sock", - }, - } + totalHugePages := container.Compute.Inf * _hugePagesMemPerInf + containerResourceList["nvidia.com/gpu"] = *kresource.NewQuantity(container.Compute.Inf, kresource.DecimalSI) + containerResourceList["hugepages-2Mi"] = *kresource.NewQuantity(totalHugePages, kresource.BinarySI) + containerResourceLimitsList["nvidia.com/gpu"] = *kresource.NewQuantity(container.Compute.Inf, kresource.DecimalSI) + containerResourceLimitsList["hugepages-2Mi"] = *kresource.NewQuantity(totalHugePages, kresource.BinarySI) - containerVolumeMounts = append(containerVolumeMounts, rtdVolumeMounts...) - - if requiresKubexit { - rtdVolumeMounts = append(rtdVolumeMounts, - k8s.EmptyDirVolumeMount(_emptyDirVolumeName, _emptyDirMountPath), - kcore.VolumeMount{Name: _kubexitGraveyardName, MountPath: _kubexitGraveyardMountPath}, - ) - neuronRTDEnvVars := getKubexitEnvVars(_neuronRTDContainerName, containerNames.Slice(), nil) - containers = append(containers, neuronRuntimeDaemonContainer(container.Compute.Inf, rtdVolumeMounts, neuronRTDEnvVars)) - } else { - containers = append(containers, neuronRuntimeDaemonContainer(container.Compute.Inf, rtdVolumeMounts, nil)) + securityContext.Capabilities = &kcore.Capabilities{ + Add: []kcore.Capability{ + "SYS_ADMIN", + "IPC_LOCK", + }, } - - podHasInf = true } containerEnvVars := []kcore.EnvVar{ @@ -211,11 +198,7 @@ func UserPodContainers(api spec.API) ([]kcore.Container, []kcore.Volume) { if requiresKubexit { containerDeathDependencies := containerNames.Copy() containerDeathDependencies.Remove(container.Name) - if podHasInf { - containerEnvVars = getKubexitEnvVars(container.Name, containerDeathDependencies.Slice(), []string{"neuron-rtd"}) - } else { - containerEnvVars = getKubexitEnvVars(container.Name, containerDeathDependencies.Slice(), nil) - } + containerEnvVars = getKubexitEnvVars(container.Name, containerDeathDependencies.Slice(), nil) } for k, v := range container.Env { @@ -242,10 +225,8 @@ func UserPodContainers(api spec.API) ([]kcore.Container, []kcore.Volume) { Limits: containerResourceLimitsList, }, ImagePullPolicy: kcore.PullAlways, - SecurityContext: &kcore.SecurityContext{ - Privileged: pointer.Bool(true), - }}, - ) + SecurityContext: &securityContext, + }) } return containers, volumes @@ -349,36 +330,6 @@ func GenerateNodeAffinities(apiNodeGroups []string) *kcore.Affinity { } } -func neuronRuntimeDaemonContainer(computeInf int64, volumeMounts []kcore.VolumeMount, envVars []kcore.EnvVar) kcore.Container { - totalHugePages := computeInf * _hugePagesMemPerInf - return kcore.Container{ - Name: _neuronRTDContainerName, - Image: config.ClusterConfig.ImageNeuronRTD, - ImagePullPolicy: kcore.PullAlways, - Env: envVars, - SecurityContext: &kcore.SecurityContext{ - Capabilities: &kcore.Capabilities{ - Add: []kcore.Capability{ - "SYS_ADMIN", - "IPC_LOCK", - }, - }, - }, - VolumeMounts: volumeMounts, - ReadinessProbe: SocketExistsProbe(_neuronRTDSocket), - Resources: kcore.ResourceRequirements{ - Requests: kcore.ResourceList{ - "hugepages-2Mi": *kresource.NewQuantity(totalHugePages, kresource.BinarySI), - "aws.amazon.com/neuron": *kresource.NewQuantity(computeInf, kresource.DecimalSI), - }, - Limits: kcore.ResourceList{ - "hugepages-2Mi": *kresource.NewQuantity(totalHugePages, kresource.BinarySI), - "aws.amazon.com/neuron": *kresource.NewQuantity(computeInf, kresource.DecimalSI), - }, - }, - } -} - func RealtimeProxyContainer(api spec.API) (kcore.Container, kcore.Volume) { return kcore.Container{ Name: _proxyContainerName, diff --git a/test/apis/realtime/main.py b/test/apis/realtime/main.py index a164b76d2f..f13fb6f436 100644 --- a/test/apis/realtime/main.py +++ b/test/apis/realtime/main.py @@ -16,4 +16,4 @@ def hello_world(): if __name__ == "__main__": - app.run(debug=True, host="0.0.0.0", port=int(os.getenv("CORTEX_PORT", "8000"))) + app.run(debug=True, host="0.0.0.0", port=int(os.getenv("CORTEX_PORT", "8080"))) From 64a36a9aa5a170b2daa1f8c22152ffc6bc57ab4e Mon Sep 17 00:00:00 2001 From: Robert Lucian Chiriac Date: Wed, 19 May 2021 17:45:09 +0300 Subject: [PATCH 12/15] Address PR comments --- cmd/proxy/main.go | 5 ++++- pkg/types/spec/validations.go | 3 --- pkg/workloads/k8s.go | 8 -------- 3 files changed, 4 insertions(+), 12 deletions(-) diff --git a/cmd/proxy/main.go b/cmd/proxy/main.go index b0950330af..d7e4f0fa05 100644 --- a/cmd/proxy/main.go +++ b/cmd/proxy/main.go @@ -32,6 +32,7 @@ import ( "github.com/cortexlabs/cortex/pkg/proxy" "github.com/cortexlabs/cortex/pkg/types/clusterconfig" "github.com/cortexlabs/cortex/pkg/types/userconfig" + "go.uber.org/zap" ) const ( @@ -193,9 +194,11 @@ func main() { log.Infof("Shutting down %s server", name) if err := server.Shutdown(context.Background()); err != nil { // Error from closing listeners, or context timeout: - Exit(errors.Wrap(err, "HTTP server Shutdown Error")) + log.Warn("HTTP server Shutdown Error", zap.Error(err)) + telemetry.Error(errors.Wrap(err, "HTTP server Shutdown Error")) } } log.Info("Shutdown complete, exiting...") + telemetry.Close() } } diff --git a/pkg/types/spec/validations.go b/pkg/types/spec/validations.go index 462012b2b8..14399876bf 100644 --- a/pkg/types/spec/validations.go +++ b/pkg/types/spec/validations.go @@ -614,9 +614,6 @@ func validateContainers( if strings.HasPrefix(key, "CORTEX_") || strings.HasPrefix(key, "KUBEXIT_") { return errors.Wrap(ErrorCortexPrefixedEnvVarNotAllowed("CORTEX_", "KUBEXIT_"), strconv.FormatInt(int64(i), 10), userconfig.EnvKey, key) } - if key == "HOST_IP" { - return errors.Wrap(ErrorDisallowedEnvVars(key), strconv.FormatInt(int64(i), 10), userconfig.EnvKey, key) - } } } diff --git a/pkg/workloads/k8s.go b/pkg/workloads/k8s.go index 918128dca9..7bec83a89c 100644 --- a/pkg/workloads/k8s.go +++ b/pkg/workloads/k8s.go @@ -182,14 +182,6 @@ func UserPodContainers(api spec.API) ([]kcore.Container, []kcore.Volume) { } containerEnvVars := []kcore.EnvVar{ - { - Name: "HOST_IP", - ValueFrom: &kcore.EnvVarSource{ - FieldRef: &kcore.ObjectFieldSelector{ - FieldPath: "status.hostIP", - }, - }, - }, { Name: "CORTEX_PORT", Value: s.Int32(*api.Pod.Port), From 0e8ada45733c7774052a9d08bd8dc05322bfd85c Mon Sep 17 00:00:00 2001 From: Robert Lucian Chiriac Date: Wed, 19 May 2021 18:07:59 +0300 Subject: [PATCH 13/15] Fix task path --- test/apis/task/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/apis/task/main.py b/test/apis/task/main.py index dcac680486..494e9bdfb0 100644 --- a/test/apis/task/main.py +++ b/test/apis/task/main.py @@ -2,7 +2,7 @@ def main(): - with open("/mnt/job_spec.json", "r") as f: + with open("/cortex/job_spec.json", "r") as f: job_spec = json.load(f) print(json.dumps(job_spec, indent=2)) From cdd9b06bfd962f813b9081fc980f35d2ba7da784 Mon Sep 17 00:00:00 2001 From: Robert Lucian Chiriac Date: Wed, 19 May 2021 18:27:09 +0300 Subject: [PATCH 14/15] Resolve task api bugs --- pkg/workloads/k8s.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/pkg/workloads/k8s.go b/pkg/workloads/k8s.go index 7bec83a89c..fe87d4cb94 100644 --- a/pkg/workloads/k8s.go +++ b/pkg/workloads/k8s.go @@ -181,12 +181,14 @@ func UserPodContainers(api spec.API) ([]kcore.Container, []kcore.Volume) { } } - containerEnvVars := []kcore.EnvVar{ - { + containerEnvVars := []kcore.EnvVar{} + if api.Kind != userconfig.TaskAPIKind { + containerEnvVars = append(containerEnvVars, kcore.EnvVar{ Name: "CORTEX_PORT", Value: s.Int32(*api.Pod.Port), - }, + }) } + if requiresKubexit { containerDeathDependencies := containerNames.Copy() containerDeathDependencies.Remove(container.Name) @@ -201,8 +203,8 @@ func UserPodContainers(api spec.API) ([]kcore.Container, []kcore.Volume) { } var containerCmd []string - if requiresKubexit && container.Command[0] != "/mnt/kubexit" { - containerCmd = append([]string{"/mnt/kubexit"}, container.Command...) + if requiresKubexit && container.Command[0] != "/cortex/kubexit" { + containerCmd = append([]string{"/cortex/kubexit"}, container.Command...) } containers = append(containers, kcore.Container{ From 67f00572570a1841437399c7dd769243388af8c5 Mon Sep 17 00:00:00 2001 From: David Eliahu Date: Wed, 19 May 2021 09:07:51 -0700 Subject: [PATCH 15/15] Update comment --- pkg/types/spec/validations.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/pkg/types/spec/validations.go b/pkg/types/spec/validations.go index 14399876bf..92d8ae00c7 100644 --- a/pkg/types/spec/validations.go +++ b/pkg/types/spec/validations.go @@ -169,10 +169,8 @@ func podValidation() *cr.StructFieldValidation { { StructField: "Port", Int32PtrValidation: &cr.Int32PtrValidation{ - Required: false, - // it's a pointer because it's not required for the task API - // but it is for the realtime/async/batch APIs - Default: nil, + Required: false, + Default: nil, // it's a pointer because it's not required for the task API AllowExplicitNull: true, DisallowedValues: []int32{ consts.ProxyListeningPortInt32,