diff --git a/cli/cmd/cluster.go b/cli/cmd/cluster.go index 7314231a84..bf4c5d7405 100644 --- a/cli/cmd/cluster.go +++ b/cli/cmd/cluster.go @@ -837,6 +837,7 @@ var _clusterHealthCmd = &cobra.Command{ {"prometheus", console.BoolColor(clusterHealth.Prometheus), clusterWarnings.Prometheus}, {"autoscaler", console.BoolColor(clusterHealth.Autoscaler), ""}, {"activator", console.BoolColor(clusterHealth.Activator), ""}, + {"async gateway", console.BoolColor(clusterHealth.AsyncGateway), ""}, {"grafana", console.BoolColor(clusterHealth.Grafana), ""}, {"controller manager", console.BoolColor(clusterHealth.ControllerManager), ""}, {"apis gateway", console.BoolColor(clusterHealth.APIsGateway), ""}, @@ -1057,7 +1058,7 @@ func printInfoNodes(infoResponse *schema.InfoResponse) { numAPIInstances := len(infoResponse.WorkerNodeInfos) var totalReplicas int - var doesClusterHaveGPUs, doesClusterHaveInfs, doesClusterHaveAsyncGateways, doesClusterHaveEnqueuers bool + var doesClusterHaveGPUs, doesClusterHaveInfs, doesClusterHaveEnqueuers bool for _, nodeInfo := range infoResponse.WorkerNodeInfos { totalReplicas += nodeInfo.NumReplicas if nodeInfo.ComputeUserCapacity.GPU > 0 { @@ -1066,9 +1067,6 @@ func printInfoNodes(infoResponse *schema.InfoResponse) { if nodeInfo.ComputeUserCapacity.Inf > 0 { doesClusterHaveInfs = true } - if nodeInfo.NumAsyncGatewayReplicas > 0 { - doesClusterHaveAsyncGateways = true - } if nodeInfo.NumEnqueuerReplicas > 0 { doesClusterHaveEnqueuers = true } @@ -1089,7 +1087,6 @@ func printInfoNodes(infoResponse *schema.InfoResponse) { {Title: "instance type"}, {Title: "lifecycle"}, {Title: "replicas"}, - {Title: "async gateway replicas", Hidden: !doesClusterHaveAsyncGateways}, {Title: "batch enqueuer replicas", Hidden: !doesClusterHaveEnqueuers}, {Title: "CPU (requested / total allocatable)"}, {Title: "memory (requested / total allocatable)"}, @@ -1108,7 +1105,7 @@ func printInfoNodes(infoResponse *schema.InfoResponse) { memStr := nodeInfo.ComputeUserRequested.Mem.String() + " / " + nodeInfo.ComputeUserCapacity.Mem.String() gpuStr := s.Int64(nodeInfo.ComputeUserRequested.GPU) + " / " + s.Int64(nodeInfo.ComputeUserCapacity.GPU) infStr := s.Int64(nodeInfo.ComputeUserRequested.Inf) + " / " + s.Int64(nodeInfo.ComputeUserCapacity.Inf) - rows = append(rows, []interface{}{nodeInfo.InstanceType, lifecycle, nodeInfo.NumReplicas, nodeInfo.NumAsyncGatewayReplicas, nodeInfo.NumEnqueuerReplicas, cpuStr, memStr, gpuStr, infStr}) + rows = append(rows, []interface{}{nodeInfo.InstanceType, lifecycle, nodeInfo.NumReplicas, nodeInfo.NumEnqueuerReplicas, cpuStr, memStr, gpuStr, infStr}) } t := table.Table{ diff --git a/cmd/async-gateway/main.go b/cmd/async-gateway/main.go index 5d6a5e3e00..54298dd67f 100644 --- a/cmd/async-gateway/main.go +++ b/cmd/async-gateway/main.go @@ -20,13 +20,13 @@ import ( "flag" "net/http" "os" + "strings" gateway "github.com/cortexlabs/cortex/pkg/async-gateway" "github.com/cortexlabs/cortex/pkg/lib/aws" "github.com/cortexlabs/cortex/pkg/lib/errors" "github.com/cortexlabs/cortex/pkg/lib/logging" "github.com/cortexlabs/cortex/pkg/lib/telemetry" - "github.com/cortexlabs/cortex/pkg/types/clusterconfig" "github.com/cortexlabs/cortex/pkg/types/userconfig" "github.com/gorilla/handlers" "github.com/gorilla/mux" @@ -37,7 +37,7 @@ const ( _defaultPort = "8080" ) -// usage: ./gateway -bucket -region -port -queue queue +// usage: ./gateway -bucket -region -port func main() { log := logging.GetLogger() defer func() { @@ -45,30 +45,20 @@ func main() { }() var ( - clusterConfigPath = flag.String("cluster-config", "", "cluster config path") - port = flag.String("port", _defaultPort, "port on which the gateway server runs on") - queueURL = flag.String("queue", "", "SQS queue URL") + bucket = flag.String("bucket", "", "bucket") + clusterUID = flag.String("cluster-uid", "", "cluster uid") + port = flag.String("port", _defaultPort, "port on which the gateway server runs on") ) flag.Parse() switch { - case *queueURL == "": - log.Fatal("missing required option: -queue") - case *clusterConfigPath == "": - log.Fatal("missing required option: -cluster-config") + case *bucket == "": + log.Fatal("missing required option: -bucket") + case *clusterUID == "": + log.Fatal("missing required option: -cluster-uid") } - apiName := flag.Arg(0) - if apiName == "" { - log.Fatal("apiName argument was not provided") - } - - clusterConfig, err := clusterconfig.NewForFile(*clusterConfigPath) - if err != nil { - exit(log, err) - } - - awsClient, err := aws.NewForRegion(clusterConfig.Region) + awsClient, err := aws.New() if err != nil { exit(log, err) } @@ -78,8 +68,9 @@ func main() { exit(log, err) } + telemetryEnabled := strings.ToLower(os.Getenv("CORTEX_TELEMETRY_DISABLE")) != "true" err = telemetry.Init(telemetry.Config{ - Enabled: clusterConfig.Telemetry, + Enabled: telemetryEnabled, UserID: userID, Properties: map[string]string{ "kind": userconfig.AsyncAPIKind.String(), @@ -95,10 +86,9 @@ func main() { defer telemetry.Close() sess := awsClient.Session() - s3Storage := gateway.NewS3(sess, clusterConfig.Bucket) - sqsQueue := gateway.NewSQS(*queueURL, sess) + s3Storage := gateway.NewS3(sess, *bucket) - svc := gateway.NewService(clusterConfig.ClusterUID, apiName, sqsQueue, s3Storage, log) + svc := gateway.NewService(*clusterUID, s3Storage, log, *sess) ep := gateway.NewEndpoint(svc, log) router := mux.NewRouter() diff --git a/cmd/operator/main.go b/cmd/operator/main.go index ac38ee7130..aa38dd5200 100644 --- a/cmd/operator/main.go +++ b/cmd/operator/main.go @@ -67,10 +67,8 @@ func main() { apiKind := deployment.Labels["apiKind"] switch apiKind { case userconfig.AsyncAPIKind.String(): - if deployment.Labels["cortex.dev/async"] == "api" { - if err := asyncapi.UpdateAPIMetricsCron(&deployment); err != nil { - operatorLogger.Fatal(errors.Wrap(err, "init")) - } + if err := asyncapi.UpdateAPIMetricsCron(&deployment); err != nil { + operatorLogger.Fatal(errors.Wrap(err, "init")) } } } diff --git a/manager/generate_eks.py b/manager/generate_eks.py index 6d4ccbad76..7c354e63d7 100644 --- a/manager/generate_eks.py +++ b/manager/generate_eks.py @@ -319,9 +319,9 @@ def generate_eks( "ami": get_ami(ami_map, "t3.medium"), "name": "cx-operator", "instanceType": "t3.medium", - "minSize": 1, + "minSize": 2, "maxSize": 25, - "desiredCapacity": 1, + "desiredCapacity": 2, "volumeType": "gp3", "volumeSize": 20, "volumeIOPS": 3000, diff --git a/manager/install.sh b/manager/install.sh index 99a89cde54..93557474fc 100755 --- a/manager/install.sh +++ b/manager/install.sh @@ -52,6 +52,10 @@ function cluster_up() { python render_template.py $CORTEX_CLUSTER_CONFIG_FILE manifests/cluster-autoscaler.yaml.j2 | kubectl apply -f - >/dev/null echo "✓" + echo -n "○ configuring async gateway " + python render_template.py $CORTEX_CLUSTER_CONFIG_FILE manifests/async-gateway.yaml.j2 | kubectl apply -f - >/dev/null + echo "✓" + echo -n "○ configuring logging " python render_template.py $CORTEX_CLUSTER_CONFIG_FILE manifests/fluent-bit.yaml.j2 | kubectl apply -f - >/dev/null envsubst < manifests/event-exporter.yaml | kubectl apply -f - >/dev/null diff --git a/manager/manifests/async-gateway.yaml.j2 b/manager/manifests/async-gateway.yaml.j2 new file mode 100644 index 0000000000..34021fd597 --- /dev/null +++ b/manager/manifests/async-gateway.yaml.j2 @@ -0,0 +1,109 @@ +# Copyright 2021 Cortex Labs, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: v1 +kind: ServiceAccount +metadata: + name: async-gateway + namespace: default +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: async-gateway + namespace: default +spec: + selector: + matchLabels: + app: async-gateway + strategy: + rollingUpdate: + maxSurge: 25% + maxUnavailable: 25% + type: RollingUpdate + template: + metadata: + name: async-gateway + labels: + app: async-gateway + spec: + serviceAccountName: async-gateway + containers: + - name: gateway + image: {{ config["image_async_gateway"] }} + imagePullPolicy: Always + args: + - --port + - "8888" + - --cluster-uid + - "{{ config["cluster_uid"] }}" + - --bucket + - "{{ config["bucket"] }}" + envFrom: + - configMapRef: + name: env-vars + ports: + - containerPort: 8888 + readinessProbe: + httpGet: + path: /healthz + port: 8888 + scheme: HTTP + livenessProbe: + httpGet: + path: /healthz + port: 8888 + scheme: HTTP + resources: + requests: + cpu: 400m + memory: 512Mi + limits: + cpu: 400m +--- +apiVersion: v1 +kind: Service +metadata: + name: async-gateway +spec: + type: ClusterIP + selector: + app: async-gateway + ports: + - port: 8888 +--- +apiVersion: autoscaling/v2beta2 +kind: HorizontalPodAutoscaler +metadata: + name: async-gateway +spec: + maxReplicas: 20 + minReplicas: 1 + scaleTargetRef: + apiVersion: apps/v1 + kind: Deployment + name: async-gateway + metrics: + - type: Resource + resource: + name: cpu + target: + type: Utilization + averageUtilization: 90 + - type: Resource + resource: + name: memory + target: + type: Utilization + averageUtilization: 90 diff --git a/manager/manifests/cluster-autoscaler.yaml.j2 b/manager/manifests/cluster-autoscaler.yaml.j2 index 8993bf6df3..1e2ef1994a 100644 --- a/manager/manifests/cluster-autoscaler.yaml.j2 +++ b/manager/manifests/cluster-autoscaler.yaml.j2 @@ -185,7 +185,7 @@ spec: cpu: 300m requests: cpu: 100m - memory: 200Mi + memory: 400Mi command: - ./cluster-autoscaler - --v=4 diff --git a/manager/manifests/istio.yaml.j2 b/manager/manifests/istio.yaml.j2 index 55b186cabf..0341b2d112 100644 --- a/manager/manifests/istio.yaml.j2 +++ b/manager/manifests/istio.yaml.j2 @@ -137,7 +137,7 @@ spec: resources: requests: cpu: 400m - memory: 128Mi + memory: 512Mi limits: cpu: 1500m memory: 1024Mi diff --git a/manager/manifests/prometheus-monitoring.yaml b/manager/manifests/prometheus-monitoring.yaml index c9d99366ff..5a7d9dc4e6 100644 --- a/manager/manifests/prometheus-monitoring.yaml +++ b/manager/manifests/prometheus-monitoring.yaml @@ -252,7 +252,6 @@ spec: selector: matchLabels: apiKind: AsyncAPI - cortex.dev/async: api matchExpressions: - { key: prometheus-ignore, operator: DoesNotExist } namespaceSelector: diff --git a/pkg/async-gateway/endpoint.go b/pkg/async-gateway/endpoint.go index 76c1ee22ec..ac2673fb7d 100644 --- a/pkg/async-gateway/endpoint.go +++ b/pkg/async-gateway/endpoint.go @@ -21,6 +21,7 @@ import ( "fmt" "net/http" + "github.com/cortexlabs/cortex/pkg/consts" "github.com/cortexlabs/cortex/pkg/lib/errors" "github.com/cortexlabs/cortex/pkg/lib/telemetry" "github.com/cortexlabs/cortex/pkg/types/async" @@ -50,14 +51,28 @@ func (e *Endpoint) CreateWorkload(w http.ResponseWriter, r *http.Request) { return } + apiName := r.Header.Get(consts.CortexAPINameHeader) + if requestID == "" { + respondPlainText(w, http.StatusBadRequest, fmt.Sprintf("error: missing %s key in request header", consts.CortexAPINameHeader)) + return + } + r.Header.Del(consts.CortexAPINameHeader) + + queueURL := r.Header.Get(consts.CortexQueueURLHeader) + if queueURL == "" { + respondPlainText(w, http.StatusBadRequest, fmt.Sprintf("error: missing %s key in request header", consts.CortexQueueURLHeader)) + return + } + r.Header.Del(consts.CortexQueueURLHeader) + body := r.Body defer func() { _ = r.Body.Close() }() - log := e.logger.With(zap.String("id", requestID)) + log := e.logger.With(zap.String("id", requestID), zap.String("apiName", apiName)) - id, err := e.service.CreateWorkload(requestID, body, r.Header) + id, err := e.service.CreateWorkload(requestID, apiName, queueURL, body, r.Header) if err != nil { respondPlainText(w, http.StatusInternalServerError, fmt.Sprintf("error: %v", err)) logErrorWithTelemetry(log, errors.Wrap(err, "failed to create workload")) @@ -79,9 +94,16 @@ func (e *Endpoint) GetWorkload(w http.ResponseWriter, r *http.Request) { return } - log := e.logger.With(zap.String("id", id)) + apiName := r.Header.Get(consts.CortexAPINameHeader) + if apiName == "" { + respondPlainText(w, http.StatusBadRequest, fmt.Sprintf("error: missing %s key in request header", consts.CortexAPINameHeader)) + return + } + r.Header.Del(consts.CortexAPINameHeader) + + log := e.logger.With(zap.String("id", id), zap.String("apiName", apiName)) - res, err := e.service.GetWorkload(id) + res, err := e.service.GetWorkload(id, apiName) if err != nil { respondPlainText(w, http.StatusInternalServerError, fmt.Sprintf("error: %v", err)) logErrorWithTelemetry(log, errors.Wrap(err, "failed to get workload")) diff --git a/pkg/async-gateway/service.go b/pkg/async-gateway/service.go index 344d252ad6..d8f5f4f7f2 100644 --- a/pkg/async-gateway/service.go +++ b/pkg/async-gateway/service.go @@ -24,6 +24,7 @@ import ( "net/http" "strings" + "github.com/aws/aws-sdk-go/aws/session" "github.com/cortexlabs/cortex/pkg/lib/errors" "github.com/cortexlabs/cortex/pkg/types/async" "go.uber.org/zap" @@ -31,33 +32,31 @@ import ( // Service provides an interface to the async-gateway business logic type Service interface { - CreateWorkload(id string, payload io.Reader, headers http.Header) (string, error) - GetWorkload(id string) (GetWorkloadResponse, error) + CreateWorkload(id string, apiName string, queueURL string, payload io.Reader, headers http.Header) (string, error) + GetWorkload(id string, apiName string) (GetWorkloadResponse, error) } type service struct { logger *zap.SugaredLogger - queue Queue storage Storage clusterUID string - apiName string + session session.Session } // NewService creates a new async-gateway service -func NewService(clusterUID, apiName string, queue Queue, storage Storage, logger *zap.SugaredLogger) Service { +func NewService(clusterUID string, storage Storage, logger *zap.SugaredLogger, session session.Session) Service { return &service{ logger: logger, - queue: queue, storage: storage, clusterUID: clusterUID, - apiName: apiName, + session: session, } } // CreateWorkload enqueues an async workload request and uploads the request payload to S3 -func (s *service) CreateWorkload(id string, payload io.Reader, headers http.Header) (string, error) { - prefix := async.StoragePath(s.clusterUID, s.apiName) - log := s.logger.With(zap.String("id", id)) +func (s *service) CreateWorkload(id string, apiName string, queueURL string, payload io.Reader, headers http.Header) (string, error) { + prefix := async.StoragePath(s.clusterUID, apiName) + log := s.logger.With(zap.String("id", id), zap.String("apiName", apiName)) buf := &bytes.Buffer{} if err := json.NewEncoder(buf).Encode(headers); err != nil { @@ -78,7 +77,8 @@ func (s *service) CreateWorkload(id string, payload io.Reader, headers http.Head } log.Debug("sending message to queue") - if err := s.queue.SendMessage(id, id); err != nil { + queue := NewSQS(queueURL, &s.session) + if err := queue.SendMessage(id, id); err != nil { return "", errors.Wrap(err, "failed to send message to queue") } @@ -92,10 +92,10 @@ func (s *service) CreateWorkload(id string, payload io.Reader, headers http.Head } // GetWorkload retrieves the status and result, if available, of a given workload -func (s *service) GetWorkload(id string) (GetWorkloadResponse, error) { - log := s.logger.With(zap.String("id", id)) +func (s *service) GetWorkload(id string, apiName string) (GetWorkloadResponse, error) { + log := s.logger.With(zap.String("id", id), zap.String("apiName", apiName)) - st, err := s.getStatus(id) + st, err := s.getStatus(id, apiName) if err != nil { return GetWorkloadResponse{}, err } @@ -108,7 +108,7 @@ func (s *service) GetWorkload(id string) (GetWorkloadResponse, error) { } // attempt to download user result - prefix := async.StoragePath(s.clusterUID, s.apiName) + prefix := async.StoragePath(s.clusterUID, apiName) resultPath := async.ResultPath(prefix, id) log.Debug("downloading user result", zap.String("path", resultPath)) resultBuf, err := s.storage.Download(resultPath) @@ -135,8 +135,8 @@ func (s *service) GetWorkload(id string) (GetWorkloadResponse, error) { }, nil } -func (s *service) getStatus(id string) (async.Status, error) { - prefix := async.StoragePath(s.clusterUID, s.apiName) +func (s *service) getStatus(id string, apiName string) (async.Status, error) { + prefix := async.StoragePath(s.clusterUID, apiName) log := s.logger.With(zap.String("id", id)) // download workload status diff --git a/pkg/consts/consts.go b/pkg/consts/consts.go index 3fe860d776..29e6e44205 100644 --- a/pkg/consts/consts.go +++ b/pkg/consts/consts.go @@ -75,6 +75,7 @@ var ( CortexTargetServiceHeader = "X-Cortex-Target-Service" CortexProbeHeader = "X-Cortex-Probe" CortexOriginHeader = "X-Cortex-Origin" + CortexQueueURLHeader = "X-Cortex-Queue-URL" WaitForReadyReplicasTimeout = 20 * time.Minute ) diff --git a/pkg/health/health.go b/pkg/health/health.go index 77e26fa55f..5bc2bae931 100644 --- a/pkg/health/health.go +++ b/pkg/health/health.go @@ -45,6 +45,7 @@ type ClusterHealth struct { Prometheus bool `json:"prometheus"` Autoscaler bool `json:"autoscaler"` Activator bool `json:"activator"` + AsyncGateway bool `json:"async_gateway"` Grafana bool `json:"grafana"` OperatorGateway bool `json:"operator_gateway"` APIsGateway bool `json:"apis_gateway"` @@ -91,6 +92,7 @@ func Check(awsClient *awslib.Client, k8sClient *k8s.Client, clusterName string) prometheusHealth bool autoscalerHealth bool activatorHealth bool + asyncGatewayHealth bool grafanaHealth bool operatorGatewayHealth bool apisGatewayHealth bool @@ -131,6 +133,11 @@ func Check(awsClient *awslib.Client, k8sClient *k8s.Client, clusterName string) activatorHealth, err = getDeploymentReadiness(k8sClient, "activator", consts.DefaultNamespace) return err }, + func() error { + var err error + asyncGatewayHealth, err = getDeploymentReadiness(k8sClient, "async-gateway", consts.DefaultNamespace) + return err + }, func() error { var err error grafanaHealth, err = getStatefulSetReadiness(k8sClient, "grafana", consts.DefaultNamespace) @@ -201,6 +208,7 @@ func Check(awsClient *awslib.Client, k8sClient *k8s.Client, clusterName string) Prometheus: prometheusHealth, Autoscaler: autoscalerHealth, Activator: activatorHealth, + AsyncGateway: asyncGatewayHealth, Grafana: grafanaHealth, OperatorGateway: operatorGatewayHealth, APIsGateway: apisGatewayHealth, diff --git a/pkg/operator/endpoints/info.go b/pkg/operator/endpoints/info.go index c210e74fe7..53ee80a3d7 100644 --- a/pkg/operator/endpoints/info.go +++ b/pkg/operator/endpoints/info.go @@ -119,7 +119,6 @@ func getWorkerNodeInfos() ([]schema.WorkerNodeInfo, int, error) { } _, isAPIPod := pod.Labels["apiName"] - asyncPodType, isAsyncPod := pod.Labels["cortex.dev/async"] batchPodType, isBatchPod := pod.Labels["cortex.dev/batch"] if pod.Spec.NodeName == "" && isAPIPod { @@ -133,9 +132,7 @@ func getWorkerNodeInfos() ([]schema.WorkerNodeInfo, int, error) { } if isAPIPod { - if isAsyncPod && asyncPodType == "gateway" { - node.NumAsyncGatewayReplicas++ - } else if isBatchPod && batchPodType == "enqueuer" { + if isBatchPod && batchPodType == "enqueuer" { node.NumEnqueuerReplicas++ } else { node.NumReplicas++ diff --git a/pkg/operator/endpoints/logs.go b/pkg/operator/endpoints/logs.go index d56add3806..68f5d938d7 100644 --- a/pkg/operator/endpoints/logs.go +++ b/pkg/operator/endpoints/logs.go @@ -66,9 +66,6 @@ func ReadLogs(w http.ResponseWriter, r *http.Request) { labels := map[string]string{"apiName": apiName, "deploymentID": deploymentID, "podID": podID} - if deployedResource.Kind == userconfig.AsyncAPIKind { - labels["cortex.dev/async"] = "api" - } operator.StreamLogsFromRandomPod(labels, socket) } diff --git a/pkg/operator/resources/asyncapi/api.go b/pkg/operator/resources/asyncapi/api.go index 0c6ba6b190..508a7b5121 100644 --- a/pkg/operator/resources/asyncapi/api.go +++ b/pkg/operator/resources/asyncapi/api.go @@ -37,7 +37,6 @@ import ( "github.com/cortexlabs/cortex/pkg/workloads" istioclientnetworking "istio.io/client-go/pkg/apis/networking/v1beta1" kapps "k8s.io/api/apps/v1" - kautoscaling "k8s.io/api/autoscaling/v2beta2" kcore "k8s.io/api/core/v1" ) @@ -51,16 +50,9 @@ var ( ) type resources struct { - apiDeployment *kapps.Deployment - apiConfigMap *kcore.ConfigMap - gatewayDeployment *kapps.Deployment - gatewayService *kcore.Service - gatewayHPA *kautoscaling.HorizontalPodAutoscaler - gatewayVirtualService *istioclientnetworking.VirtualService -} - -func getGatewayK8sName(apiName string) string { - return "gateway-" + apiName + apiDeployment *kapps.Deployment + apiConfigMap *kcore.ConfigMap + apiVirtualService *istioclientnetworking.VirtualService } func generateDeploymentID() string { @@ -75,19 +67,19 @@ func UpdateAPI(apiConfig userconfig.API, force bool) (*spec.API, string, error) initialDeploymentTime := time.Now().UnixNano() deploymentID := generateDeploymentID() - if prevK8sResources.gatewayVirtualService != nil && prevK8sResources.gatewayVirtualService.Labels["initialDeploymentTime"] != "" { + if prevK8sResources.apiVirtualService != nil && prevK8sResources.apiVirtualService.Labels["initialDeploymentTime"] != "" { var err error - initialDeploymentTime, err = k8s.ParseInt64Label(prevK8sResources.gatewayVirtualService, "initialDeploymentTime") + initialDeploymentTime, err = k8s.ParseInt64Label(prevK8sResources.apiVirtualService, "initialDeploymentTime") if err != nil { return nil, "", err } - deploymentID = prevK8sResources.gatewayVirtualService.Labels["deploymentID"] + deploymentID = prevK8sResources.apiVirtualService.Labels["deploymentID"] } api := spec.GetAPISpec(&apiConfig, initialDeploymentTime, deploymentID, config.ClusterConfig.ClusterUID) // resource creation - if prevK8sResources.gatewayVirtualService == nil { + if prevK8sResources.apiVirtualService == nil { if err := config.AWS.UploadJSONToS3(api, config.ClusterConfig.Bucket, api.Key); err != nil { return nil, "", errors.Wrap(err, "upload api spec") } @@ -119,7 +111,7 @@ func UpdateAPI(apiConfig userconfig.API, force bool) (*spec.API, string, error) } // resource update - if prevK8sResources.gatewayVirtualService.Labels["specID"] != api.SpecID { + if prevK8sResources.apiVirtualService.Labels["specID"] != api.SpecID { isUpdating, err := isAPIUpdating(prevK8sResources.apiDeployment) if err != nil { return nil, "", err @@ -132,7 +124,7 @@ func UpdateAPI(apiConfig userconfig.API, force bool) (*spec.API, string, error) return nil, "", errors.Wrap(err, "upload api spec") } - initialDeploymentTime, err := k8s.ParseInt64Label(prevK8sResources.gatewayVirtualService, "initialDeploymentTime") + initialDeploymentTime, err := k8s.ParseInt64Label(prevK8sResources.apiVirtualService, "initialDeploymentTime") if err != nil { return nil, "", err } @@ -164,7 +156,7 @@ func RefreshAPI(apiName string, force bool) (string, error) { prevK8sResources, err := getK8sResources(apiName) if err != nil { return "", err - } else if prevK8sResources.gatewayVirtualService == nil || prevK8sResources.apiDeployment == nil { + } else if prevK8sResources.apiVirtualService == nil || prevK8sResources.apiDeployment == nil { return "", errors.ErrorUnexpected("unable to find deployment", apiName) } @@ -177,7 +169,7 @@ func RefreshAPI(apiName string, force bool) (string, error) { return "", ErrorAPIUpdating(apiName) } - apiID, err := k8s.GetLabel(prevK8sResources.gatewayVirtualService, "apiID") + apiID, err := k8s.GetLabel(prevK8sResources.apiVirtualService, "apiID") if err != nil { return "", err } @@ -187,7 +179,7 @@ func RefreshAPI(apiName string, force bool) (string, error) { return "", err } - initialDeploymentTime, err := k8s.ParseInt64Label(prevK8sResources.gatewayVirtualService, "initialDeploymentTime") + initialDeploymentTime, err := k8s.ParseInt64Label(prevK8sResources.apiVirtualService, "initialDeploymentTime") if err != nil { return "", err } @@ -257,9 +249,6 @@ func GetAllAPIs(deployments []kapps.Deployment) ([]schema.APIResponse, error) { apiNames := make([]string, 0) for i := range deployments { - if deployments[i].Labels["cortex.dev/async"] != "api" { - continue - } apiName := deployments[i].Labels["apiName"] apiNames = append(apiNames, apiName) @@ -282,21 +271,7 @@ func GetAllAPIs(deployments []kapps.Deployment) ([]schema.APIResponse, error) { } func GetAPIByName(deployedResource *operator.DeployedResource) ([]schema.APIResponse, error) { - var apiDeployment *kapps.Deployment - var gatewayDeployment *kapps.Deployment - - err := parallel.RunFirstErr( - func() error { - var err error - apiDeployment, err = config.K8s.GetDeployment(workloads.K8sName(deployedResource.Name)) - return err - }, - func() error { - var err error - gatewayDeployment, err = config.K8s.GetDeployment(getGatewayK8sName(deployedResource.Name)) - return err - }, - ) + apiDeployment, err := config.K8s.GetDeployment(workloads.K8sName(deployedResource.Name)) if err != nil { return nil, err } @@ -305,10 +280,6 @@ func GetAPIByName(deployedResource *operator.DeployedResource) ([]schema.APIResp return nil, errors.ErrorUnexpected("unable to find api deployment", deployedResource.Name) } - if gatewayDeployment == nil { - return nil, errors.ErrorUnexpected("unable to find gateway deployment", deployedResource.Name) - } - apiStatus := status.FromDeployment(apiDeployment) apiMetadata, err := spec.MetadataFromDeployment(apiDeployment) if err != nil { @@ -340,20 +311,8 @@ func GetAPIByName(deployedResource *operator.DeployedResource) ([]schema.APIResp func DescribeAPIByName(deployedResource *operator.DeployedResource) ([]schema.APIResponse, error) { var apiDeployment *kapps.Deployment - var gatewayDeployment *kapps.Deployment - err := parallel.RunFirstErr( - func() error { - var err error - apiDeployment, err = config.K8s.GetDeployment(workloads.K8sName(deployedResource.Name)) - return err - }, - func() error { - var err error - gatewayDeployment, err = config.K8s.GetDeployment(getGatewayK8sName(deployedResource.Name)) - return err - }, - ) + apiDeployment, err := config.K8s.GetDeployment(workloads.K8sName(deployedResource.Name)) if err != nil { return nil, err } @@ -362,10 +321,6 @@ func DescribeAPIByName(deployedResource *operator.DeployedResource) ([]schema.AP return nil, errors.ErrorUnexpected("unable to find api deployment", deployedResource.Name) } - if gatewayDeployment == nil { - return nil, errors.ErrorUnexpected("unable to find gateway deployment", deployedResource.Name) - } - apiStatus := status.FromDeployment(apiDeployment) apiMetadata, err := spec.MetadataFromDeployment(apiDeployment) if err != nil { @@ -373,8 +328,7 @@ func DescribeAPIByName(deployedResource *operator.DeployedResource) ([]schema.AP } apiPods, err := config.K8s.ListPodsByLabels(map[string]string{ - "apiName": apiDeployment.Labels["apiName"], - "cortex.dev/async": "api", + "apiName": apiDeployment.Labels["apiName"], }) if err != nil { return nil, err @@ -425,12 +379,8 @@ func UpdateAPIMetricsCron(apiDeployment *kapps.Deployment) error { func getK8sResources(apiName string) (resources, error) { var deployment *kapps.Deployment var apiConfigMap *kcore.ConfigMap - var gatewayDeployment *kapps.Deployment - var gatewayService *kcore.Service - var gatewayHPA *kautoscaling.HorizontalPodAutoscaler - var gatewayVirtualService *istioclientnetworking.VirtualService + var apiVirtualService *istioclientnetworking.VirtualService - gatewayK8sName := getGatewayK8sName(apiName) apiK8sName := workloads.K8sName(apiName) err := parallel.RunFirstErr( @@ -446,33 +396,15 @@ func getK8sResources(apiName string) (resources, error) { }, func() error { var err error - gatewayDeployment, err = config.K8s.GetDeployment(gatewayK8sName) - return err - }, - func() error { - var err error - gatewayService, err = config.K8s.GetService(apiK8sName) - return err - }, - func() error { - var err error - gatewayHPA, err = config.K8s.GetHPA(gatewayK8sName) - return err - }, - func() error { - var err error - gatewayVirtualService, err = config.K8s.GetVirtualService(apiK8sName) + apiVirtualService, err = config.K8s.GetVirtualService(apiK8sName) return err }, ) return resources{ - apiDeployment: deployment, - apiConfigMap: apiConfigMap, - gatewayDeployment: gatewayDeployment, - gatewayService: gatewayService, - gatewayHPA: gatewayHPA, - gatewayVirtualService: gatewayVirtualService, + apiDeployment: deployment, + apiConfigMap: apiConfigMap, + apiVirtualService: apiVirtualService, }, err } @@ -482,13 +414,8 @@ func applyK8sResources(api spec.API, prevK8sResources resources, queueURL string if err != nil { return err } - gatewayDeployment := gatewayDeploymentSpec(api, queueURL) - gatewayHPA, err := gatewayHPASpec(api) - if err != nil { - return err - } - gatewayService := gatewayServiceSpec(api) - gatewayVirtualService := gatewayVirtualServiceSpec(api) + + apiVirtualService := apiVirtualServiceSpec(api, queueURL) return parallel.RunFirstErr( func() error { @@ -507,16 +434,7 @@ func applyK8sResources(api spec.API, prevK8sResources resources, queueURL string return nil }, func() error { - return applyK8sDeployment(prevK8sResources.gatewayDeployment, &gatewayDeployment) - }, - func() error { - return applyK8sHPA(prevK8sResources.gatewayHPA, &gatewayHPA) - }, - func() error { - return applyK8sService(prevK8sResources.gatewayService, &gatewayService) - }, - func() error { - return applyK8sVirtualService(prevK8sResources.gatewayVirtualService, &gatewayVirtualService) + return applyK8sVirtualService(prevK8sResources.apiVirtualService, &apiVirtualService) }, ) } @@ -558,29 +476,6 @@ func applyK8sDeployment(prevDeployment *kapps.Deployment, newDeployment *kapps.D return nil } -func applyK8sHPA(prevHPA *kautoscaling.HorizontalPodAutoscaler, newHPA *kautoscaling.HorizontalPodAutoscaler) error { - var err error - if prevHPA == nil { - _, err = config.K8s.CreateHPA(newHPA) - } else { - _, err = config.K8s.UpdateHPA(newHPA) - } - if err != nil { - return err - } - return nil -} - -func applyK8sService(prevService *kcore.Service, newService *kcore.Service) error { - if prevService == nil { - _, err := config.K8s.CreateService(newService) - return err - } - - _, err := config.K8s.UpdateService(prevService, newService) - return err -} - func applyK8sVirtualService(prevVirtualService *istioclientnetworking.VirtualService, newVirtualService *istioclientnetworking.VirtualService) error { if prevVirtualService == nil { _, err := config.K8s.CreateVirtualService(newVirtualService) @@ -598,7 +493,6 @@ func deleteBucketResources(apiName string) error { func deleteK8sResources(apiName string) error { apiK8sName := workloads.K8sName(apiName) - gatewayK8sName := getGatewayK8sName(apiName) err := parallel.RunFirstErr( func() error { @@ -614,18 +508,6 @@ func deleteK8sResources(apiName string) error { _, err := config.K8s.DeleteConfigMap(apiK8sName) return err }, - func() error { - _, err := config.K8s.DeleteDeployment(gatewayK8sName) - return err - }, - func() error { - _, err := config.K8s.DeleteHPA(gatewayK8sName) - return err - }, - func() error { - _, err := config.K8s.DeleteService(apiK8sName) - return err - }, func() error { _, err := config.K8s.DeleteVirtualService(apiK8sName) return err @@ -657,7 +539,6 @@ func isAPIUpdating(deployment *kapps.Deployment) (bool, error) { } func isPodSpecLatest(deployment *kapps.Deployment, pod *kcore.Pod) bool { - // Note: the gateway deployment/pods don't have "podID" or "deploymentID" labels, which is ok since it is always up-to-date return deployment.Spec.Template.Labels["podID"] == pod.Labels["podID"] && deployment.Spec.Template.Labels["deploymentID"] == pod.Labels["deploymentID"] } diff --git a/pkg/operator/resources/asyncapi/k8s_specs.go b/pkg/operator/resources/asyncapi/k8s_specs.go index dac6c0971f..91586e1ed3 100644 --- a/pkg/operator/resources/asyncapi/k8s_specs.go +++ b/pkg/operator/resources/asyncapi/k8s_specs.go @@ -23,130 +23,33 @@ import ( s "github.com/cortexlabs/cortex/pkg/lib/strings" "github.com/cortexlabs/cortex/pkg/types/spec" "github.com/cortexlabs/cortex/pkg/workloads" + istionetworking "istio.io/api/networking/v1beta1" "istio.io/client-go/pkg/apis/networking/v1beta1" kapps "k8s.io/api/apps/v1" - kautoscaling "k8s.io/api/autoscaling/v2beta2" kcore "k8s.io/api/core/v1" ) -var _terminationGracePeriodSeconds int64 = 60 // seconds -var _gatewayHPATargetCPUUtilization int32 = 80 // percentage -var _gatewayHPATargetMemUtilization int32 = 80 // percentage +var _terminationGracePeriodSeconds int64 = 60 // seconds -func gatewayDeploymentSpec(api spec.API, queueURL string) kapps.Deployment { - volumeMounts := []kcore.VolumeMount{ - { - Name: "cluster-config", - MountPath: "/configs/cluster", - }, - } - volumes := []kcore.Volume{ - { - Name: "cluster-config", - VolumeSource: kcore.VolumeSource{ - ConfigMap: &kcore.ConfigMapVolumeSource{ - LocalObjectReference: kcore.LocalObjectReference{ - Name: "cluster-config", +func apiVirtualServiceSpec(api spec.API, queueURL string) v1beta1.VirtualService { + return *k8s.VirtualService(&k8s.VirtualServiceSpec{ + Name: workloads.K8sName(api.Name), + Gateways: []string{"apis-gateway"}, + Destinations: []k8s.Destination{ + { + ServiceName: "async-gateway", + Weight: 100, + Port: uint32(consts.ProxyPortInt32), + Headers: &istionetworking.Headers{ + Request: &istionetworking.Headers_HeaderOperations{ + Set: map[string]string{ + consts.CortexAPINameHeader: api.Name, + consts.CortexQueueURLHeader: queueURL, + }, }, }, }, }, - } - container := workloads.AsyncGatewayContainer(api, queueURL, volumeMounts) - - return *k8s.Deployment(&k8s.DeploymentSpec{ - Name: getGatewayK8sName(api.Name), - Replicas: 1, - MaxSurge: pointer.String(api.UpdateStrategy.MaxSurge), - MaxUnavailable: pointer.String(api.UpdateStrategy.MaxUnavailable), - Selector: map[string]string{ - "apiName": api.Name, - "apiKind": api.Kind.String(), - "cortex.dev/async": "gateway", - }, - Labels: map[string]string{ - "apiName": api.Name, - "apiKind": api.Kind.String(), - "cortex.dev/api": "true", - "cortex.dev/async": "gateway", - }, - PodSpec: k8s.PodSpec{ - Labels: map[string]string{ - // ID labels are omitted to avoid restarting the gateway on update/refresh - "apiName": api.Name, - "apiKind": api.Kind.String(), - "cortex.dev/api": "true", - "cortex.dev/async": "gateway", - }, - K8sPodSpec: kcore.PodSpec{ - RestartPolicy: "Always", - TerminationGracePeriodSeconds: pointer.Int64(_terminationGracePeriodSeconds), - Containers: []kcore.Container{container}, - NodeSelector: workloads.NodeSelectors(), - Tolerations: workloads.GenerateResourceTolerations(), - Affinity: workloads.GenerateNodeAffinities(api.NodeGroups), - Volumes: volumes, - ServiceAccountName: workloads.ServiceAccountName, - }, - }, - }) -} - -func gatewayHPASpec(api spec.API) (kautoscaling.HorizontalPodAutoscaler, error) { - var maxReplicas int32 = 1 - if api.Autoscaling != nil { - maxReplicas = api.Autoscaling.MaxReplicas - } - hpa, err := k8s.HPA(&k8s.HPASpec{ - DeploymentName: getGatewayK8sName(api.Name), - MinReplicas: 1, - MaxReplicas: maxReplicas, - TargetCPUUtilization: _gatewayHPATargetCPUUtilization, - TargetMemUtilization: _gatewayHPATargetMemUtilization, - Labels: map[string]string{ - "apiName": api.Name, - "apiKind": api.Kind.String(), - "cortex.dev/api": "true", - "cortex.dev/async": "hpa", - }, - }) - - if err != nil { - return kautoscaling.HorizontalPodAutoscaler{}, err - } - return *hpa, nil -} - -func gatewayServiceSpec(api spec.API) kcore.Service { - return *k8s.Service(&k8s.ServiceSpec{ - Name: workloads.K8sName(api.Name), - PortName: "http", - Port: consts.ProxyPortInt32, - TargetPort: consts.ProxyPortInt32, - Annotations: api.ToK8sAnnotations(), - Labels: map[string]string{ - "apiName": api.Name, - "apiKind": api.Kind.String(), - "cortex.dev/api": "true", - "cortex.dev/async": "gateway", - }, - Selector: map[string]string{ - "apiName": api.Name, - "apiKind": api.Kind.String(), - "cortex.dev/async": "gateway", - }, - }) -} - -func gatewayVirtualServiceSpec(api spec.API) v1beta1.VirtualService { - return *k8s.VirtualService(&k8s.VirtualServiceSpec{ - Name: workloads.K8sName(api.Name), - Gateways: []string{"apis-gateway"}, - Destinations: []k8s.Destination{{ - ServiceName: workloads.K8sName(api.Name), - Weight: 100, - Port: uint32(consts.ProxyPortInt32), - }}, PrefixPath: api.Networking.Endpoint, Rewrite: pointer.String("/"), Annotations: api.ToK8sAnnotations(), @@ -159,7 +62,6 @@ func gatewayVirtualServiceSpec(api spec.API) v1beta1.VirtualService { "deploymentID": api.DeploymentID, "podID": api.PodID, "cortex.dev/api": "true", - "cortex.dev/async": "gateway", }, }) } @@ -207,13 +109,11 @@ func deploymentSpec(api spec.API, prevDeployment *kapps.Deployment, queueURL str "deploymentID": api.DeploymentID, "podID": api.PodID, "cortex.dev/api": "true", - "cortex.dev/async": "api", }, Annotations: api.ToK8sAnnotations(), Selector: map[string]string{ - "apiName": api.Name, - "apiKind": api.Kind.String(), - "cortex.dev/async": "api", + "apiName": api.Name, + "apiKind": api.Kind.String(), }, PodSpec: k8s.PodSpec{ Labels: map[string]string{ @@ -224,7 +124,6 @@ func deploymentSpec(api spec.API, prevDeployment *kapps.Deployment, queueURL str "deploymentID": api.DeploymentID, "podID": api.PodID, "cortex.dev/api": "true", - "cortex.dev/async": "api", }, K8sPodSpec: kcore.PodSpec{ RestartPolicy: "Always", diff --git a/pkg/operator/schema/schema.go b/pkg/operator/schema/schema.go index 1ee895cace..93acfd7a6d 100644 --- a/pkg/operator/schema/schema.go +++ b/pkg/operator/schema/schema.go @@ -33,13 +33,12 @@ type InfoResponse struct { type WorkerNodeInfo struct { NodeInfo - Name string `json:"name" yaml:"name"` - NumReplicas int `json:"num_replicas" yaml:"num_replicas"` - NumAsyncGatewayReplicas int `json:"num_async_gateway_replicas" yaml:"num_async_gateway_replicas"` - NumEnqueuerReplicas int `json:"num_enqueuer_replicas" yaml:"num_enqueuer_replicas"` - ComputeUserCapacity userconfig.Compute `json:"compute_user_capacity" yaml:"compute_user_capacity"` // the total resources available to the user on a node - ComputeAvailable userconfig.Compute `json:"compute_available" yaml:"compute_unavailable"` // unused resources on a node - ComputeUserRequested userconfig.Compute `json:"compute_user_requested" yaml:"compute_user_requested"` // total resources requested by user on a node + Name string `json:"name" yaml:"name"` + NumReplicas int `json:"num_replicas" yaml:"num_replicas"` + NumEnqueuerReplicas int `json:"num_enqueuer_replicas" yaml:"num_enqueuer_replicas"` + ComputeUserCapacity userconfig.Compute `json:"compute_user_capacity" yaml:"compute_user_capacity"` // the total resources available to the user on a node + ComputeAvailable userconfig.Compute `json:"compute_available" yaml:"compute_unavailable"` // unused resources on a node + ComputeUserRequested userconfig.Compute `json:"compute_user_requested" yaml:"compute_user_requested"` // total resources requested by user on a node } type NodeInfo struct { diff --git a/pkg/workloads/k8s.go b/pkg/workloads/k8s.go index 9f5c2f3b8b..5e460aa7af 100644 --- a/pkg/workloads/k8s.go +++ b/pkg/workloads/k8s.go @@ -64,57 +64,12 @@ const ( ) var ( - _asyncGatewayCPURequest = kresource.MustParse("100m") - _asyncGatewayMemRequest = kresource.MustParse("100Mi") - _statsdAddress = fmt.Sprintf("prometheus-statsd-exporter.%s:9125", consts.PrometheusNamespace) // each Inferentia chip requires 128 HugePages with each HugePage having a size of 2Mi _hugePagesMemPerInf = int64(128 * 2 * 1024 * 1024) // bytes ) -func AsyncGatewayContainer(api spec.API, queueURL string, volumeMounts []kcore.VolumeMount) kcore.Container { - return kcore.Container{ - Name: GatewayContainerName, - Image: config.ClusterConfig.ImageAsyncGateway, - ImagePullPolicy: kcore.PullAlways, - Args: []string{ - "--cluster-config", consts.DefaultInClusterConfigPath, - "--port", s.Int32(consts.ProxyPortInt32), - "--queue", queueURL, - api.Name, - }, - Ports: []kcore.ContainerPort{ - {ContainerPort: consts.ProxyPortInt32}, - }, - Env: BaseEnvVars, - EnvFrom: BaseClusterEnvVars(), - Resources: kcore.ResourceRequirements{ - Requests: kcore.ResourceList{ - kcore.ResourceCPU: _asyncGatewayCPURequest, - kcore.ResourceMemory: _asyncGatewayMemRequest, - }, - }, - LivenessProbe: &kcore.Probe{ - Handler: kcore.Handler{ - HTTPGet: &kcore.HTTPGetAction{ - Path: "/healthz", - Port: intstr.FromInt(8888), - }, - }, - }, - ReadinessProbe: &kcore.Probe{ - Handler: kcore.Handler{ - HTTPGet: &kcore.HTTPGetAction{ - Path: "/healthz", - Port: intstr.FromInt(8888), - }, - }, - }, - VolumeMounts: volumeMounts, - } -} - func asyncDequeuerProxyContainer(api spec.API, queueURL string) (kcore.Container, kcore.Volume) { return kcore.Container{ Name: DequeuerContainerName,