diff --git a/cli/cluster/delete.go b/cli/cluster/delete.go index e81624f98d..47618b304b 100644 --- a/cli/cluster/delete.go +++ b/cli/cluster/delete.go @@ -22,6 +22,7 @@ import ( "github.com/cortexlabs/cortex/pkg/lib/errors" "github.com/cortexlabs/cortex/pkg/lib/json" + "github.com/cortexlabs/cortex/pkg/lib/pointer" "github.com/cortexlabs/cortex/pkg/lib/prompt" s "github.com/cortexlabs/cortex/pkg/lib/strings" "github.com/cortexlabs/cortex/pkg/operator/schema" @@ -70,8 +71,7 @@ func getReadyRealtimeAPIReplicasOrNil(operatorConfig OperatorConfig, apiName str return nil } - totalReady := apiRes.Status.Updated.Ready + apiRes.Status.Stale.Ready - return &totalReady + return pointer.Int32(apiRes.Status.Ready) } func StopJob(operatorConfig OperatorConfig, kind userconfig.Kind, apiName string, jobID string) (schema.DeleteResponse, error) { diff --git a/cli/cluster/get.go b/cli/cluster/get.go index 47a24aa0a3..6d88e707b8 100644 --- a/cli/cluster/get.go +++ b/cli/cluster/get.go @@ -51,6 +51,20 @@ func GetAPI(operatorConfig OperatorConfig, apiName string) ([]schema.APIResponse return apiRes, nil } +func DescribeAPI(operatorConfig OperatorConfig, apiName string) ([]schema.APIResponse, error) { + httpRes, err := HTTPGet(operatorConfig, "/describe/"+apiName) + if err != nil { + return nil, err + } + + var apiRes []schema.APIResponse + if err = json.Unmarshal(httpRes, &apiRes); err != nil { + return nil, errors.Wrap(err, "/describe/"+apiName, string(httpRes)) + } + + return apiRes, nil +} + func GetAPIByID(operatorConfig OperatorConfig, apiName string, apiID string) ([]schema.APIResponse, error) { httpRes, err := HTTPGet(operatorConfig, "/get/"+apiName+"/"+apiID) if err != nil { diff --git a/cli/cmd/describe.go b/cli/cmd/describe.go new file mode 100644 index 0000000000..767045c5a2 --- /dev/null +++ b/cli/cmd/describe.go @@ -0,0 +1,113 @@ +/* +Copyright 2021 Cortex Labs, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cmd + +import ( + "fmt" + + "github.com/cortexlabs/cortex/cli/cluster" + "github.com/cortexlabs/cortex/cli/types/cliconfig" + "github.com/cortexlabs/cortex/pkg/lib/errors" + "github.com/cortexlabs/cortex/pkg/lib/exit" + "github.com/cortexlabs/cortex/pkg/lib/telemetry" + "github.com/cortexlabs/cortex/pkg/types/userconfig" + "github.com/spf13/cobra" +) + +const ( + _titleReplicaStatus = "replica status" + _titleReplicaCount = "replica count" +) + +var ( + _flagDescribeEnv string + _flagDescribeWatch bool +) + +func describeInit() { + _describeCmd.Flags().SortFlags = false + _describeCmd.Flags().StringVarP(&_flagDescribeEnv, "env", "e", "", "environment to use") + _describeCmd.Flags().BoolVarP(&_flagDescribeWatch, "watch", "w", false, "re-run the command every 2 seconds") +} + +var _describeCmd = &cobra.Command{ + Use: "describe [API_NAME]", + Short: "describe an api", + Args: cobra.ExactArgs(1), + Run: func(cmd *cobra.Command, args []string) { + apiName := args[0] + + var envName string + if wasFlagProvided(cmd, "env") { + envName = _flagDescribeEnv + } else { + var err error + envName, err = getEnvFromFlag("") + if err != nil { + telemetry.Event("cli.describe") + exit.Error(err) + } + } + + env, err := ReadOrConfigureEnv(envName) + if err != nil { + telemetry.Event("cli.describe") + exit.Error(err) + } + telemetry.Event("cli.describe", map[string]interface{}{"env_name": env.Name}) + + rerun(_flagDescribeWatch, func() (string, error) { + env, err := ReadOrConfigureEnv(envName) + if err != nil { + exit.Error(err) + } + + out, err := envStringIfNotSpecified(envName, cmd) + if err != nil { + return "", err + } + apiTable, err := describeAPI(env, apiName) + if err != nil { + return "", err + } + + return out + apiTable, nil + }) + }, +} + +func describeAPI(env cliconfig.Environment, apiName string) (string, error) { + apisRes, err := cluster.DescribeAPI(MustGetOperatorConfig(env.Name), apiName) + if err != nil { + return "", err + } + + if len(apisRes) == 0 { + exit.Error(errors.ErrorUnexpected(fmt.Sprintf("unable to find api %s", apiName))) + } + + apiRes := apisRes[0] + + switch apiRes.Metadata.Kind { + case userconfig.RealtimeAPIKind: + return realtimeDescribeAPITable(apiRes, env) + case userconfig.AsyncAPIKind: + return asyncDescribeAPITable(apiRes, env) + default: + return "", errors.ErrorUnexpected(fmt.Sprintf("encountered unexpected kind %s for api %s", apiRes.Spec.Kind, apiRes.Spec.Name)) + } +} diff --git a/cli/cmd/get.go b/cli/cmd/get.go index 1b11b984a0..c260d0c5e9 100644 --- a/cli/cmd/get.go +++ b/cli/cmd/get.go @@ -35,29 +35,28 @@ import ( libtime "github.com/cortexlabs/cortex/pkg/lib/time" "github.com/cortexlabs/cortex/pkg/operator/schema" "github.com/cortexlabs/cortex/pkg/types/userconfig" + "github.com/cortexlabs/yaml" "github.com/spf13/cobra" ) const ( _titleEnvironment = "env" _titleRealtimeAPI = "realtime api" - _titleStatus = "status" + _titleAsyncAPI = "async api" + _titleLive = "live" _titleUpToDate = "up-to-date" - _titleStale = "stale" - _titleRequested = "requested" - _titleFailed = "failed" - _titleLastupdated = "last update" + _titleLastUpdated = "last update" ) var ( - _flagGetEnv string - _flagWatch bool + _flagGetEnv string + _flagGetWatch bool ) func getInit() { _getCmd.Flags().SortFlags = false _getCmd.Flags().StringVarP(&_flagGetEnv, "env", "e", "", "environment to use") - _getCmd.Flags().BoolVarP(&_flagWatch, "watch", "w", false, "re-run the command every 2 seconds") + _getCmd.Flags().BoolVarP(&_flagGetWatch, "watch", "w", false, "re-run the command every 2 seconds") _getCmd.Flags().VarP(&_flagOutput, "output", "o", fmt.Sprintf("output format: one of %s", strings.Join(flags.OutputTypeStringsExcluding(flags.YAMLOutputType), "|"))) addVerboseFlag(_getCmd) } @@ -90,7 +89,7 @@ var _getCmd = &cobra.Command{ telemetry.Event("cli.get") } - rerun(func() (string, error) { + rerun(_flagGetWatch, func() (string, error) { if len(args) == 1 { env, err := ReadOrConfigureEnv(envName) if err != nil { @@ -106,7 +105,7 @@ var _getCmd = &cobra.Command{ return "", err } - if _flagOutput == flags.JSONOutputType { + if _flagOutput == flags.JSONOutputType || _flagOutput == flags.YAMLOutputType { return apiTable, nil } @@ -136,7 +135,7 @@ var _getCmd = &cobra.Command{ if err != nil { return "", err } - if _flagOutput == flags.JSONOutputType { + if _flagOutput == flags.JSONOutputType || _flagOutput == flags.YAMLOutputType { return jobTable, nil } @@ -166,7 +165,7 @@ var _getCmd = &cobra.Command{ return "", err } - if _flagOutput == flags.JSONOutputType { + if _flagOutput == flags.JSONOutputType || _flagOutput == flags.YAMLOutputType { return apiTable, nil } @@ -221,7 +220,7 @@ func getAPIsInAllEnvironments() (string, error) { if err == nil { for _, api := range apisRes { - switch api.Spec.Kind { + switch api.Metadata.Kind { case userconfig.BatchAPIKind: allBatchAPIEnvs = append(allBatchAPIEnvs, env.Name) allBatchAPIs = append(allBatchAPIs, api) @@ -247,12 +246,16 @@ func getAPIsInAllEnvironments() (string, error) { allAPIsOutput = append(allAPIsOutput, apisOutput) } + var bytes []byte if _flagOutput == flags.JSONOutputType { - bytes, err := libjson.Marshal(allAPIsOutput) - if err != nil { - return "", err - } - + bytes, err = libjson.Marshal(allAPIsOutput) + } else if _flagOutput == flags.YAMLOutputType { + bytes, err = yaml.Marshal(allAPIsOutput) + } + if err != nil { + return "", err + } + if _flagOutput == flags.JSONOutputType || _flagOutput == flags.YAMLOutputType { return string(bytes), nil } @@ -337,11 +340,16 @@ func getAPIsByEnv(env cliconfig.Environment) (string, error) { return "", err } + var bytes []byte if _flagOutput == flags.JSONOutputType { - bytes, err := libjson.Marshal(apisRes) - if err != nil { - return "", err - } + bytes, err = libjson.Marshal(apisRes) + } else if _flagOutput == flags.YAMLOutputType { + bytes, err = yaml.Marshal(apisRes) + } + if err != nil { + return "", err + } + if _flagOutput == flags.JSONOutputType || _flagOutput == flags.YAMLOutputType { return string(bytes), nil } @@ -457,16 +465,21 @@ func getAPI(env cliconfig.Environment, apiName string) (string, error) { return "", err } + var bytes []byte if _flagOutput == flags.JSONOutputType { - bytes, err := libjson.Marshal(apisRes) - if err != nil { - return "", err - } + bytes, err = libjson.Marshal(apisRes) + } else if _flagOutput == flags.YAMLOutputType { + bytes, err = yaml.Marshal(apisRes) + } + if err != nil { + return "", err + } + if _flagOutput == flags.JSONOutputType || _flagOutput == flags.YAMLOutputType { return string(bytes), nil } if len(apisRes) == 0 { - exit.Error(errors.ErrorUnexpected(fmt.Sprintf("unable to find API %s", apiName))) + exit.Error(errors.ErrorUnexpected(fmt.Sprintf("unable to find api %s", apiName))) } apiRes := apisRes[0] diff --git a/cli/cmd/lib_apis.go b/cli/cmd/lib_apis.go new file mode 100644 index 0000000000..23514342dc --- /dev/null +++ b/cli/cmd/lib_apis.go @@ -0,0 +1,59 @@ +/* +Copyright 2021 Cortex Labs, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cmd + +import ( + "github.com/cortexlabs/cortex/pkg/lib/table" + "github.com/cortexlabs/cortex/pkg/types/status" +) + +func replicaCountTable(counts *status.ReplicaCounts) table.Table { + var rows [][]interface{} + for _, replicaCountType := range status.ReplicaCountTypes { + count := counts.GetCountBy(replicaCountType) + canBeHiddenIfZero := false + switch replicaCountType { + case status.ReplicaCountFailed: + canBeHiddenIfZero = true + case status.ReplicaCountKilled: + canBeHiddenIfZero = true + case status.ReplicaCountKilledOOM: + canBeHiddenIfZero = true + case status.ReplicaCountErrImagePull: + canBeHiddenIfZero = true + case status.ReplicaCountUnknown: + canBeHiddenIfZero = true + case status.ReplicaCountStalled: + canBeHiddenIfZero = true + } + if count == 0 && canBeHiddenIfZero { + continue + } + rows = append(rows, []interface{}{ + replicaCountType, + count, + }) + } + + return table.Table{ + Headers: []table.Header{ + {Title: _titleReplicaStatus, MinWidth: 32, MaxWidth: 32}, + {Title: _titleReplicaCount}, + }, + Rows: rows, + } +} diff --git a/cli/cmd/lib_async_apis.go b/cli/cmd/lib_async_apis.go index 114c88bca8..e2e4441003 100644 --- a/cli/cmd/lib_async_apis.go +++ b/cli/cmd/lib_async_apis.go @@ -17,26 +17,22 @@ limitations under the License. package cmd import ( + "fmt" "strings" "time" "github.com/cortexlabs/cortex/cli/types/cliconfig" "github.com/cortexlabs/cortex/pkg/lib/console" + "github.com/cortexlabs/cortex/pkg/lib/errors" "github.com/cortexlabs/cortex/pkg/lib/table" libtime "github.com/cortexlabs/cortex/pkg/lib/time" "github.com/cortexlabs/cortex/pkg/operator/schema" ) -const ( - _titleAsyncAPI = "async api" -) - func asyncAPITable(asyncAPI schema.APIResponse, env cliconfig.Environment) (string, error) { var out string t := asyncAPIsTable([]schema.APIResponse{asyncAPI}, []string{env.Name}) - t.FindHeaderByTitle(_titleEnvironment).Hidden = true - t.FindHeaderByTitle(_titleAsyncAPI).Hidden = true out += t.MustFormat() @@ -44,7 +40,9 @@ func asyncAPITable(asyncAPI schema.APIResponse, env cliconfig.Environment) (stri out += "\n" + console.Bold("metrics dashboard: ") + *asyncAPI.DashboardURL + "\n" } - out += "\n" + console.Bold("endpoint: ") + asyncAPI.Endpoint + "\n" + if asyncAPI.Endpoint != nil { + out += "\n" + console.Bold("endpoint: ") + *asyncAPI.Endpoint + "\n" + } out += "\n" + apiHistoryTable(asyncAPI.APIVersions) @@ -57,39 +55,56 @@ func asyncAPITable(asyncAPI schema.APIResponse, env cliconfig.Environment) (stri return out, nil } +func asyncDescribeAPITable(asyncAPI schema.APIResponse, env cliconfig.Environment) (string, error) { + if asyncAPI.Metadata == nil { + return "", errors.ErrorUnexpected("missing metadata from operator response") + } + + if asyncAPI.Status == nil { + return "", errors.ErrorUnexpected(fmt.Sprintf("missing status for %s api", asyncAPI.Metadata.Name)) + } + + t := asyncAPIsTable([]schema.APIResponse{asyncAPI}, []string{env.Name}) + out := t.MustFormat() + + if asyncAPI.DashboardURL != nil && *asyncAPI.DashboardURL != "" { + out += "\n" + console.Bold("metrics dashboard: ") + *asyncAPI.DashboardURL + "\n" + } + + if asyncAPI.Endpoint != nil { + out += "\n" + console.Bold("endpoint: ") + *asyncAPI.Endpoint + "\n" + } + + t = replicaCountTable(asyncAPI.Status.ReplicaCounts) + out += "\n" + t.MustFormat() + + return out, nil +} + func asyncAPIsTable(asyncAPIs []schema.APIResponse, envNames []string) table.Table { rows := make([][]interface{}, 0, len(asyncAPIs)) - var totalFailed int32 - var totalStale int32 - for i, asyncAPI := range asyncAPIs { - lastUpdated := time.Unix(asyncAPI.Spec.LastUpdated, 0) + if asyncAPI.Metadata == nil || asyncAPI.Status == nil { + continue + } + lastUpdated := time.Unix(asyncAPI.Metadata.LastUpdated, 0) rows = append(rows, []interface{}{ envNames[i], - asyncAPI.Spec.Name, - asyncAPI.Status.Message(), - asyncAPI.Status.Updated.Ready, - asyncAPI.Status.Stale.Ready, - asyncAPI.Status.Requested, - asyncAPI.Status.Updated.TotalFailed(), + asyncAPI.Metadata.Name, + fmt.Sprintf("%d/%d", asyncAPI.Status.Ready, asyncAPI.Status.Requested), + asyncAPI.Status.UpToDate, libtime.SinceStr(&lastUpdated), }) - - totalFailed += asyncAPI.Status.Updated.TotalFailed() - totalStale += asyncAPI.Status.Stale.Ready } return table.Table{ Headers: []table.Header{ {Title: _titleEnvironment}, {Title: _titleAsyncAPI}, - {Title: _titleStatus}, + {Title: _titleLive}, {Title: _titleUpToDate}, - {Title: _titleStale, Hidden: totalStale == 0}, - {Title: _titleRequested}, - {Title: _titleFailed, Hidden: totalFailed == 0}, - {Title: _titleLastupdated}, + {Title: _titleLastUpdated}, }, Rows: rows, } diff --git a/cli/cmd/lib_batch_apis.go b/cli/cmd/lib_batch_apis.go index 272dbfa0fa..ebabc29243 100644 --- a/cli/cmd/lib_batch_apis.go +++ b/cli/cmd/lib_batch_apis.go @@ -31,6 +31,7 @@ import ( libtime "github.com/cortexlabs/cortex/pkg/lib/time" "github.com/cortexlabs/cortex/pkg/operator/schema" "github.com/cortexlabs/cortex/pkg/types/status" + "github.com/cortexlabs/yaml" ) const ( @@ -43,7 +44,10 @@ func batchAPIsTable(batchAPIs []schema.APIResponse, envNames []string) table.Tab rows := make([][]interface{}, 0, len(batchAPIs)) for i, batchAPI := range batchAPIs { - lastAPIUpdated := time.Unix(batchAPI.Spec.LastUpdated, 0) + if batchAPI.Metadata == nil { + continue + } + lastAPIUpdated := time.Unix(batchAPI.Metadata.LastUpdated, 0) latestStartTime := time.Time{} latestJobID := "-" runningJobs := 0 @@ -61,7 +65,7 @@ func batchAPIsTable(batchAPIs []schema.APIResponse, envNames []string) table.Tab rows = append(rows, []interface{}{ envNames[i], - batchAPI.Spec.Name, + batchAPI.Metadata.Name, runningJobs, latestJobID, libtime.SinceStr(&lastAPIUpdated), @@ -74,7 +78,7 @@ func batchAPIsTable(batchAPIs []schema.APIResponse, envNames []string) table.Tab {Title: _titleBatchAPI}, {Title: _titleJobCount}, {Title: _titleLatestJobID}, - {Title: _titleLastupdated}, + {Title: _titleLastUpdated}, }, Rows: rows, } @@ -123,7 +127,9 @@ func batchAPITable(batchAPI schema.APIResponse) string { out += "\n" + console.Bold("metrics dashboard: ") + *batchAPI.DashboardURL + "\n" } - out += "\n" + console.Bold("endpoint: ") + batchAPI.Endpoint + "\n" + if batchAPI.Endpoint != nil { + out += "\n" + console.Bold("endpoint: ") + *batchAPI.Endpoint + "\n" + } out += "\n" + apiHistoryTable(batchAPI.APIVersions) @@ -142,11 +148,16 @@ func getBatchJob(env cliconfig.Environment, apiName string, jobID string) (strin return "", err } + var bytes []byte if _flagOutput == flags.JSONOutputType { - bytes, err := libjson.Marshal(resp) - if err != nil { - return "", err - } + bytes, err = libjson.Marshal(resp) + } else if _flagOutput == flags.YAMLOutputType { + bytes, err = yaml.Marshal(resp) + } + if err != nil { + return "", err + } + if _flagOutput == flags.JSONOutputType || _flagOutput == flags.YAMLOutputType { return string(bytes), nil } @@ -216,22 +227,34 @@ func getBatchJob(env cliconfig.Environment, apiName string, jobID string) (strin if job.WorkerCounts != nil { t := table.Table{ Headers: []table.Header{ - {Title: "requested"}, - {Title: "pending", Hidden: job.WorkerCounts.Pending == 0}, - {Title: "initializing", Hidden: job.WorkerCounts.Initializing == 0}, - {Title: "stalled", Hidden: job.WorkerCounts.Stalled == 0}, - {Title: "running"}, - {Title: "failed", Hidden: job.WorkerCounts.Failed == 0}, - {Title: "succeeded"}, + {Title: "Requested"}, + {Title: "Pending"}, + {Title: "Creating"}, + {Title: "Ready"}, + {Title: "NotReady"}, + {Title: "ErrImagePull", Hidden: job.WorkerCounts.ErrImagePull == 0}, + {Title: "Terminating", Hidden: job.WorkerCounts.Terminating == 0}, + {Title: "Failed", Hidden: job.WorkerCounts.Failed == 0}, + {Title: "Killed", Hidden: job.WorkerCounts.Killed == 0}, + {Title: "KilledOOM", Hidden: job.WorkerCounts.KilledOOM == 0}, + {Title: "Stalled", Hidden: job.WorkerCounts.Stalled == 0}, + {Title: "Unknown", Hidden: job.WorkerCounts.Unknown == 0}, + {Title: "Succeeded"}, }, Rows: [][]interface{}{ { job.Workers, job.WorkerCounts.Pending, - job.WorkerCounts.Initializing, - job.WorkerCounts.Stalled, - job.WorkerCounts.Running, + job.WorkerCounts.Creating, + job.WorkerCounts.Ready, + job.WorkerCounts.NotReady, + job.WorkerCounts.ErrImagePull, + job.WorkerCounts.Terminating, job.WorkerCounts.Failed, + job.WorkerCounts.Killed, + job.WorkerCounts.KilledOOM, + job.WorkerCounts.Stalled, + job.WorkerCounts.Unknown, job.WorkerCounts.Succeeded, }, }, diff --git a/cli/cmd/lib_realtime_apis.go b/cli/cmd/lib_realtime_apis.go index be4316e0a8..dd73db1282 100644 --- a/cli/cmd/lib_realtime_apis.go +++ b/cli/cmd/lib_realtime_apis.go @@ -17,11 +17,13 @@ limitations under the License. package cmd import ( + "fmt" "strings" "time" "github.com/cortexlabs/cortex/cli/types/cliconfig" "github.com/cortexlabs/cortex/pkg/lib/console" + "github.com/cortexlabs/cortex/pkg/lib/errors" "github.com/cortexlabs/cortex/pkg/lib/table" libtime "github.com/cortexlabs/cortex/pkg/lib/time" "github.com/cortexlabs/cortex/pkg/operator/schema" @@ -31,16 +33,15 @@ func realtimeAPITable(realtimeAPI schema.APIResponse, env cliconfig.Environment) var out string t := realtimeAPIsTable([]schema.APIResponse{realtimeAPI}, []string{env.Name}) - t.FindHeaderByTitle(_titleEnvironment).Hidden = true - t.FindHeaderByTitle(_titleRealtimeAPI).Hidden = true - out += t.MustFormat() if realtimeAPI.DashboardURL != nil && *realtimeAPI.DashboardURL != "" { out += "\n" + console.Bold("metrics dashboard: ") + *realtimeAPI.DashboardURL + "\n" } - out += "\n" + console.Bold("endpoint: ") + realtimeAPI.Endpoint + "\n" + if realtimeAPI.Endpoint != nil { + out += "\n" + console.Bold("endpoint: ") + *realtimeAPI.Endpoint + "\n" + } out += "\n" + apiHistoryTable(realtimeAPI.APIVersions) @@ -53,39 +54,56 @@ func realtimeAPITable(realtimeAPI schema.APIResponse, env cliconfig.Environment) return out, nil } +func realtimeDescribeAPITable(realtimeAPI schema.APIResponse, env cliconfig.Environment) (string, error) { + if realtimeAPI.Metadata == nil { + return "", errors.ErrorUnexpected("missing metadata from operator response") + } + + if realtimeAPI.Status == nil { + return "", errors.ErrorUnexpected(fmt.Sprintf("missing status for %s api", realtimeAPI.Metadata.Name)) + } + + t := realtimeAPIsTable([]schema.APIResponse{realtimeAPI}, []string{env.Name}) + out := t.MustFormat() + + if realtimeAPI.DashboardURL != nil && *realtimeAPI.DashboardURL != "" { + out += "\n" + console.Bold("metrics dashboard: ") + *realtimeAPI.DashboardURL + "\n" + } + + if realtimeAPI.Endpoint != nil { + out += "\n" + console.Bold("endpoint: ") + *realtimeAPI.Endpoint + "\n" + } + + t = replicaCountTable(realtimeAPI.Status.ReplicaCounts) + out += "\n" + t.MustFormat() + + return out, nil +} + func realtimeAPIsTable(realtimeAPIs []schema.APIResponse, envNames []string) table.Table { rows := make([][]interface{}, 0, len(realtimeAPIs)) - var totalFailed int32 - var totalStale int32 - for i, realtimeAPI := range realtimeAPIs { - lastUpdated := time.Unix(realtimeAPI.Spec.LastUpdated, 0) + if realtimeAPI.Metadata == nil || realtimeAPI.Status == nil { + continue + } + lastUpdated := time.Unix(realtimeAPI.Metadata.LastUpdated, 0) rows = append(rows, []interface{}{ envNames[i], - realtimeAPI.Spec.Name, - realtimeAPI.Status.Message(), - realtimeAPI.Status.Updated.Ready, - realtimeAPI.Status.Stale.Ready, - realtimeAPI.Status.Requested, - realtimeAPI.Status.Updated.TotalFailed(), + realtimeAPI.Metadata.Name, + fmt.Sprintf("%d/%d", realtimeAPI.Status.Ready, realtimeAPI.Status.Requested), + realtimeAPI.Status.UpToDate, libtime.SinceStr(&lastUpdated), }) - - totalFailed += realtimeAPI.Status.Updated.TotalFailed() - totalStale += realtimeAPI.Status.Stale.Ready } return table.Table{ Headers: []table.Header{ {Title: _titleEnvironment}, {Title: _titleRealtimeAPI}, - {Title: _titleStatus}, + {Title: _titleLive}, {Title: _titleUpToDate}, - {Title: _titleStale, Hidden: totalStale == 0}, - {Title: _titleRequested}, - {Title: _titleFailed, Hidden: totalFailed == 0}, - {Title: _titleLastupdated}, + {Title: _titleLastUpdated}, }, Rows: rows, } diff --git a/cli/cmd/lib_task_apis.go b/cli/cmd/lib_task_apis.go index 50575b8516..295e1af875 100644 --- a/cli/cmd/lib_task_apis.go +++ b/cli/cmd/lib_task_apis.go @@ -29,6 +29,7 @@ import ( "github.com/cortexlabs/cortex/pkg/lib/table" libtime "github.com/cortexlabs/cortex/pkg/lib/time" "github.com/cortexlabs/cortex/pkg/operator/schema" + "github.com/cortexlabs/yaml" ) const ( @@ -41,7 +42,10 @@ func taskAPIsTable(taskAPIs []schema.APIResponse, envNames []string) table.Table rows := make([][]interface{}, 0, len(taskAPIs)) for i, taskAPI := range taskAPIs { - lastAPIUpdated := time.Unix(taskAPI.Spec.LastUpdated, 0) + if taskAPI.Metadata == nil { + continue + } + lastAPIUpdated := time.Unix(taskAPI.Metadata.LastUpdated, 0) latestStartTime := time.Time{} latestJobID := "-" runningJobs := 0 @@ -59,7 +63,7 @@ func taskAPIsTable(taskAPIs []schema.APIResponse, envNames []string) table.Table rows = append(rows, []interface{}{ envNames[i], - taskAPI.Spec.Name, + taskAPI.Metadata.Name, runningJobs, latestJobID, libtime.SinceStr(&lastAPIUpdated), @@ -72,7 +76,7 @@ func taskAPIsTable(taskAPIs []schema.APIResponse, envNames []string) table.Table {Title: _titleTaskAPI}, {Title: _titleTaskJobCount}, {Title: _titleLatestTaskJobID}, - {Title: _titleLastupdated}, + {Title: _titleLastUpdated}, }, Rows: rows, } @@ -118,7 +122,9 @@ func taskAPITable(taskAPI schema.APIResponse) string { out += "\n" + console.Bold("metrics dashboard: ") + *taskAPI.DashboardURL + "\n" } - out += "\n" + console.Bold("endpoint: ") + taskAPI.Endpoint + "\n" + if taskAPI.Endpoint != nil { + out += "\n" + console.Bold("endpoint: ") + *taskAPI.Endpoint + "\n" + } out += "\n" + apiHistoryTable(taskAPI.APIVersions) @@ -137,11 +143,16 @@ func getTaskJob(env cliconfig.Environment, apiName string, jobID string) (string return "", err } + var bytes []byte if _flagOutput == flags.JSONOutputType { - bytes, err := libjson.Marshal(resp) - if err != nil { - return "", err - } + bytes, err = libjson.Marshal(resp) + } else if _flagOutput == flags.YAMLOutputType { + bytes, err = yaml.Marshal(resp) + } + if err != nil { + return "", err + } + if _flagOutput == flags.JSONOutputType || _flagOutput == flags.YAMLOutputType { return string(bytes), nil } @@ -176,22 +187,34 @@ func getTaskJob(env cliconfig.Environment, apiName string, jobID string) (string if job.WorkerCounts != nil { t := table.Table{ Headers: []table.Header{ - {Title: "requested"}, - {Title: "pending", Hidden: job.WorkerCounts.Pending == 0}, - {Title: "initializing", Hidden: job.WorkerCounts.Initializing == 0}, - {Title: "stalled", Hidden: job.WorkerCounts.Stalled == 0}, - {Title: "running"}, - {Title: "failed", Hidden: job.WorkerCounts.Failed == 0}, - {Title: "succeeded"}, + {Title: "Requested"}, + {Title: "Pending"}, + {Title: "Creating"}, + {Title: "Ready"}, + {Title: "NotReady"}, + {Title: "ErrImagePull", Hidden: job.WorkerCounts.ErrImagePull == 0}, + {Title: "Terminating", Hidden: job.WorkerCounts.Terminating == 0}, + {Title: "Failed", Hidden: job.WorkerCounts.Failed == 0}, + {Title: "Killed", Hidden: job.WorkerCounts.Killed == 0}, + {Title: "KilledOOM", Hidden: job.WorkerCounts.KilledOOM == 0}, + {Title: "Stalled", Hidden: job.WorkerCounts.Stalled == 0}, + {Title: "Unknown", Hidden: job.WorkerCounts.Unknown == 0}, + {Title: "Succeeded"}, }, Rows: [][]interface{}{ { job.Workers, job.WorkerCounts.Pending, - job.WorkerCounts.Initializing, - job.WorkerCounts.Stalled, - job.WorkerCounts.Running, + job.WorkerCounts.Creating, + job.WorkerCounts.Ready, + job.WorkerCounts.NotReady, + job.WorkerCounts.ErrImagePull, + job.WorkerCounts.Terminating, job.WorkerCounts.Failed, + job.WorkerCounts.Killed, + job.WorkerCounts.KilledOOM, + job.WorkerCounts.Stalled, + job.WorkerCounts.Unknown, job.WorkerCounts.Succeeded, }, }, diff --git a/cli/cmd/lib_traffic_splitters.go b/cli/cmd/lib_traffic_splitters.go index 39c344038a..af2b4e4aad 100644 --- a/cli/cmd/lib_traffic_splitters.go +++ b/cli/cmd/lib_traffic_splitters.go @@ -17,6 +17,7 @@ limitations under the License. package cmd import ( + "fmt" "strings" "time" @@ -44,12 +45,14 @@ func trafficSplitterTable(trafficSplitter schema.APIResponse, env cliconfig.Envi if err != nil { return "", err } - t.FindHeaderByTitle(_titleEnvironment).Hidden = true out += t.MustFormat() out += "\n" + console.Bold("last updated: ") + libtime.SinceStr(&lastUpdated) - out += "\n" + console.Bold("endpoint: ") + trafficSplitter.Endpoint + "\n" + + if trafficSplitter.Endpoint != nil { + out += "\n" + console.Bold("endpoint: ") + *trafficSplitter.Endpoint + "\n" + } out += "\n" + apiHistoryTable(trafficSplitter.APIVersions) @@ -72,7 +75,10 @@ func trafficSplitTable(trafficSplitter schema.APIResponse, env cliconfig.Environ } apiRes := apisRes[0] - lastUpdated := time.Unix(apiRes.Spec.LastUpdated, 0) + if apiRes.Metadata == nil || apiRes.Status == nil { + continue + } + lastUpdated := time.Unix(apiRes.Metadata.LastUpdated, 0) apiName := apiRes.Spec.Name if api.Shadow { @@ -82,8 +88,8 @@ func trafficSplitTable(trafficSplitter schema.APIResponse, env cliconfig.Environ env.Name, apiName, api.Weight, - apiRes.Status.Message(), - apiRes.Status.Requested, + fmt.Sprintf("%d/%d", apiRes.Status.Ready, apiRes.Status.Requested), + apiRes.Status.UpToDate, libtime.SinceStr(&lastUpdated), }) } @@ -93,9 +99,9 @@ func trafficSplitTable(trafficSplitter schema.APIResponse, env cliconfig.Environ {Title: _titleEnvironment}, {Title: _titleAPIs}, {Title: _trafficSplitterWeights}, - {Title: _titleStatus}, - {Title: _titleRequested}, - {Title: _titleLastupdated}, + {Title: _titleLive}, + {Title: _titleUpToDate}, + {Title: _titleLastUpdated}, }, Rows: rows, }, nil @@ -104,20 +110,14 @@ func trafficSplitTable(trafficSplitter schema.APIResponse, env cliconfig.Environ func trafficSplitterListTable(trafficSplitter []schema.APIResponse, envNames []string) table.Table { rows := make([][]interface{}, 0, len(trafficSplitter)) for i, splitAPI := range trafficSplitter { - lastUpdated := time.Unix(splitAPI.Spec.LastUpdated, 0) - var apis []string - for _, api := range splitAPI.Spec.APIs { - apiName := api.Name - if api.Shadow { - apiName += " (shadow)" - } - apis = append(apis, apiName+":"+s.Int32(api.Weight)) + if splitAPI.Metadata == nil || splitAPI.NumTrafficSplitterTargets == nil { + continue } - apisStr := s.TruncateEllipses(strings.Join(apis, " "), 50) + lastUpdated := time.Unix(splitAPI.Metadata.LastUpdated, 0) rows = append(rows, []interface{}{ envNames[i], - splitAPI.Spec.Name, - apisStr, + splitAPI.Metadata.Name, + s.Int32(*splitAPI.NumTrafficSplitterTargets), libtime.SinceStr(&lastUpdated), }) } @@ -127,7 +127,7 @@ func trafficSplitterListTable(trafficSplitter []schema.APIResponse, envNames []s {Title: _titleEnvironment}, {Title: _titleTrafficSplitter}, {Title: _titleAPIs}, - {Title: _titleLastupdated}, + {Title: _titleLastUpdated}, }, Rows: rows, } diff --git a/cli/cmd/lib_watch.go b/cli/cmd/lib_watch.go index 06aebb26c2..a0f9043492 100644 --- a/cli/cmd/lib_watch.go +++ b/cli/cmd/lib_watch.go @@ -56,8 +56,8 @@ func watchHeader() string { return fmt.Sprintf("$ %s %s%s", _cmdStr, padding, libtime.LocalHourNow()) } -func rerun(f func() (string, error)) { - if _flagWatch { +func rerun(watchFlag bool, f func() (string, error)) { + if watchFlag { print("\033[H\033[2J") // clear the screen var prevStrSlice []string diff --git a/cli/cmd/root.go b/cli/cmd/root.go index 68649c0cc1..8aa7d1e0e0 100644 --- a/cli/cmd/root.go +++ b/cli/cmd/root.go @@ -112,6 +112,7 @@ func init() { clusterInit() completionInit() deleteInit() + describeInit() deployInit() envInit() getInit() @@ -154,6 +155,7 @@ func Execute() { _rootCmd.AddCommand(_deployCmd) _rootCmd.AddCommand(_getCmd) + _rootCmd.AddCommand(_describeCmd) _rootCmd.AddCommand(_logsCmd) _rootCmd.AddCommand(_refreshCmd) _rootCmd.AddCommand(_deleteCmd) diff --git a/cmd/operator/main.go b/cmd/operator/main.go index bf5a50d33b..ac38ee7130 100644 --- a/cmd/operator/main.go +++ b/cmd/operator/main.go @@ -105,6 +105,7 @@ func main() { routerWithAuth.HandleFunc("/get", endpoints.GetAPIs).Methods("GET") routerWithAuth.HandleFunc("/get/{apiName}", endpoints.GetAPI).Methods("GET") routerWithAuth.HandleFunc("/get/{apiName}/{apiID}", endpoints.GetAPIByID).Methods("GET") + routerWithAuth.HandleFunc("/describe/{apiName}", endpoints.DescribeAPI).Methods("GET") routerWithAuth.HandleFunc("/streamlogs/{apiName}", endpoints.ReadLogs) routerWithAuth.HandleFunc("/logs/{apiName}", endpoints.GetLogURL).Methods("GET") diff --git a/dev/generate_cli_md.sh b/dev/generate_cli_md.sh index 5715f6fdb8..fdf2566624 100755 --- a/dev/generate_cli_md.sh +++ b/dev/generate_cli_md.sh @@ -33,6 +33,7 @@ echo "# CLI commands" >> $out_file commands=( "deploy" "get" + "describe" "logs" "refresh" "delete" diff --git a/docs/clients/cli.md b/docs/clients/cli.md index be43886dba..b10957bfe4 100644 --- a/docs/clients/cli.md +++ b/docs/clients/cli.md @@ -32,6 +32,20 @@ Flags: -h, --help help for get ``` +## describe + +```text +describe an api + +Usage: + cortex describe [API_NAME] [flags] + +Flags: + -e, --env string environment to use + -w, --watch re-run the command every 2 seconds + -h, --help help for describe +``` + ## logs ```text diff --git a/docs/workloads/async/statuses.md b/docs/workloads/async/statuses.md index 3ecaeba865..9c4787f293 100644 --- a/docs/workloads/async/statuses.md +++ b/docs/workloads/async/statuses.md @@ -1,4 +1,4 @@ -# Statuses +# Request statuses | Status | Meaning | | :--- | :--- | @@ -6,3 +6,22 @@ | in_progress | Workload has been pulled by the API and is currently being processed | | completed | Workload has completed with success | | failed | Workload encountered an error during processing | + +# Replica states + +The replica states of an API can be inspected by running `cortex describe <api-name>`. Here are the possible states for each replica in an API: + +| State | Meaning | +|:---|:---| +| Ready | Replica is running and it has passed the readiness checks | +| ReadyOutOfDate | Replica is running and it has passed the readiness checks (for an out-of-date replica) | +| NotReady | Replica is running but it's not passing the readiness checks; make sure the server is listening on the designed port of the API | +| Pending | Replica is in a pending state (waiting to get scheduled onto a node) | +| Creating | Replica is in the process of having its containers created | +| ErrImagePull | Replica was not created because one of the specified Docker images was inaccessible at runtime; check that your API's docker images exist and are accessible via your cluster's AWS credentials | +| Failed | Replica couldn't start due to an error; run `cortex logs <name>` to view the logs | +| Killed | Replica has had one of its containers killed | +| KilledOOM | Replica was terminated due to excessive memory usage; try allocating more memory to the API and re-deploy | +| Stalled | Replica has been in a pending state for more than 15 minutes; see [troubleshooting](../realtime/troubleshooting.md) | +| Terminating | Replica is currently in the process of being terminated | +| Unknown | Replica is in an unknown state | diff --git a/docs/workloads/batch/statuses.md b/docs/workloads/batch/statuses.md index 1bcddcd6bd..019ca55789 100644 --- a/docs/workloads/batch/statuses.md +++ b/docs/workloads/batch/statuses.md @@ -1,4 +1,4 @@ -# Statuses +# Job statuses | Status | Meaning | | :--- | :--- | diff --git a/docs/workloads/realtime/statuses.md b/docs/workloads/realtime/statuses.md index 2ee32aca40..d4e201bfba 100644 --- a/docs/workloads/realtime/statuses.md +++ b/docs/workloads/realtime/statuses.md @@ -1,10 +1,18 @@ -# Statuses +# Replica states -| Status | Meaning | -| :--- | :--- | -| live | API is deployed and ready to serve requests (at least one replica is running) | -| updating | API is updating | -| error | API was not created due to an error; run `cortex logs <name>` to view the logs | -| error (image pull) | API was not created because one of the specified Docker images was inaccessible at runtime; check that your API's docker images exist and are accessible via your cluster's AWS credentials | -| error (out of memory) | API was terminated due to excessive memory usage; try allocating more memory to the API and re-deploying | -| compute unavailable | API could not start due to insufficient memory, CPU, GPU, or Inf in the cluster; some replicas may be ready | +The replica states of an API can be inspected by running `cortex describe <api-name>`. Here are the possible states for each replica in an API: + +| State | Meaning | +|:---|:---| +| Ready | Replica is running and it has passed the readiness checks | +| ReadyOutOfDate | Replica is running and it has passed the readiness checks (for an out-of-date replica) | +| NotReady | Replica is running but it's not passing the readiness checks; make sure the server is listening on the designed port of the API | +| Pending | Replica is in a pending state (waiting to get scheduled onto a node) | +| Creating | Replica is in the process of having its containers created | +| ErrImagePull | Replica was not created because one of the specified Docker images was inaccessible at runtime; check that your API's docker images exist and are accessible via your cluster's AWS credentials | +| Failed | Replica couldn't start due to an error; run `cortex logs <name>` to view the logs | +| Killed | Replica has had one of its containers killed | +| KilledOOM | Replica was terminated due to excessive memory usage; try allocating more memory to the API and re-deploy | +| Stalled | Replica has been in a pending state for more than 15 minutes; see [troubleshooting](../realtime/troubleshooting.md) | +| Terminating | Replica is currently in the process of being terminated | +| Unknown | Replica is in an unknown state | diff --git a/docs/workloads/realtime/troubleshooting.md b/docs/workloads/realtime/troubleshooting.md index 61de9dfe74..5254d25aaa 100644 --- a/docs/workloads/realtime/troubleshooting.md +++ b/docs/workloads/realtime/troubleshooting.md @@ -4,14 +4,14 @@ When making requests to your API, it's possible to get a `no healthy upstream` error message (with HTTP status code `503`). This means that there are currently no live replicas running for your API. This could happen for a few reasons: -1. It's possible that your API is simply not ready yet. You can check the status of your API with `cortex get API_NAME`, and inspect the logs in CloudWatch with the help of `cortex logs API_NAME`. -1. Your API may have errored during initialization or while responding to a previous request. `cortex get API_NAME` will show the status of your API, and you can view the logs for all replicas by visiting the CloudWatch Insights URL from `cortex logs API_NAME`. +1. It's possible that your API is simply not ready yet. You can check the number of ready replicas on your API with `cortex get API_NAME`, and inspect the logs in CloudWatch with the help of `cortex logs API_NAME`. +1. Your API may have errored during initialization or while responding to a previous request. `cortex describe API_NAME` will show the number of replicas that have failed to start on your API, and you can view the logs for all replicas by visiting the CloudWatch Insights URL from `cortex logs API_NAME`. If you are using API Gateway in front of your API endpoints, it is also possible to receive a `{"message":"Service Unavailable"}` error message (with HTTP status code `503`) after 29 seconds if your request exceeds API Gateway's 29 second timeout. If this is the case, you can either modify your code to take less time, run on faster hardware (e.g. GPUs), or don't use API Gateway (there is no timeout when using the API's endpoint directly). ## API is stuck updating -If your API is stuck in the "updating" or "compute unavailable" state (which is displayed when running `cortex get`), there are a few possible causes. Here are some things to check: +If your API has pods stuck in the "pending" or "stalled" states (which is displayed when running `cortex describe API_NAME`), there are a few possible causes. Here are some things to check: ### Inspect API logs in CloudWatch diff --git a/docs/workloads/task/statuses.md b/docs/workloads/task/statuses.md index b51eaf010f..0631ab68f2 100644 --- a/docs/workloads/task/statuses.md +++ b/docs/workloads/task/statuses.md @@ -1,4 +1,4 @@ -# Statuses +# Job statuses | Status | Meaning | | :--- | :--- | diff --git a/go.mod b/go.mod index c199f3dc4c..4381c8a46b 100644 --- a/go.mod +++ b/go.mod @@ -67,7 +67,7 @@ require ( golang.org/x/time v0.0.0-20210611083556-38a9dc6acbc6 // indirect gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect google.golang.org/genproto v0.0.0-20210701133433-6b8dcf568a95 // indirect - google.golang.org/grpc v1.39.0 // indirect + google.golang.org/grpc v1.39.0 gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect gopkg.in/karalabe/cookiejar.v2 v2.0.0-20150724131613-8dcd6a7f4951 gopkg.in/segmentio/analytics-go.v3 v3.1.0 diff --git a/pkg/activator/activator.go b/pkg/activator/activator.go index b7c54adc3d..7b68736951 100644 --- a/pkg/activator/activator.go +++ b/pkg/activator/activator.go @@ -131,7 +131,7 @@ func (a *activator) getOrCreateAPIActivator(ctx context.Context, apiName string) return nil, errors.WithStack(err) } - maxQueueLength, maxConcurrency, err := concurrencyFromAnnotations(vs.Annotations) + maxQueueLength, maxConcurrency, err := userconfig.ConcurrencyFromAnnotations(vs) if err != nil { return nil, err } diff --git a/pkg/activator/helpers.go b/pkg/activator/helpers.go index f32c7e54f2..5bce2cb7bf 100644 --- a/pkg/activator/helpers.go +++ b/pkg/activator/helpers.go @@ -17,8 +17,6 @@ limitations under the License. package activator import ( - "strconv" - "github.com/cortexlabs/cortex/pkg/lib/errors" "github.com/cortexlabs/cortex/pkg/types/userconfig" "k8s.io/apimachinery/pkg/api/meta" @@ -50,8 +48,7 @@ func getAPIMeta(obj interface{}) (apiMeta, error) { return apiMeta{}, errors.ErrorUnexpected("got a virtual service without apiName label") } - annotations := resource.GetAnnotations() - maxQueueLength, maxConcurrency, err := concurrencyFromAnnotations(annotations) + maxQueueLength, maxConcurrency, err := userconfig.ConcurrencyFromAnnotations(resource) if err != nil { return apiMeta{}, err } @@ -60,22 +57,8 @@ func getAPIMeta(obj interface{}) (apiMeta, error) { apiName: apiName, apiKind: userconfig.KindFromString(apiKind), labels: labels, - annotations: annotations, + annotations: resource.GetAnnotations(), maxConcurrency: maxConcurrency, maxQueueLength: maxQueueLength, }, nil } - -func concurrencyFromAnnotations(annotations map[string]string) (int, int, error) { - maxQueueLength, err := strconv.Atoi(annotations[userconfig.MaxQueueLengthAnnotationKey]) - if err != nil { - return 0, 0, errors.ErrorUnexpected("failed to parse annotation", userconfig.MaxQueueLengthAnnotationKey) - } - - maxConcurrency, err := strconv.Atoi(annotations[userconfig.MaxConcurrencyAnnotationKey]) - if err != nil { - return 0, 0, errors.ErrorUnexpected("failed to parse annotation", userconfig.MaxConcurrencyAnnotationKey) - } - - return maxQueueLength, maxConcurrency, err -} diff --git a/pkg/consts/consts.go b/pkg/consts/consts.go index 7ea590fc45..3fe860d776 100644 --- a/pkg/consts/consts.go +++ b/pkg/consts/consts.go @@ -76,8 +76,7 @@ var ( CortexProbeHeader = "X-Cortex-Probe" CortexOriginHeader = "X-Cortex-Origin" - WaitForInitializingReplicasTimeout = 15 * time.Minute - WaitForReadyReplicasTimeout = 20 * time.Minute + WaitForReadyReplicasTimeout = 20 * time.Minute ) func DefaultRegistry() string { diff --git a/pkg/crds/config/crd/bases/batch.cortex.dev_batchjobs.yaml b/pkg/crds/config/crd/bases/batch.cortex.dev_batchjobs.yaml index 63b1987bd9..a60ccbba4a 100644 --- a/pkg/crds/config/crd/bases/batch.cortex.dev_batchjobs.yaml +++ b/pkg/crds/config/crd/bases/batch.cortex.dev_batchjobs.yaml @@ -251,16 +251,28 @@ spec: worker_counts: description: Detailed worker counts with respective status properties: + creating: + format: int32 + type: integer + err_image_pull: + format: int32 + type: integer failed: format: int32 type: integer - initializing: + killed: + format: int32 + type: integer + killed_oom: + format: int32 + type: integer + not_ready: format: int32 type: integer pending: format: int32 type: integer - running: + ready: format: int32 type: integer stalled: @@ -269,6 +281,9 @@ spec: succeeded: format: int32 type: integer + terminating: + format: int32 + type: integer unknown: format: int32 type: integer diff --git a/pkg/crds/controllers/batch/batchjob_controller_helpers.go b/pkg/crds/controllers/batch/batchjob_controller_helpers.go index dd052dfc58..0f11ba67ba 100644 --- a/pkg/crds/controllers/batch/batchjob_controller_helpers.go +++ b/pkg/crds/controllers/batch/batchjob_controller_helpers.go @@ -442,6 +442,22 @@ func (r *BatchJobReconciler) getWorkerJob(ctx context.Context, batchJob batch.Ba return &job, nil } +func (r *BatchJobReconciler) getWorkerJobPods(ctx context.Context, batchJob batch.BatchJob) ([]kcore.Pod, error) { + workerJobPods := kcore.PodList{} + if err := r.List(ctx, &workerJobPods, + client.InNamespace(consts.DefaultNamespace), + client.MatchingLabels{ + "jobID": batchJob.Name, + "apiName": batchJob.Spec.APIName, + "apiID": batchJob.Spec.APIID, + "cortex.dev/batch": "worker", + }, + ); err != nil { + return nil, err + } + return workerJobPods.Items, nil +} + func (r *BatchJobReconciler) updateStatus(ctx context.Context, batchJob *batch.BatchJob, statusInfo batchJobStatusInfo) error { batchJob.Status.ID = batchJob.Name @@ -461,6 +477,11 @@ func (r *BatchJobReconciler) updateStatus(ctx context.Context, batchJob *batch.B batchJob.Status.TotalBatchCount = statusInfo.TotalBatchCount } + workerJobPods, err := r.getWorkerJobPods(ctx, *batchJob) + if err != nil { + return errors.Wrap(err, "failed to retrieve worker pods") + } + worker := statusInfo.WorkerJob if worker != nil { batchJob.Status.EndTime = worker.Status.CompletionTime // assign right away, because it's a pointer @@ -486,13 +507,11 @@ func (r *BatchJobReconciler) updateStatus(ctx context.Context, batchJob *batch.B } } - isWorkerOOM, err := r.checkWorkersOOM(ctx, batchJob) - if err != nil { - return err - } - - if isWorkerOOM { - batchJobStatus = status.JobWorkerOOM + for i := range workerJobPods { + if k8s.WasPodOOMKilled(&workerJobPods[i]) { + batchJobStatus = status.JobWorkerOOM + break + } } batchJob.Status.Status = batchJobStatus @@ -512,11 +531,8 @@ func (r *BatchJobReconciler) updateStatus(ctx context.Context, batchJob *batch.B batchJob.Status.Status = status.JobRunning } - batchJob.Status.WorkerCounts = &status.WorkerCounts{ - Running: worker.Status.Active, - Succeeded: worker.Status.Succeeded, - Failed: worker.Status.Failed, - } + workerCounts := getReplicaCounts(workerJobPods) + batchJob.Status.WorkerCounts = &workerCounts } if err := r.Status().Update(ctx, batchJob); err != nil { @@ -526,27 +542,6 @@ func (r *BatchJobReconciler) updateStatus(ctx context.Context, batchJob *batch.B return nil } -func (r *BatchJobReconciler) checkWorkersOOM(ctx context.Context, batchJob *batch.BatchJob) (bool, error) { - workerJobPods := kcore.PodList{} - if err := r.List(ctx, &workerJobPods, - client.InNamespace(consts.DefaultNamespace), - client.MatchingLabels{ - "jobID": batchJob.Name, - "apiName": batchJob.Spec.APIName, - "apiID": batchJob.Spec.APIID, - }, - ); err != nil { - return false, err - } - - for i := range workerJobPods.Items { - if k8s.WasPodOOMKilled(&workerJobPods.Items[i]) { - return true, nil - } - } - return false, nil -} - func (r *BatchJobReconciler) deleteSQSQueue(batchJob batch.BatchJob) error { queueURL := r.getQueueURL(batchJob) input := sqs.DeleteQueueInput{QueueUrl: aws.String(queueURL)} @@ -736,3 +731,34 @@ func saveJobStatus(r *BatchJobReconciler, batchJob batch.BatchJob) error { }, ) } + +func getReplicaCounts(workerJobPods []kcore.Pod) status.WorkerCounts { + workerCounts := status.WorkerCounts{} + for i := range workerJobPods { + switch k8s.GetPodStatus(&workerJobPods[i]) { + case k8s.PodStatusPending: + workerCounts.Pending++ + case k8s.PodStatusStalled: + workerCounts.Stalled++ + case k8s.PodStatusCreating: + workerCounts.Creating++ + case k8s.PodStatusNotReady: + workerCounts.NotReady++ + case k8s.PodStatusErrImagePull: + workerCounts.ErrImagePull++ + case k8s.PodStatusTerminating: + workerCounts.Terminating++ + case k8s.PodStatusFailed: + workerCounts.Failed++ + case k8s.PodStatusKilled: + workerCounts.Killed++ + case k8s.PodStatusKilledOOM: + workerCounts.KilledOOM++ + case k8s.PodStatusSucceeded: + workerCounts.Succeeded++ + case k8s.PodStatusUnknown: + workerCounts.Unknown++ + } + } + return workerCounts +} diff --git a/pkg/lib/k8s/pod.go b/pkg/lib/k8s/pod.go index e841a7b8a8..293e88a476 100644 --- a/pkg/lib/k8s/pod.go +++ b/pkg/lib/k8s/pod.go @@ -23,6 +23,7 @@ import ( "time" "github.com/cortexlabs/cortex/pkg/lib/errors" + "github.com/cortexlabs/cortex/pkg/lib/pointer" "github.com/cortexlabs/cortex/pkg/lib/sets/strset" kcore "k8s.io/api/core/v1" kerrors "k8s.io/apimachinery/pkg/api/errors" @@ -45,37 +46,48 @@ const ( ReasonCompleted = "Completed" ) +type PodSpec struct { + Name string + K8sPodSpec kcore.PodSpec + Labels map[string]string + Annotations map[string]string +} + type PodStatus string const ( - PodStatusUnknown PodStatus = "Unknown" PodStatusPending PodStatus = "Pending" - PodStatusInitializing PodStatus = "Initializing" - PodStatusRunning PodStatus = "Running" - PodStatusErrImagePull PodStatus = "Image pull error" + PodStatusCreating PodStatus = "Creating" + PodStatusNotReady PodStatus = "NotReady" + PodStatusReady PodStatus = "Ready" + PodStatusErrImagePull PodStatus = "ErrImagePull" PodStatusTerminating PodStatus = "Terminating" - PodStatusSucceeded PodStatus = "Succeeded" PodStatusFailed PodStatus = "Failed" PodStatusKilled PodStatus = "Killed" - PodStatusKilledOOM PodStatus = "Out of Memory" + PodStatusKilledOOM PodStatus = "KilledOOM" + PodStatusStalled PodStatus = "Stalled" + PodStatusSucceeded PodStatus = "Succeeded" + PodStatusUnknown PodStatus = "Unknown" ) -var _killStatuses = map[int32]bool{ - 137: true, // SIGKILL - 143: true, // SIGTERM - 130: true, // SIGINT - 129: true, // SIGHUP -} +var ( + _killStatuses = map[int32]bool{ + 137: true, // SIGKILL + 143: true, // SIGTERM + 130: true, // SIGINT + 129: true, // SIGHUP + } -// https://github.com/kubernetes/kubernetes/blob/master/pkg/kubelet/images/types.go#L27 -var _imagePullErrorStrings = strset.New("ErrImagePull", "ImagePullBackOff", "RegistryUnavailable") + _evictedMemoryMessageRegex = regexp.MustCompile(`(?i)low\W+on\W+resource\W+memory`) -type PodSpec struct { - Name string - K8sPodSpec kcore.PodSpec - Labels map[string]string - Annotations map[string]string -} + // https://github.com/kubernetes/kubernetes/blob/master/pkg/kubelet/images/types.go#L27 + _imagePullErrorStrings = strset.New("ErrImagePull", "ImagePullBackOff", "RegistryUnavailable") + + // https://github.com/kubernetes/kubernetes/blob/9f47110aa29094ed2878cf1d85874cb59214664a/staging/src/k8s.io/api/core/v1/types.go#L76-L77 + _creatingReasons = strset.New("ContainerCreating", "PodInitializing") + + _waitForCreatingPodTimeout = time.Minute * 15 +) func Pod(spec *PodSpec) *kcore.Pod { pod := &kcore.Pod{ @@ -90,6 +102,28 @@ func Pod(spec *PodSpec) *kcore.Pod { return pod } +func GetPodConditionOf(pod *kcore.Pod, podType kcore.PodConditionType) (*bool, *kcore.PodCondition) { + if pod == nil { + return nil, nil + } + + var conditionState *bool + var condition *kcore.PodCondition + for i := range pod.Status.Conditions { + if pod.Status.Conditions[i].Type == podType { + if pod.Status.Conditions[i].Status == kcore.ConditionTrue { + conditionState = pointer.Bool(true) + } + if pod.Status.Conditions[i].Status == kcore.ConditionFalse { + conditionState = pointer.Bool(false) + } + condition = &pod.Status.Conditions[i] + break + } + } + return conditionState, condition +} + func (c *Client) CreatePod(pod *kcore.Pod) (*kcore.Pod, error) { pod.TypeMeta = _podTypeMeta pod, err := c.podClient.Create(context.Background(), pod, kmeta.CreateOptions{}) @@ -120,14 +154,26 @@ func (c *Client) ApplyPod(pod *kcore.Pod) (*kcore.Pod, error) { } func IsPodReady(pod *kcore.Pod) bool { - if GetPodStatus(pod) != PodStatusRunning { + if GetPodStatus(pod) != PodStatusReady { return false } - for _, condition := range pod.Status.Conditions { - if condition.Type == "Ready" && condition.Status == kcore.ConditionTrue { - return true - } + podConditionState, _ := GetPodConditionOf(pod, kcore.PodReady) + if podConditionState != nil && *podConditionState { + return true + } + + return false +} + +func IsPodStalled(pod *kcore.Pod) bool { + if GetPodStatus(pod) != PodStatusPending { + return false + } + + podConditionState, podCondition := GetPodConditionOf(pod, kcore.PodScheduled) + if podConditionState != nil && !*podConditionState && !podCondition.LastTransitionTime.Time.IsZero() && time.Since(podCondition.LastTransitionTime.Time) >= _waitForCreatingPodTimeout { + return true } return false @@ -137,7 +183,7 @@ func GetPodReadyTime(pod *kcore.Pod) *time.Time { for i := range pod.Status.Conditions { condition := pod.Status.Conditions[i] - if condition.Type == "Ready" && condition.Status == kcore.ConditionTrue { + if condition.Type == kcore.PodReady && condition.Status == kcore.ConditionTrue { if condition.LastTransitionTime.Time.IsZero() { return nil } @@ -148,8 +194,6 @@ func GetPodReadyTime(pod *kcore.Pod) *time.Time { return nil } -var _evictedMemoryMessageRegex = regexp.MustCompile(`(?i)low\W+on\W+resource\W+memory`) - func WasPodOOMKilled(pod *kcore.Pod) bool { if pod.Status.Reason == ReasonEvicted && _evictedMemoryMessageRegex.MatchString(pod.Status.Message) { return true @@ -176,15 +220,11 @@ func GetPodStatus(pod *kcore.Pod) PodStatus { switch pod.Status.Phase { case kcore.PodPending: - initPodStatus := PodStatusFromContainerStatuses(pod.Status.InitContainerStatuses) - if initPodStatus == PodStatusRunning { - return PodStatusInitializing + podConditionState, podCondition := GetPodConditionOf(pod, kcore.PodScheduled) + if podConditionState != nil && !*podConditionState && !podCondition.LastTransitionTime.Time.IsZero() && time.Since(podCondition.LastTransitionTime.Time) >= _waitForCreatingPodTimeout { + return PodStatusStalled } - allPodStatus := PodStatusFromContainerStatuses(append(pod.Status.InitContainerStatuses, pod.Status.ContainerStatuses...)) - if allPodStatus == PodStatusErrImagePull { - return PodStatusErrImagePull - } - return PodStatusPending + return PodStatusFromContainerStatuses(append(pod.Status.InitContainerStatuses, pod.Status.ContainerStatuses...)) case kcore.PodSucceeded: return PodStatusSucceeded case kcore.PodFailed: @@ -215,7 +255,17 @@ func GetPodStatus(pod *kcore.Pod) PodStatus { return PodStatusTerminating } - return PodStatusFromContainerStatuses(pod.Status.ContainerStatuses) + podConditionState, _ := GetPodConditionOf(pod, kcore.PodReady) + if podConditionState != nil && *podConditionState { + return PodStatusReady + } + + status := PodStatusFromContainerStatuses(pod.Status.ContainerStatuses) + if status == PodStatusReady { + return PodStatusNotReady + } + + return status default: return PodStatusUnknown } @@ -224,7 +274,9 @@ func GetPodStatus(pod *kcore.Pod) PodStatus { func PodStatusFromContainerStatuses(containerStatuses []kcore.ContainerStatus) PodStatus { numContainers := len(containerStatuses) numWaiting := 0 - numRunning := 0 + numCreating := 0 + numNotReady := 0 + numReady := 0 numSucceeded := 0 numFailed := 0 numKilled := 0 @@ -235,9 +287,9 @@ func PodStatusFromContainerStatuses(containerStatuses []kcore.ContainerStatus) P } for _, containerStatus := range containerStatuses { if containerStatus.State.Running != nil && containerStatus.Ready { - numRunning++ - } else if containerStatus.State.Running != nil && containerStatus.RestartCount == 0 { - numRunning++ + numReady++ + } else if containerStatus.State.Running != nil && !containerStatus.Ready { + numNotReady++ } else if containerStatus.State.Terminated != nil { exitCode := containerStatus.State.Terminated.ExitCode reason := containerStatus.State.Terminated.Reason @@ -264,6 +316,8 @@ func PodStatusFromContainerStatuses(containerStatuses []kcore.ContainerStatus) P } } else if containerStatus.State.Waiting != nil && _imagePullErrorStrings.Has(containerStatus.State.Waiting.Reason) { return PodStatusErrImagePull + } else if containerStatus.State.Waiting != nil && _creatingReasons.Has(containerStatus.State.Waiting.Reason) { + numCreating++ } else { // either containerStatus.State.Waiting != nil or all containerStatus.States are nil (which implies waiting) numWaiting++ @@ -279,8 +333,12 @@ func PodStatusFromContainerStatuses(containerStatuses []kcore.ContainerStatus) P return PodStatusPending } else if numSucceeded == numContainers { return PodStatusSucceeded + } else if numCreating > 0 { + return PodStatusCreating + } else if numNotReady > 0 { + return PodStatusNotReady } else { - return PodStatusRunning + return PodStatusReady } } diff --git a/pkg/operator/endpoints/describe.go b/pkg/operator/endpoints/describe.go new file mode 100644 index 0000000000..b574d5eefc --- /dev/null +++ b/pkg/operator/endpoints/describe.go @@ -0,0 +1,36 @@ +/* +Copyright 2021 Cortex Labs, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package endpoints + +import ( + "net/http" + + "github.com/cortexlabs/cortex/pkg/operator/resources" + "github.com/gorilla/mux" +) + +func DescribeAPI(w http.ResponseWriter, r *http.Request) { + apiName := mux.Vars(r)["apiName"] + + response, err := resources.DescribeAPI(apiName) + if err != nil { + respondError(w, r, err) + return + } + + respondJSON(w, r, response) +} diff --git a/pkg/operator/endpoints/logs.go b/pkg/operator/endpoints/logs.go index 2d335e27da..d56add3806 100644 --- a/pkg/operator/endpoints/logs.go +++ b/pkg/operator/endpoints/logs.go @@ -19,6 +19,7 @@ package endpoints import ( "net/http" + "github.com/cortexlabs/cortex/pkg/lib/errors" "github.com/cortexlabs/cortex/pkg/operator/operator" "github.com/cortexlabs/cortex/pkg/operator/resources" "github.com/cortexlabs/cortex/pkg/operator/resources/asyncapi" @@ -98,7 +99,10 @@ func GetLogURL(w http.ResponseWriter, r *http.Request) { respondError(w, r, err) return } - logURL, err := operator.APILogURL(apiResponse[0].Spec) + if apiResponse[0].Spec == nil { + respondError(w, r, errors.ErrorUnexpected("unable to get api spec", apiName)) + } + logURL, err := operator.APILogURL(*apiResponse[0].Spec) if err != nil { respondError(w, r, err) return @@ -112,7 +116,10 @@ func GetLogURL(w http.ResponseWriter, r *http.Request) { respondError(w, r, err) return } - logURL, err := operator.APILogURL(apiResponse[0].Spec) + if apiResponse[0].Spec == nil { + respondError(w, r, errors.ErrorUnexpected("unable to get api spec", apiName)) + } + logURL, err := operator.APILogURL(*apiResponse[0].Spec) if err != nil { respondError(w, r, err) return diff --git a/pkg/operator/operator/k8s.go b/pkg/operator/operator/k8s.go index f9536596ce..43e36168c9 100644 --- a/pkg/operator/operator/k8s.go +++ b/pkg/operator/operator/k8s.go @@ -22,6 +22,7 @@ import ( "github.com/cortexlabs/cortex/pkg/config" "github.com/cortexlabs/cortex/pkg/lib/urls" "github.com/cortexlabs/cortex/pkg/types/spec" + "github.com/cortexlabs/cortex/pkg/types/userconfig" ) // APILoadBalancerURL returns the http endpoint of the ingress load balancer for deployed APIs @@ -63,3 +64,20 @@ func APIEndpoint(api *spec.API) (string, error) { return urls.Join(baseAPIEndpoint, *api.Networking.Endpoint), nil } + +func APIEndpointFromResource(deployedResource *DeployedResource) (string, error) { + apiEndpoint, err := userconfig.EndpointFromAnnotation(deployedResource.VirtualService) + if err != nil { + return "", err + } + + baseAPIEndpoint := "" + + baseAPIEndpoint, err = APILoadBalancerURL() + if err != nil { + return "", err + } + baseAPIEndpoint = strings.Replace(baseAPIEndpoint, "https://", "http://", 1) + + return urls.Join(baseAPIEndpoint, apiEndpoint), nil +} diff --git a/pkg/operator/resources/asyncapi/api.go b/pkg/operator/resources/asyncapi/api.go index 39cce27446..0c6ba6b190 100644 --- a/pkg/operator/resources/asyncapi/api.go +++ b/pkg/operator/resources/asyncapi/api.go @@ -19,6 +19,7 @@ package asyncapi import ( "fmt" "path/filepath" + "sort" "time" "github.com/cortexlabs/cortex/pkg/config" @@ -31,6 +32,7 @@ import ( "github.com/cortexlabs/cortex/pkg/operator/operator" "github.com/cortexlabs/cortex/pkg/operator/schema" "github.com/cortexlabs/cortex/pkg/types/spec" + "github.com/cortexlabs/cortex/pkg/types/status" "github.com/cortexlabs/cortex/pkg/types/userconfig" "github.com/cortexlabs/cortex/pkg/workloads" istioclientnetworking "istio.io/client-go/pkg/apis/networking/v1beta1" @@ -249,13 +251,71 @@ func DeleteAPI(apiName string, keepCache bool) error { return nil } +func GetAllAPIs(deployments []kapps.Deployment) ([]schema.APIResponse, error) { + asyncAPIs := make([]schema.APIResponse, 0) + mappedAsyncAPIs := make(map[string]schema.APIResponse, 0) + apiNames := make([]string, 0) + + for i := range deployments { + if deployments[i].Labels["cortex.dev/async"] != "api" { + continue + } + apiName := deployments[i].Labels["apiName"] + apiNames = append(apiNames, apiName) + + metadata, err := spec.MetadataFromDeployment(&deployments[i]) + if err != nil { + return nil, errors.Wrap(err, fmt.Sprintf("api %s", apiName)) + } + mappedAsyncAPIs[apiName] = schema.APIResponse{ + Status: status.FromDeployment(&deployments[i]), + Metadata: metadata, + } + } + + sort.Strings(apiNames) + for _, apiName := range apiNames { + asyncAPIs = append(asyncAPIs, mappedAsyncAPIs[apiName]) + } + + return asyncAPIs, nil +} + func GetAPIByName(deployedResource *operator.DeployedResource) ([]schema.APIResponse, error) { - status, err := GetStatus(deployedResource.Name) + var apiDeployment *kapps.Deployment + var gatewayDeployment *kapps.Deployment + + err := parallel.RunFirstErr( + func() error { + var err error + apiDeployment, err = config.K8s.GetDeployment(workloads.K8sName(deployedResource.Name)) + return err + }, + func() error { + var err error + gatewayDeployment, err = config.K8s.GetDeployment(getGatewayK8sName(deployedResource.Name)) + return err + }, + ) if err != nil { return nil, err } - api, err := operator.DownloadAPISpec(status.APIName, status.APIID) + if apiDeployment == nil { + return nil, errors.ErrorUnexpected("unable to find api deployment", deployedResource.Name) + } + + if gatewayDeployment == nil { + return nil, errors.ErrorUnexpected("unable to find gateway deployment", deployedResource.Name) + } + + apiStatus := status.FromDeployment(apiDeployment) + apiMetadata, err := spec.MetadataFromDeployment(apiDeployment) + if err != nil { + return nil, errors.ErrorUnexpected("unable to obtain metadata", deployedResource.Name) + } + + api, err := operator.DownloadAPISpec(apiMetadata.Name, apiMetadata.APIID) if err != nil { return nil, err } @@ -269,43 +329,73 @@ func GetAPIByName(deployedResource *operator.DeployedResource) ([]schema.APIResp return []schema.APIResponse{ { - Spec: *api, - Status: status, - Endpoint: apiEndpoint, + Spec: api, + Metadata: apiMetadata, + Status: apiStatus, + Endpoint: &apiEndpoint, DashboardURL: dashboardURL, }, }, nil } -func GetAllAPIs(pods []kcore.Pod, deployments []kapps.Deployment) ([]schema.APIResponse, error) { - statuses, err := GetAllStatuses(deployments, pods) +func DescribeAPIByName(deployedResource *operator.DeployedResource) ([]schema.APIResponse, error) { + var apiDeployment *kapps.Deployment + var gatewayDeployment *kapps.Deployment + + err := parallel.RunFirstErr( + func() error { + var err error + apiDeployment, err = config.K8s.GetDeployment(workloads.K8sName(deployedResource.Name)) + return err + }, + func() error { + var err error + gatewayDeployment, err = config.K8s.GetDeployment(getGatewayK8sName(deployedResource.Name)) + return err + }, + ) if err != nil { return nil, err } - apiNames, apiIDs := namesAndIDsFromStatuses(statuses) - apis, err := operator.DownloadAPISpecs(apiNames, apiIDs) - if err != nil { - return nil, err + if apiDeployment == nil { + return nil, errors.ErrorUnexpected("unable to find api deployment", deployedResource.Name) } - asyncAPIs := make([]schema.APIResponse, len(apis)) + if gatewayDeployment == nil { + return nil, errors.ErrorUnexpected("unable to find gateway deployment", deployedResource.Name) + } - for i := range apis { - api := apis[i] - endpoint, err := operator.APIEndpoint(&api) - if err != nil { - return nil, err - } + apiStatus := status.FromDeployment(apiDeployment) + apiMetadata, err := spec.MetadataFromDeployment(apiDeployment) + if err != nil { + return nil, errors.ErrorUnexpected("unable to obtain metadata", deployedResource.Name) + } - asyncAPIs[i] = schema.APIResponse{ - Spec: api, - Status: &statuses[i], - Endpoint: endpoint, - } + apiPods, err := config.K8s.ListPodsByLabels(map[string]string{ + "apiName": apiDeployment.Labels["apiName"], + "cortex.dev/async": "api", + }) + if err != nil { + return nil, err } + apiStatus.ReplicaCounts = GetReplicaCounts(apiDeployment, apiPods) - return asyncAPIs, nil + apiEndpoint, err := operator.APIEndpointFromResource(deployedResource) + if err != nil { + return nil, err + } + + dashboardURL := pointer.String(getDashboardURL(deployedResource.Name)) + + return []schema.APIResponse{ + { + Metadata: apiMetadata, + Status: apiStatus, + Endpoint: &apiEndpoint, + DashboardURL: dashboardURL, + }, + }, nil } func UpdateAPIMetricsCron(apiDeployment *kapps.Deployment) error { @@ -545,6 +635,33 @@ func deleteK8sResources(apiName string) error { return err } +// returns true if min_replicas are not ready and no updated replicas have errored +func isAPIUpdating(deployment *kapps.Deployment) (bool, error) { + pods, err := config.K8s.ListPodsByLabel("apiName", deployment.Labels["apiName"]) + if err != nil { + return false, err + } + + replicaCounts := GetReplicaCounts(deployment, pods) + + autoscalingSpec, err := userconfig.AutoscalingFromAnnotations(deployment) + if err != nil { + return false, err + } + + if replicaCounts.Ready < autoscalingSpec.MinReplicas && replicaCounts.TotalFailed() == 0 { + return true, nil + } + + return false, nil +} + +func isPodSpecLatest(deployment *kapps.Deployment, pod *kcore.Pod) bool { + // Note: the gateway deployment/pods don't have "podID" or "deploymentID" labels, which is ok since it is always up-to-date + return deployment.Spec.Template.Labels["podID"] == pod.Labels["podID"] && + deployment.Spec.Template.Labels["deploymentID"] == pod.Labels["deploymentID"] +} + func getDashboardURL(apiName string) string { loadBalancerURL, err := operator.LoadBalancerURL() if err != nil { diff --git a/pkg/operator/resources/asyncapi/status.go b/pkg/operator/resources/asyncapi/status.go index 38e02329d0..7035c31c01 100644 --- a/pkg/operator/resources/asyncapi/status.go +++ b/pkg/operator/resources/asyncapi/status.go @@ -17,232 +17,13 @@ limitations under the License. package asyncapi import ( - "sort" - "time" - - "github.com/cortexlabs/cortex/pkg/config" - "github.com/cortexlabs/cortex/pkg/consts" - "github.com/cortexlabs/cortex/pkg/lib/errors" "github.com/cortexlabs/cortex/pkg/lib/k8s" - "github.com/cortexlabs/cortex/pkg/lib/parallel" "github.com/cortexlabs/cortex/pkg/types/status" - "github.com/cortexlabs/cortex/pkg/types/userconfig" - "github.com/cortexlabs/cortex/pkg/workloads" kapps "k8s.io/api/apps/v1" kcore "k8s.io/api/core/v1" ) -type asyncResourceGroup struct { - APIDeployment *kapps.Deployment - APIPods []kcore.Pod - GatewayDeployment *kapps.Deployment - GatewayPods []kcore.Pod -} - -func GetStatus(apiName string) (*status.Status, error) { - var apiDeployment *kapps.Deployment - var gatewayDeployment *kapps.Deployment - var gatewayPods []kcore.Pod - var apiPods []kcore.Pod - - err := parallel.RunFirstErr( - func() error { - var err error - apiDeployment, err = config.K8s.GetDeployment(workloads.K8sName(apiName)) - return err - }, - func() error { - var err error - gatewayDeployment, err = config.K8s.GetDeployment(getGatewayK8sName(apiName)) - return err - }, - func() error { - var err error - gatewayPods, err = config.K8s.ListPodsByLabels( - map[string]string{ - "apiName": apiName, - "cortex.dev/async": "gateway", - }, - ) - return err - }, - func() error { - var err error - apiPods, err = config.K8s.ListPodsByLabels( - map[string]string{ - "apiName": apiName, - "cortex.dev/async": "api", - }, - ) - return err - }, - ) - if err != nil { - return nil, err - } - - if apiDeployment == nil { - return nil, errors.ErrorUnexpected("unable to find api deployment", apiName) - } - - if gatewayDeployment == nil { - return nil, errors.ErrorUnexpected("unable to find gateway deployment", apiName) - } - - return apiStatus(apiDeployment, apiPods, gatewayDeployment, gatewayPods) -} - -func GetAllStatuses(deployments []kapps.Deployment, pods []kcore.Pod) ([]status.Status, error) { - resourcesByAPI := groupResourcesByAPI(deployments, pods) - statuses := make([]status.Status, len(resourcesByAPI)) - - var i int - for apiName, k8sResources := range resourcesByAPI { - if k8sResources.APIDeployment == nil { - return nil, errors.ErrorUnexpected("unable to find api deployment", apiName) - } - - if k8sResources.GatewayDeployment == nil { - return nil, errors.ErrorUnexpected("unable to find gateway deployment", apiName) - } - - st, err := apiStatus(k8sResources.APIDeployment, k8sResources.APIPods, k8sResources.GatewayDeployment, k8sResources.GatewayPods) - if err != nil { - return nil, err - } - statuses[i] = *st - i++ - } - - sort.Slice(statuses, func(i, j int) bool { - return statuses[i].APIName < statuses[j].APIName - }) - - return statuses, nil -} - -func namesAndIDsFromStatuses(statuses []status.Status) ([]string, []string) { - apiNames := make([]string, len(statuses)) - apiIDs := make([]string, len(statuses)) - - for i, st := range statuses { - apiNames[i] = st.APIName - apiIDs[i] = st.APIID - } - - return apiNames, apiIDs -} - -// let's do CRDs instead, to avoid this -func groupResourcesByAPI(deployments []kapps.Deployment, pods []kcore.Pod) map[string]*asyncResourceGroup { - resourcesByAPI := map[string]*asyncResourceGroup{} - for i := range deployments { - deployment := deployments[i] - apiName := deployment.Labels["apiName"] - asyncType := deployment.Labels["cortex.dev/async"] - apiResources, exists := resourcesByAPI[apiName] - if exists { - if asyncType == "api" { - apiResources.APIDeployment = &deployment - } else { - apiResources.GatewayDeployment = &deployment - } - } else { - if asyncType == "api" { - resourcesByAPI[apiName] = &asyncResourceGroup{APIDeployment: &deployment} - } else { - resourcesByAPI[apiName] = &asyncResourceGroup{GatewayDeployment: &deployment} - } - } - } - - for _, pod := range pods { - apiName := pod.Labels["apiName"] - asyncType := pod.Labels["cortex.dev/async"] - apiResources, exists := resourcesByAPI[apiName] - if !exists { - // ignore pods that might still be waiting to be deleted while the deployment has already been deleted - continue - } - - if asyncType == "api" { - apiResources.APIPods = append(resourcesByAPI[apiName].APIPods, pod) - } else { - apiResources.GatewayPods = append(resourcesByAPI[apiName].GatewayPods, pod) - } - } - return resourcesByAPI -} - -func apiStatus(apiDeployment *kapps.Deployment, apiPods []kcore.Pod, gatewayDeployment *kapps.Deployment, gatewayPods []kcore.Pod) (*status.Status, error) { - autoscalingSpec, err := userconfig.AutoscalingFromAnnotations(apiDeployment) - if err != nil { - return nil, err - } - - apiReplicaCounts := getReplicaCounts(apiDeployment, apiPods) - gatewayReplicaCounts := getReplicaCounts(gatewayDeployment, gatewayPods) - - st := &status.Status{} - st.APIName = apiDeployment.Labels["apiName"] - st.APIID = apiDeployment.Labels["apiID"] - st.ReplicaCounts = apiReplicaCounts - st.Code = getStatusCode(apiReplicaCounts, gatewayReplicaCounts, autoscalingSpec.MinReplicas) - - return st, nil -} - -func getStatusCode(apiCounts status.ReplicaCounts, gatewayCounts status.ReplicaCounts, apiMinReplicas int32) status.Code { - if apiCounts.Updated.Ready >= apiCounts.Requested && gatewayCounts.Updated.Ready >= 1 { - return status.Live - } - - if apiCounts.Updated.ErrImagePull > 0 || gatewayCounts.Updated.ErrImagePull > 0 { - return status.ErrorImagePull - } - - if apiCounts.Updated.Failed > 0 || apiCounts.Updated.Killed > 0 || - gatewayCounts.Updated.Failed > 0 || gatewayCounts.Updated.Killed > 0 { - return status.Error - } - - if apiCounts.Updated.KilledOOM > 0 || gatewayCounts.Updated.KilledOOM > 0 { - return status.OOM - } - - if apiCounts.Updated.Stalled > 0 || gatewayCounts.Updated.Stalled > 0 { - return status.Stalled - } - - if apiCounts.Updated.Ready >= apiMinReplicas && gatewayCounts.Updated.Ready >= 1 { - return status.Live - } - - return status.Updating -} - -// returns true if min_replicas are not ready and no updated replicas have errored -func isAPIUpdating(deployment *kapps.Deployment) (bool, error) { - pods, err := config.K8s.ListPodsByLabel("apiName", deployment.Labels["apiName"]) - if err != nil { - return false, err - } - - replicaCounts := getReplicaCounts(deployment, pods) - - autoscalingSpec, err := userconfig.AutoscalingFromAnnotations(deployment) - if err != nil { - return false, err - } - - if replicaCounts.Updated.Ready < autoscalingSpec.MinReplicas && replicaCounts.Updated.TotalFailed() == 0 { - return true, nil - } - - return false, nil -} - -func getReplicaCounts(deployment *kapps.Deployment, pods []kcore.Pod) status.ReplicaCounts { +func GetReplicaCounts(deployment *kapps.Deployment, pods []kcore.Pod) *status.ReplicaCounts { counts := status.ReplicaCounts{} counts.Requested = *deployment.Spec.Replicas @@ -255,50 +36,55 @@ func getReplicaCounts(deployment *kapps.Deployment, pods []kcore.Pod) status.Rep addPodToReplicaCounts(&pod, deployment, &counts) } - return counts + return &counts } func addPodToReplicaCounts(pod *kcore.Pod, deployment *kapps.Deployment, counts *status.ReplicaCounts) { - var subCounts *status.SubReplicaCounts + latest := false if isPodSpecLatest(deployment, pod) { - subCounts = &counts.Updated - } else { - subCounts = &counts.Stale + latest = true } - if k8s.IsPodReady(pod) { - subCounts.Ready++ + isPodReady := k8s.IsPodReady(pod) + if latest && isPodReady { + counts.Ready++ + return + } else if !latest && isPodReady { + counts.ReadyOutOfDate++ return } - switch k8s.GetPodStatus(pod) { + podStatus := k8s.GetPodStatus(pod) + + if podStatus == k8s.PodStatusTerminating { + counts.Terminating++ + return + } + + if !latest { + return + } + + switch podStatus { case k8s.PodStatusPending: - if time.Since(pod.CreationTimestamp.Time) > consts.WaitForInitializingReplicasTimeout { - subCounts.Stalled++ - } else { - subCounts.Pending++ - } - case k8s.PodStatusInitializing: - subCounts.Initializing++ - case k8s.PodStatusRunning: - subCounts.Initializing++ + counts.Pending++ + case k8s.PodStatusStalled: + counts.Stalled++ + case k8s.PodStatusCreating: + counts.Creating++ + case k8s.PodStatusReady: + counts.Ready++ + case k8s.PodStatusNotReady: + counts.NotReady++ case k8s.PodStatusErrImagePull: - subCounts.ErrImagePull++ - case k8s.PodStatusTerminating: - subCounts.Terminating++ + counts.ErrImagePull++ case k8s.PodStatusFailed: - subCounts.Failed++ + counts.Failed++ case k8s.PodStatusKilled: - subCounts.Killed++ + counts.Killed++ case k8s.PodStatusKilledOOM: - subCounts.KilledOOM++ - default: - subCounts.Unknown++ + counts.KilledOOM++ + case k8s.PodStatusUnknown: + counts.Unknown++ } } - -func isPodSpecLatest(deployment *kapps.Deployment, pod *kcore.Pod) bool { - // Note: the gateway deployment/pods don't have "podID" or "deploymentID" labels, which is ok since it is always up-to-date - return deployment.Spec.Template.Labels["podID"] == pod.Labels["podID"] && - deployment.Spec.Template.Labels["deploymentID"] == pod.Labels["deploymentID"] -} diff --git a/pkg/operator/resources/job/batchapi/api.go b/pkg/operator/resources/job/batchapi/api.go index b85726a531..6ac1c87219 100644 --- a/pkg/operator/resources/job/batchapi/api.go +++ b/pkg/operator/resources/job/batchapi/api.go @@ -140,25 +140,18 @@ func GetAllAPIs(virtualServices []istioclientnetworking.VirtualService, batchJob apiNameToBatchJobsMap[batchJob.Spec.APIName] = append(apiNameToBatchJobsMap[batchJob.Spec.APIName], &batchJobList[i]) } - for _, virtualService := range virtualServices { - apiName := virtualService.Labels["apiName"] - apiID := virtualService.Labels["apiID"] - - api, err := operator.DownloadAPISpec(apiName, apiID) + for i := range virtualServices { + apiName := virtualServices[i].Labels["apiName"] + metadata, err := spec.MetadataFromVirtualService(&virtualServices[i]) if err != nil { - return nil, err - } - - endpoint, err := operator.APIEndpoint(api) - if err != nil { - return nil, err + return nil, errors.Wrap(err, fmt.Sprintf("api %s", apiName)) } var jobStatuses []status.BatchJobStatus - batchJobs := apiNameToBatchJobsMap[apiName] + batchJobs := apiNameToBatchJobsMap[metadata.Name] if len(batchJobs) == 0 { - jobStates, err := job.GetMostRecentlySubmittedJobStates(apiName, 1, userconfig.BatchAPIKind) + jobStates, err := job.GetMostRecentlySubmittedJobStates(metadata.Name, 1, userconfig.BatchAPIKind) if err != nil { return nil, err } @@ -183,9 +176,8 @@ func GetAllAPIs(virtualServices []istioclientnetworking.VirtualService, batchJob } } - batchAPIsMap[apiName] = &schema.APIResponse{ - Spec: *api, - Endpoint: endpoint, + batchAPIsMap[metadata.Name] = &schema.APIResponse{ + Metadata: metadata, BatchJobStatuses: jobStatuses, } } @@ -200,10 +192,12 @@ func GetAllAPIs(virtualServices []istioclientnetworking.VirtualService, batchJob } func GetAPIByName(deployedResource *operator.DeployedResource) ([]schema.APIResponse, error) { - virtualService := deployedResource.VirtualService + metadata, err := spec.MetadataFromVirtualService(deployedResource.VirtualService) + if err != nil { + return nil, err + } - apiID := virtualService.Labels["apiID"] - api, err := operator.DownloadAPISpec(deployedResource.Name, apiID) + api, err := operator.DownloadAPISpec(deployedResource.Name, metadata.APIID) if err != nil { return nil, err } @@ -263,9 +257,10 @@ func GetAPIByName(deployedResource *operator.DeployedResource) ([]schema.APIResp return []schema.APIResponse{ { - Spec: *api, + Spec: api, + Metadata: metadata, BatchJobStatuses: jobStatuses, - Endpoint: endpoint, + Endpoint: &endpoint, DashboardURL: dashboardURL, }, }, nil diff --git a/pkg/operator/resources/job/taskapi/api.go b/pkg/operator/resources/job/taskapi/api.go index 9261cc16a9..c5ca6e17fa 100644 --- a/pkg/operator/resources/job/taskapi/api.go +++ b/pkg/operator/resources/job/taskapi/api.go @@ -146,21 +146,15 @@ func GetAllAPIs(virtualServices []istioclientnetworking.VirtualService, k8sJobs } } - for _, virtualService := range virtualServices { - apiName := virtualService.Labels["apiName"] - apiID := virtualService.Labels["apiID"] + for i := range virtualServices { + apiName := virtualServices[i].Labels["apiName"] - api, err := operator.DownloadAPISpec(apiName, apiID) + metadata, err := spec.MetadataFromVirtualService(&virtualServices[i]) if err != nil { - return nil, err - } - - endpoint, err := operator.APIEndpoint(api) - if err != nil { - return nil, err + return nil, errors.Wrap(err, fmt.Sprintf("api %s", apiName)) } - jobStates, err := job.GetMostRecentlySubmittedJobStates(apiName, 1, userconfig.TaskAPIKind) + jobStates, err := job.GetMostRecentlySubmittedJobStates(metadata.Name, 1, userconfig.TaskAPIKind) jobStatuses := []status.TaskJobStatus{} if len(jobStates) > 0 { @@ -172,9 +166,8 @@ func GetAllAPIs(virtualServices []istioclientnetworking.VirtualService, k8sJobs jobStatuses = append(jobStatuses, *jobStatus) } - taskAPIsMap[apiName] = &schema.APIResponse{ - Spec: *api, - Endpoint: endpoint, + taskAPIsMap[metadata.Name] = &schema.APIResponse{ + Metadata: metadata, TaskJobStatuses: jobStatuses, } } @@ -209,8 +202,8 @@ func GetAllAPIs(virtualServices []istioclientnetworking.VirtualService, k8sJobs taskAPIList := make([]schema.APIResponse, 0, len(taskAPIsMap)) - for _, batchAPI := range taskAPIsMap { - taskAPIList = append(taskAPIList, *batchAPI) + for _, taskAPI := range taskAPIsMap { + taskAPIList = append(taskAPIList, *taskAPI) } return taskAPIList, nil @@ -218,10 +211,12 @@ func GetAllAPIs(virtualServices []istioclientnetworking.VirtualService, k8sJobs // GetAPIByName returns a single task API and its most recently submitted job along with all running task jobs func GetAPIByName(deployedResource *operator.DeployedResource) ([]schema.APIResponse, error) { - virtualService := deployedResource.VirtualService + metadata, err := spec.MetadataFromVirtualService(deployedResource.VirtualService) + if err != nil { + return nil, err + } - apiID := virtualService.Labels["apiID"] - api, err := operator.DownloadAPISpec(deployedResource.Name, apiID) + api, err := operator.DownloadAPISpec(deployedResource.Name, metadata.APIID) if err != nil { return nil, err } @@ -295,9 +290,10 @@ func GetAPIByName(deployedResource *operator.DeployedResource) ([]schema.APIResp return []schema.APIResponse{ { - Spec: *api, + Spec: api, + Metadata: metadata, TaskJobStatuses: jobStatuses, - Endpoint: endpoint, + Endpoint: &endpoint, DashboardURL: dashboardURL, }, }, nil diff --git a/pkg/operator/resources/job/worker_stats.go b/pkg/operator/resources/job/worker_stats.go index 07628995e4..797d65980e 100644 --- a/pkg/operator/resources/job/worker_stats.go +++ b/pkg/operator/resources/job/worker_stats.go @@ -17,9 +17,6 @@ limitations under the License. package job import ( - "time" - - "github.com/cortexlabs/cortex/pkg/consts" "github.com/cortexlabs/cortex/pkg/lib/k8s" "github.com/cortexlabs/cortex/pkg/types/status" kbatch "k8s.io/api/batch/v1" @@ -43,34 +40,32 @@ func GetWorkerCountsForJob(k8sJob kbatch.Job, pods []kcore.Pod) status.WorkerCou func addPodToWorkerCounts(pod *kcore.Pod, workerCounts *status.WorkerCounts) { if k8s.IsPodReady(pod) { - workerCounts.Running++ + workerCounts.Ready++ return } switch k8s.GetPodStatus(pod) { case k8s.PodStatusPending: - if time.Since(pod.CreationTimestamp.Time) > consts.WaitForInitializingReplicasTimeout { - workerCounts.Stalled++ - } else { - workerCounts.Pending++ - } - case k8s.PodStatusInitializing: - workerCounts.Initializing++ - case k8s.PodStatusRunning: - workerCounts.Initializing++ + workerCounts.Pending++ + case k8s.PodStatusStalled: + workerCounts.Stalled++ + case k8s.PodStatusCreating: + workerCounts.Creating++ + case k8s.PodStatusNotReady: + workerCounts.NotReady++ case k8s.PodStatusErrImagePull: - workerCounts.Failed++ + workerCounts.ErrImagePull++ case k8s.PodStatusTerminating: - workerCounts.Failed++ + workerCounts.Terminating++ case k8s.PodStatusFailed: workerCounts.Failed++ case k8s.PodStatusKilled: - workerCounts.Failed++ + workerCounts.Killed++ case k8s.PodStatusKilledOOM: - workerCounts.Failed++ + workerCounts.KilledOOM++ case k8s.PodStatusSucceeded: workerCounts.Succeeded++ - default: + case k8s.PodStatusUnknown: workerCounts.Unknown++ } } diff --git a/pkg/operator/resources/realtimeapi/api.go b/pkg/operator/resources/realtimeapi/api.go index 256b253f8e..885c661366 100644 --- a/pkg/operator/resources/realtimeapi/api.go +++ b/pkg/operator/resources/realtimeapi/api.go @@ -19,6 +19,7 @@ package realtimeapi import ( "fmt" "path/filepath" + "sort" "time" "github.com/cortexlabs/cortex/pkg/config" @@ -175,72 +176,106 @@ func DeleteAPI(apiName string, keepCache bool) error { return nil } -func GetAllAPIs(pods []kcore.Pod, deployments []kapps.Deployment) ([]schema.APIResponse, error) { - statuses, err := GetAllStatuses(deployments, pods) - if err != nil { - return nil, err - } +func GetAllAPIs(deployments []kapps.Deployment) ([]schema.APIResponse, error) { + realtimeAPIs := make([]schema.APIResponse, len(deployments)) + mappedRealtimeAPIs := make(map[string]schema.APIResponse, len(deployments)) + apiNames := make([]string, len(deployments)) - apiNames, apiIDs := namesAndIDsFromStatuses(statuses) - apis, err := operator.DownloadAPISpecs(apiNames, apiIDs) - if err != nil { - return nil, err - } + for i := range deployments { + apiName := deployments[i].Labels["apiName"] + apiNames[i] = apiName - realtimeAPIs := make([]schema.APIResponse, len(apis)) - - for i := range apis { - api := apis[i] - endpoint, err := operator.APIEndpoint(&api) + metadata, err := spec.MetadataFromDeployment(&deployments[i]) if err != nil { - return nil, err + return nil, errors.Wrap(err, fmt.Sprintf("api %s", apiName)) } - - realtimeAPIs[i] = schema.APIResponse{ - Spec: api, - Status: &statuses[i], - Endpoint: endpoint, + mappedRealtimeAPIs[apiName] = schema.APIResponse{ + Status: status.FromDeployment(&deployments[i]), + Metadata: metadata, } } + sort.Strings(apiNames) + for i := range apiNames { + realtimeAPIs[i] = mappedRealtimeAPIs[apiNames[i]] + } + return realtimeAPIs, nil } -func namesAndIDsFromStatuses(statuses []status.Status) ([]string, []string) { - apiNames := make([]string, len(statuses)) - apiIDs := make([]string, len(statuses)) +func GetAPIByName(deployedResource *operator.DeployedResource) ([]schema.APIResponse, error) { + deployment, err := config.K8s.GetDeployment(workloads.K8sName(deployedResource.Name)) + if err != nil { + return nil, err + } + + if deployment == nil { + return nil, errors.ErrorUnexpected("unable to find deployment", deployedResource.Name) + } + + apiStatus := status.FromDeployment(deployment) + apiMetadata, err := spec.MetadataFromDeployment(deployment) + if err != nil { + return nil, errors.ErrorUnexpected("unable to obtain metadata", deployedResource.Name) + } + + api, err := operator.DownloadAPISpec(apiMetadata.Name, apiMetadata.APIID) + if err != nil { + return nil, err + } - for i, st := range statuses { - apiNames[i] = st.APIName - apiIDs[i] = st.APIID + apiEndpoint, err := operator.APIEndpoint(api) + if err != nil { + return nil, err } - return apiNames, apiIDs + dashboardURL := pointer.String(getDashboardURL(api.Name)) + + return []schema.APIResponse{ + { + Spec: api, + Metadata: apiMetadata, + Status: apiStatus, + Endpoint: &apiEndpoint, + DashboardURL: dashboardURL, + }, + }, nil } -func GetAPIByName(deployedResource *operator.DeployedResource) ([]schema.APIResponse, error) { - st, err := GetStatus(deployedResource.Name) +func DescribeAPIByName(deployedResource *operator.DeployedResource) ([]schema.APIResponse, error) { + deployment, err := config.K8s.GetDeployment(workloads.K8sName(deployedResource.Name)) if err != nil { return nil, err } - api, err := operator.DownloadAPISpec(st.APIName, st.APIID) + if deployment == nil { + return nil, errors.ErrorUnexpected("unable to find deployment", deployedResource.Name) + } + + apiStatus := status.FromDeployment(deployment) + apiMetadata, err := spec.MetadataFromDeployment(deployment) + if err != nil { + return nil, errors.ErrorUnexpected("unable to obtain metadata", deployedResource.Name) + } + + pods, err := config.K8s.ListPodsByLabel("apiName", deployment.Labels["apiName"]) if err != nil { return nil, err } + apiStatus.ReplicaCounts = GetReplicaCounts(deployment, pods) - apiEndpoint, err := operator.APIEndpoint(api) + apiEndpoint, err := operator.APIEndpointFromResource(deployedResource) if err != nil { return nil, err } - dashboardURL := pointer.String(getDashboardURL(api.Name)) + dashboardURL := pointer.String(getDashboardURL(deployedResource.Name)) return []schema.APIResponse{ { - Spec: *api, - Status: st, - Endpoint: apiEndpoint, + Metadata: apiMetadata, + Status: apiStatus, + Endpoint: &apiEndpoint, DashboardURL: dashboardURL, }, }, nil @@ -364,14 +399,14 @@ func isAPIUpdating(deployment *kapps.Deployment) (bool, error) { return false, err } - replicaCounts := getReplicaCounts(deployment, pods) + replicaCounts := GetReplicaCounts(deployment, pods) autoscalingSpec, err := userconfig.AutoscalingFromAnnotations(deployment) if err != nil { return false, err } - if replicaCounts.Updated.Ready < autoscalingSpec.MinReplicas && replicaCounts.Updated.TotalFailed() == 0 { + if replicaCounts.Ready < autoscalingSpec.MinReplicas && replicaCounts.TotalFailed() == 0 { return true, nil } diff --git a/pkg/operator/resources/realtimeapi/status.go b/pkg/operator/resources/realtimeapi/status.go index a65716f35c..a90c42f387 100644 --- a/pkg/operator/resources/realtimeapi/status.go +++ b/pkg/operator/resources/realtimeapi/status.go @@ -17,81 +17,13 @@ limitations under the License. package realtimeapi import ( - "sort" - "time" - - "github.com/cortexlabs/cortex/pkg/config" - "github.com/cortexlabs/cortex/pkg/consts" - "github.com/cortexlabs/cortex/pkg/lib/errors" "github.com/cortexlabs/cortex/pkg/lib/k8s" - "github.com/cortexlabs/cortex/pkg/lib/parallel" "github.com/cortexlabs/cortex/pkg/types/status" - "github.com/cortexlabs/cortex/pkg/types/userconfig" - "github.com/cortexlabs/cortex/pkg/workloads" kapps "k8s.io/api/apps/v1" kcore "k8s.io/api/core/v1" ) -func GetStatus(apiName string) (*status.Status, error) { - var deployment *kapps.Deployment - var pods []kcore.Pod - - err := parallel.RunFirstErr( - func() error { - var err error - deployment, err = config.K8s.GetDeployment(workloads.K8sName(apiName)) - return err - }, - func() error { - var err error - pods, err = config.K8s.ListPodsByLabel("apiName", apiName) - return err - }, - ) - if err != nil { - return nil, err - } - - if deployment == nil { - return nil, errors.ErrorUnexpected("unable to find deployment", apiName) - } - - return apiStatus(deployment, pods) -} - -func GetAllStatuses(deployments []kapps.Deployment, pods []kcore.Pod) ([]status.Status, error) { - statuses := make([]status.Status, len(deployments)) - for i := range deployments { - st, err := apiStatus(&deployments[i], pods) - if err != nil { - return nil, err - } - statuses[i] = *st - } - - sort.Slice(statuses, func(i, j int) bool { - return statuses[i].APIName < statuses[j].APIName - }) - - return statuses, nil -} - -func apiStatus(deployment *kapps.Deployment, allPods []kcore.Pod) (*status.Status, error) { - autoscalingSpec, err := userconfig.AutoscalingFromAnnotations(deployment) - if err != nil { - return nil, err - } - - status := &status.Status{} - status.APIName = deployment.Labels["apiName"] - status.APIID = deployment.Labels["apiID"] - status.ReplicaCounts = getReplicaCounts(deployment, allPods) - status.Code = getStatusCode(&status.ReplicaCounts, autoscalingSpec.MinReplicas) - - return status, nil -} - -func getReplicaCounts(deployment *kapps.Deployment, pods []kcore.Pod) status.ReplicaCounts { +func GetReplicaCounts(deployment *kapps.Deployment, pods []kcore.Pod) *status.ReplicaCounts { counts := status.ReplicaCounts{} counts.Requested = *deployment.Spec.Replicas @@ -103,72 +35,55 @@ func getReplicaCounts(deployment *kapps.Deployment, pods []kcore.Pod) status.Rep addPodToReplicaCounts(&pods[i], deployment, &counts) } - return counts + return &counts } func addPodToReplicaCounts(pod *kcore.Pod, deployment *kapps.Deployment, counts *status.ReplicaCounts) { - var subCounts *status.SubReplicaCounts + latest := false if isPodSpecLatest(deployment, pod) { - subCounts = &counts.Updated - } else { - subCounts = &counts.Stale + latest = true } - if k8s.IsPodReady(pod) { - subCounts.Ready++ + isPodReady := k8s.IsPodReady(pod) + if latest && isPodReady { + counts.Ready++ + return + } else if !latest && isPodReady { + counts.ReadyOutOfDate++ return } - switch k8s.GetPodStatus(pod) { - case k8s.PodStatusPending: - if time.Since(pod.CreationTimestamp.Time) > consts.WaitForInitializingReplicasTimeout { - subCounts.Stalled++ - } else { - subCounts.Pending++ - } - case k8s.PodStatusInitializing: - subCounts.Initializing++ - case k8s.PodStatusRunning: - subCounts.Initializing++ - case k8s.PodStatusErrImagePull: - subCounts.ErrImagePull++ - case k8s.PodStatusTerminating: - subCounts.Terminating++ - case k8s.PodStatusFailed: - subCounts.Failed++ - case k8s.PodStatusKilled: - subCounts.Killed++ - case k8s.PodStatusKilledOOM: - subCounts.KilledOOM++ - default: - subCounts.Unknown++ - } -} - -func getStatusCode(counts *status.ReplicaCounts, minReplicas int32) status.Code { - if counts.Updated.Ready >= counts.Requested { - return status.Live - } - - if counts.Updated.ErrImagePull > 0 { - return status.ErrorImagePull - } - - if counts.Updated.Failed > 0 || counts.Updated.Killed > 0 { - return status.Error - } + podStatus := k8s.GetPodStatus(pod) - if counts.Updated.KilledOOM > 0 { - return status.OOM + if podStatus == k8s.PodStatusTerminating { + counts.Terminating++ + return } - if counts.Updated.Stalled > 0 { - return status.Stalled + if !latest { + return } - if counts.Updated.Ready >= minReplicas { - return status.Live + switch podStatus { + case k8s.PodStatusPending: + counts.Pending++ + case k8s.PodStatusStalled: + counts.Stalled++ + case k8s.PodStatusCreating: + counts.Creating++ + case k8s.PodStatusReady: + counts.Ready++ + case k8s.PodStatusNotReady: + counts.NotReady++ + case k8s.PodStatusErrImagePull: + counts.ErrImagePull++ + case k8s.PodStatusFailed: + counts.Failed++ + case k8s.PodStatusKilled: + counts.Killed++ + case k8s.PodStatusKilledOOM: + counts.KilledOOM++ + case k8s.PodStatusUnknown: + counts.Unknown++ } - - return status.Updating } diff --git a/pkg/operator/resources/resources.go b/pkg/operator/resources/resources.go index 87069c2136..445571ad25 100644 --- a/pkg/operator/resources/resources.go +++ b/pkg/operator/resources/resources.go @@ -158,8 +158,8 @@ func UpdateAPI(apiConfig *userconfig.API, force bool) (*schema.APIResponse, stri apiEndpoint, _ := operator.APIEndpoint(api) return &schema.APIResponse{ - Spec: *api, - Endpoint: apiEndpoint, + Spec: api, + Endpoint: &apiEndpoint, }, msg, nil } @@ -256,7 +256,7 @@ func DeleteAPI(apiName string, keepCache bool) (*schema.DeleteResponse, error) { func GetAPIs() ([]schema.APIResponse, error) { var deployments []kapps.Deployment var k8sTaskJobs []kbatch.Job - var pods []kcore.Pod + var taskAPIPods []kcore.Pod var virtualServices []istioclientnetworking.VirtualService var batchJobList batch.BatchJobList @@ -268,7 +268,7 @@ func GetAPIs() ([]schema.APIResponse, error) { }, func() error { var err error - pods, err = config.K8s.ListPodsWithLabelKeys("apiName") + taskAPIPods, err = config.K8s.ListPodsByLabel("apiKind", userconfig.TaskAPIKind.String()) return err }, func() error { @@ -308,23 +308,6 @@ func GetAPIs() ([]schema.APIResponse, error) { } } - var realtimeAPIPods []kcore.Pod - var batchAPIPods []kcore.Pod - var taskAPIPods []kcore.Pod - var asyncAPIPods []kcore.Pod - for _, pod := range pods { - switch pod.Labels["apiKind"] { - case userconfig.RealtimeAPIKind.String(): - realtimeAPIPods = append(realtimeAPIPods, pod) - case userconfig.BatchAPIKind.String(): - batchAPIPods = append(batchAPIPods, pod) - case userconfig.TaskAPIKind.String(): - taskAPIPods = append(taskAPIPods, pod) - case userconfig.AsyncAPIKind.String(): - asyncAPIPods = append(asyncAPIPods, pod) - } - } - var batchAPIVirtualServices []istioclientnetworking.VirtualService var taskAPIVirtualServices []istioclientnetworking.VirtualService var trafficSplitterVirtualServices []istioclientnetworking.VirtualService @@ -340,7 +323,7 @@ func GetAPIs() ([]schema.APIResponse, error) { } } - realtimeAPIList, err := realtimeapi.GetAllAPIs(realtimeAPIPods, realtimeAPIDeployments) + realtimeAPIList, err := realtimeapi.GetAllAPIs(realtimeAPIDeployments) if err != nil { return nil, err } @@ -356,7 +339,7 @@ func GetAPIs() ([]schema.APIResponse, error) { return nil, err } - asyncAPIList, err := asyncapi.GetAllAPIs(asyncAPIPods, asyncAPIDeployments) + asyncAPIList, err := asyncapi.GetAllAPIs(asyncAPIDeployments) if err != nil { return nil, err } @@ -449,7 +432,7 @@ func GetAPIByID(apiName string, apiID string) ([]schema.APIResponse, error) { return []schema.APIResponse{ { - Spec: *apiSpec, + Spec: apiSpec, }, }, nil } @@ -500,3 +483,33 @@ func checkIfUsedByTrafficSplitter(apiName string) error { } return nil } + +func DescribeAPI(apiName string) ([]schema.APIResponse, error) { + deployedResource, err := GetDeployedResourceByName(apiName) + if err != nil { + return nil, err + } + + var apiResponse []schema.APIResponse + + switch deployedResource.Kind { + case userconfig.RealtimeAPIKind: + apiResponse, err = realtimeapi.DescribeAPIByName(deployedResource) + if err != nil { + return nil, err + } + case userconfig.AsyncAPIKind: + apiResponse, err = asyncapi.DescribeAPIByName(deployedResource) + if err != nil { + return nil, err + } + default: + return nil, ErrorOperationIsOnlySupportedForKind( + *deployedResource, + userconfig.RealtimeAPIKind, + userconfig.AsyncAPIKind, + ) // unexpected + } + + return apiResponse, nil +} diff --git a/pkg/operator/resources/trafficsplitter/api.go b/pkg/operator/resources/trafficsplitter/api.go index 9d81a17faa..4881f724e3 100644 --- a/pkg/operator/resources/trafficsplitter/api.go +++ b/pkg/operator/resources/trafficsplitter/api.go @@ -26,6 +26,7 @@ import ( "github.com/cortexlabs/cortex/pkg/lib/errors" "github.com/cortexlabs/cortex/pkg/lib/k8s" "github.com/cortexlabs/cortex/pkg/lib/parallel" + "github.com/cortexlabs/cortex/pkg/lib/pointer" "github.com/cortexlabs/cortex/pkg/operator/lib/routines" "github.com/cortexlabs/cortex/pkg/operator/operator" "github.com/cortexlabs/cortex/pkg/operator/schema" @@ -132,34 +133,27 @@ func getTrafficSplitterDestinations(trafficSplitter *spec.API) []k8s.Destination // GetAllAPIs returns a list of metadata, in the form of schema.APIResponse, about all the created traffic splitter APIs func GetAllAPIs(virtualServices []istioclientnetworking.VirtualService) ([]schema.APIResponse, error) { - var ( - apiNames []string - apiIDs []string - trafficSplitters []schema.APIResponse - ) + var trafficSplitters []schema.APIResponse + for i := range virtualServices { + apiName := virtualServices[i].Labels["apiName"] - for _, virtualService := range virtualServices { - if virtualService.Labels["apiKind"] == userconfig.TrafficSplitterKind.String() { - apiNames = append(apiNames, virtualService.Labels["apiName"]) - apiIDs = append(apiIDs, virtualService.Labels["apiID"]) + metadata, err := spec.MetadataFromVirtualService(&virtualServices[i]) + if err != nil { + return nil, errors.Wrap(err, fmt.Sprintf("api %s", apiName)) } - } - apis, err := operator.DownloadAPISpecs(apiNames, apiIDs) - if err != nil { - return nil, err - } + if metadata.Kind != userconfig.TrafficSplitterKind { + continue + } - for i := range apis { - trafficSplitter := apis[i] - endpoint, err := operator.APIEndpoint(&trafficSplitter) + targets, err := userconfig.TrafficSplitterTargetsFromAnnotations(&virtualServices[i]) if err != nil { - return nil, err + return nil, errors.Wrap(err, fmt.Sprintf("api %s", apiName)) } trafficSplitters = append(trafficSplitters, schema.APIResponse{ - Spec: trafficSplitter, - Endpoint: endpoint, + Metadata: metadata, + NumTrafficSplitterTargets: pointer.Int32(targets), }) } @@ -168,7 +162,12 @@ func GetAllAPIs(virtualServices []istioclientnetworking.VirtualService) ([]schem // GetAPIByName retrieves the metadata, in the form of schema.APIResponse, of a single traffic splitter API func GetAPIByName(deployedResource *operator.DeployedResource) ([]schema.APIResponse, error) { - api, err := operator.DownloadAPISpec(deployedResource.Name, deployedResource.VirtualService.Labels["apiID"]) + metadata, err := spec.MetadataFromVirtualService(deployedResource.VirtualService) + if err != nil { + return nil, err + } + + api, err := operator.DownloadAPISpec(deployedResource.Name, metadata.APIID) if err != nil { return nil, err } @@ -180,8 +179,9 @@ func GetAPIByName(deployedResource *operator.DeployedResource) ([]schema.APIResp return []schema.APIResponse{ { - Spec: *api, - Endpoint: endpoint, + Spec: api, + Metadata: metadata, + Endpoint: &endpoint, }, }, nil } diff --git a/pkg/operator/schema/schema.go b/pkg/operator/schema/schema.go index eff68701ee..1ee895cace 100644 --- a/pkg/operator/schema/schema.go +++ b/pkg/operator/schema/schema.go @@ -50,19 +50,21 @@ type NodeInfo struct { } type DeployResult struct { - API *APIResponse `json:"api"` - Message string `json:"message"` - Error string `json:"error"` + API *APIResponse `json:"api" yaml:"api"` + Message string `json:"message" yaml:"message"` + Error string `json:"error" yaml:"error"` } type APIResponse struct { - Spec spec.API `json:"spec"` - Status *status.Status `json:"status,omitempty"` - Endpoint string `json:"endpoint"` - DashboardURL *string `json:"dashboard_url,omitempty"` - BatchJobStatuses []status.BatchJobStatus `json:"batch_job_statuses,omitempty"` - TaskJobStatuses []status.TaskJobStatus `json:"task_job_statuses,omitempty"` - APIVersions []APIVersion `json:"api_versions,omitempty"` + Spec *spec.API `json:"spec,omitempty" yaml:"spec,omitempty"` + Metadata *spec.Metadata `json:"metadata,omitempty" yaml:"metadata,omitempty"` + Status *status.Status `json:"status,omitempty" yaml:"status,omitempty"` + NumTrafficSplitterTargets *int32 `json:"num_traffic_splitter_targets,omitempty" yaml:"num_traffic_splitter_targets,omitempty"` + Endpoint *string `json:"endpoint,omitempty" yaml:"endpoint,omitempty"` + DashboardURL *string `json:"dashboard_url,omitempty" yaml:"dashboard_url,omitempty"` + BatchJobStatuses []status.BatchJobStatus `json:"batch_job_statuses,omitempty" yaml:"batch_job_statuses,omitempty"` + TaskJobStatuses []status.TaskJobStatus `json:"task_job_statuses,omitempty" yaml:"task_job_statuses,omitempty"` + APIVersions []APIVersion `json:"api_versions,omitempty" yaml:"api_versions,omitempty"` } type LogResponse struct { @@ -70,16 +72,16 @@ type LogResponse struct { } type BatchJobResponse struct { - APISpec spec.API `json:"api_spec"` - JobStatus status.BatchJobStatus `json:"job_status"` - Metrics *metrics.BatchMetrics `json:"metrics,omitempty"` - Endpoint string `json:"endpoint"` + APISpec spec.API `json:"api_spec" yaml:"api_spec"` + JobStatus status.BatchJobStatus `json:"job_status" yaml:"job_status"` + Metrics *metrics.BatchMetrics `json:"metrics,omitempty" yaml:"metrics,omitempty"` + Endpoint string `json:"endpoint" yaml:"endpoint"` } type TaskJobResponse struct { - APISpec spec.API `json:"api_spec"` - JobStatus status.TaskJobStatus `json:"job_status"` - Endpoint string `json:"endpoint"` + APISpec spec.API `json:"api_spec" yaml:"api_spec"` + JobStatus status.TaskJobStatus `json:"job_status" yaml:"job_status"` + Endpoint string `json:"endpoint" yaml:"endpoint"` } type DeleteResponse struct { @@ -96,8 +98,8 @@ type ErrorResponse struct { } type APIVersion struct { - APIID string `json:"api_id"` - LastUpdated int64 `json:"last_updated"` + APIID string `json:"api_id" yaml:"api_id"` + LastUpdated int64 `json:"last_updated" yaml:"last_updated"` } type VerifyCortexResponse struct{} diff --git a/pkg/types/spec/api.go b/pkg/types/spec/api.go index e181a0ffab..5b0d39210c 100644 --- a/pkg/types/spec/api.go +++ b/pkg/types/spec/api.go @@ -30,20 +30,61 @@ import ( "github.com/cortexlabs/cortex/pkg/lib/hash" s "github.com/cortexlabs/cortex/pkg/lib/strings" "github.com/cortexlabs/cortex/pkg/types/userconfig" + istioclientnetworking "istio.io/client-go/pkg/apis/networking/v1beta1" + kapps "k8s.io/api/apps/v1" ) type API struct { *userconfig.API - ID string `json:"id"` - SpecID string `json:"spec_id"` - PodID string `json:"pod_id"` - DeploymentID string `json:"deployment_id"` + ID string `json:"id" yaml:"id"` + SpecID string `json:"spec_id" yaml:"spec_id"` + PodID string `json:"pod_id" yaml:"pod_id"` + DeploymentID string `json:"deployment_id" yaml:"deployment_id"` - Key string `json:"key"` + Key string `json:"key" yaml:"key"` - InitialDeploymentTime int64 `json:"initial_deployment_time"` - LastUpdated int64 `json:"last_updated"` - MetadataRoot string `json:"metadata_root"` + InitialDeploymentTime int64 `json:"initial_deployment_time" yaml:"initial_deployment_time"` + LastUpdated int64 `json:"last_updated" yaml:"last_updated"` + MetadataRoot string `json:"metadata_root" yaml:"metadata_root"` +} + +type Metadata struct { + *userconfig.Resource + APIID string `json:"id" yaml:"id"` + DeploymentID string `json:"deployment_id,omitempty" yaml:"deployment_id,omitempty"` + LastUpdated int64 `json:"last_updated" yaml:"last_updated"` +} + +func MetadataFromDeployment(deployment *kapps.Deployment) (*Metadata, error) { + lastUpdated, err := TimeFromAPIID(deployment.Labels["apiID"]) + if err != nil { + return nil, err + } + return &Metadata{ + Resource: &userconfig.Resource{ + Name: deployment.Labels["apiName"], + Kind: userconfig.KindFromString(deployment.Labels["apiKind"]), + }, + APIID: deployment.Labels["apiID"], + DeploymentID: deployment.Labels["deploymentID"], + LastUpdated: lastUpdated.Unix(), + }, nil +} + +func MetadataFromVirtualService(vs *istioclientnetworking.VirtualService) (*Metadata, error) { + lastUpdated, err := TimeFromAPIID(vs.Labels["apiID"]) + if err != nil { + return nil, err + } + return &Metadata{ + Resource: &userconfig.Resource{ + Name: vs.Labels["apiName"], + Kind: userconfig.KindFromString(vs.Labels["apiKind"]), + }, + APIID: vs.Labels["apiID"], + DeploymentID: vs.Labels["deploymentID"], + LastUpdated: lastUpdated.Unix(), + }, nil } /* diff --git a/pkg/types/spec/job.go b/pkg/types/spec/job.go index 784fb4f199..d6c6cb354d 100644 --- a/pkg/types/spec/job.go +++ b/pkg/types/spec/job.go @@ -32,9 +32,9 @@ const ( ) type JobKey struct { - ID string `json:"job_id"` - APIName string `json:"api_name"` - Kind userconfig.Kind `json:"kind"` + ID string `json:"job_id" yaml:"job_id"` + APIName string `json:"api_name" yaml:"api_name"` + Kind userconfig.Kind `json:"kind" yaml:"kind"` } func (j JobKey) UserString() string { @@ -56,39 +56,39 @@ func (j JobKey) K8sName() string { } type SQSDeadLetterQueue struct { - ARN string `json:"arn"` - MaxReceiveCount int `json:"max_receive_count"` + ARN string `json:"arn" yaml:"arn"` + MaxReceiveCount int `json:"max_receive_count" yaml:"max_receive_count"` } type RuntimeBatchJobConfig struct { - Workers int `json:"workers"` - SQSDeadLetterQueue *SQSDeadLetterQueue `json:"sqs_dead_letter_queue"` - Config map[string]interface{} `json:"config"` - Timeout *int `json:"timeout"` + Workers int `json:"workers" yaml:"workers"` + SQSDeadLetterQueue *SQSDeadLetterQueue `json:"sqs_dead_letter_queue" yaml:"sqs_dead_letter_queue"` + Config map[string]interface{} `json:"config" yaml:"config"` + Timeout *int `json:"timeout" yaml:"timeout"` } type RuntimeTaskJobConfig struct { - Workers int `json:"workers"` - Config map[string]interface{} `json:"config"` - Timeout *int `json:"timeout"` + Workers int `json:"workers" yaml:"workers"` + Config map[string]interface{} `json:"config" yaml:"config"` + Timeout *int `json:"timeout" yaml:"timeout"` } type BatchJob struct { JobKey RuntimeBatchJobConfig - APIID string `json:"api_id"` - SQSUrl string `json:"sqs_url"` - TotalBatchCount int `json:"total_batch_count,omitempty"` - StartTime time.Time `json:"start_time,omitempty"` + APIID string `json:"api_id" yaml:"api_id"` + SQSUrl string `json:"sqs_url" yaml:"sqs_url"` + TotalBatchCount int `json:"total_batch_count,omitempty" yaml:"total_batch_count,omitempty"` + StartTime time.Time `json:"start_time,omitempty" yaml:"start_time,omitempty"` } type TaskJob struct { JobKey RuntimeTaskJobConfig - APIID string `json:"api_id"` - SpecID string `json:"spec_id"` - PodID string `json:"pod_id"` - StartTime time.Time `json:"start_time"` + APIID string `json:"api_id" yaml:"api_id"` + SpecID string `json:"spec_id" yaml:"spec_id"` + PodID string `json:"pod_id" yaml:"pod_id"` + StartTime time.Time `json:"start_time" yaml:"start_time"` } // e.g. /<cluster UID>/jobs/<job_api_kind>/<cortex version>/<api_name> diff --git a/pkg/types/status/code.go b/pkg/types/status/code.go deleted file mode 100644 index 41a8a13d91..0000000000 --- a/pkg/types/status/code.go +++ /dev/null @@ -1,97 +0,0 @@ -/* -Copyright 2021 Cortex Labs, Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package status - -type Code int - -const ( - Unknown Code = iota - Stalled - Error - ErrorImagePull - OOM - Live - Updating -) - -var _codes = []string{ - "status_unknown", - "status_stalled", - "status_error", - "status_error_image_pull", - "status_oom", - "status_live", - "status_updating", -} - -var _ = [1]int{}[int(Updating)-(len(_codes)-1)] // Ensure list length matches - -var _codeMessages = []string{ - "unknown", // Unknown - "compute unavailable", // Stalled - "error", // Error - "error (image pull)", // Live - "error (out of memory)", // OOM - "live", // Live - "updating", // Updating -} - -var _ = [1]int{}[int(Updating)-(len(_codeMessages)-1)] // Ensure list length matches - -func (code Code) String() string { - if int(code) < 0 || int(code) >= len(_codes) { - return _codes[Unknown] - } - return _codes[code] -} - -func (code Code) Message() string { - if int(code) < 0 || int(code) >= len(_codeMessages) { - return _codeMessages[Unknown] - } - return _codeMessages[code] -} - -// MarshalText satisfies TextMarshaler -func (code Code) MarshalText() ([]byte, error) { - return []byte(code.String()), nil -} - -// UnmarshalText satisfies TextUnmarshaler -func (code *Code) UnmarshalText(text []byte) error { - enum := string(text) - for i := 0; i < len(_codes); i++ { - if enum == _codes[i] { - *code = Code(i) - return nil - } - } - - *code = Unknown - return nil -} - -// UnmarshalBinary satisfies BinaryUnmarshaler -// Needed for msgpack -func (code *Code) UnmarshalBinary(data []byte) error { - return code.UnmarshalText(data) -} - -// MarshalBinary satisfies BinaryMarshaler -func (code Code) MarshalBinary() ([]byte, error) { - return []byte(code.String()), nil -} diff --git a/pkg/types/status/job_status.go b/pkg/types/status/job_status.go index eb299831ba..f106d051a7 100644 --- a/pkg/types/status/job_status.go +++ b/pkg/types/status/job_status.go @@ -24,15 +24,15 @@ import ( type BatchJobStatus struct { spec.BatchJob - Status JobCode `json:"status"` - EndTime *time.Time `json:"end_time,omitempty"` - BatchesInQueue int `json:"batches_in_queue"` - WorkerCounts *WorkerCounts `json:"worker_counts,omitempty"` + Status JobCode `json:"status" yaml:"status"` + EndTime *time.Time `json:"end_time,omitempty" yaml:"end_time,omitempty"` + BatchesInQueue int `json:"batches_in_queue" yaml:"batches_in_queue"` + WorkerCounts *WorkerCounts `json:"worker_counts,omitempty" yaml:"worker_counts,omitempty"` } type TaskJobStatus struct { spec.TaskJob - EndTime *time.Time `json:"end_time"` - Status JobCode `json:"status"` - WorkerCounts *WorkerCounts `json:"worker_counts"` + EndTime *time.Time `json:"end_time,omitempty" yaml:"end_time,omitempty"` + Status JobCode `json:"status" yaml:"status"` + WorkerCounts *WorkerCounts `json:"worker_counts,omitempty" yaml:"worker_counts,omitempty"` } diff --git a/pkg/types/status/status.go b/pkg/types/status/status.go index 6dad4e1992..e0de4943ef 100644 --- a/pkg/types/status/status.go +++ b/pkg/types/status/status.go @@ -16,47 +16,113 @@ limitations under the License. package status +import ( + kapps "k8s.io/api/apps/v1" +) + type Status struct { - APIName string `json:"api_name"` - APIID string `json:"api_id"` - Code Code `json:"status_code"` - ReplicaCounts `json:"replica_counts"` + Ready int32 `json:"ready" yaml:"ready"` // deployment-reported number of ready replicas (latest + out of date) + Requested int32 `json:"requested" yaml:"requested"` // deployment-reported number of requested replicas + UpToDate int32 `json:"up_to_date" yaml:"up_to_date"` // deployment-reported number of up-to-date replicas (in whichever phase they are found in) + ReplicaCounts *ReplicaCounts `json:"replica_counts,omitempty" yaml:"replica_counts,omitempty"` } -type ReplicaCounts struct { - Updated SubReplicaCounts `json:"updated"` - Stale SubReplicaCounts `json:"stale"` - Requested int32 `json:"requested"` +type ReplicaCountType string + +const ( + ReplicaCountRequested ReplicaCountType = "Requested" // requested number of replicas (for up-to-date pods) + ReplicaCountPending ReplicaCountType = "Pending" // pods that are in the pending state (for up-to-date pods) + ReplicaCountCreating ReplicaCountType = "Creating" // pods that that have their init/non-init containers in the process of being created (for up-to-date pods) + ReplicaCountNotReady ReplicaCountType = "NotReady" // pods that are not passing the readiness checks (for up-to-date pods) + ReplicaCountReady ReplicaCountType = "Ready" // pods that are passing the readiness checks (for up-to-date pods) + ReplicaCountReadyOutOfDate ReplicaCountType = "ReadyOutOfDate" // pods that are passing the readiness checks (for out-of-date pods) + ReplicaCountErrImagePull ReplicaCountType = "ErrImagePull" // pods that couldn't pull the containers' images (for up-to-date pods) + ReplicaCountTerminating ReplicaCountType = "Terminating" // pods that are in a terminating state (for up-to-date pods) + ReplicaCountFailed ReplicaCountType = "Failed" // pods that have had their containers erroring (for up-to-date pods) + ReplicaCountKilled ReplicaCountType = "Killed" // pods that have had their container processes killed (for up-to-date pods) + ReplicaCountKilledOOM ReplicaCountType = "KilledOOM" // pods that have had their containers OOM (for up-to-date pods) + ReplicaCountStalled ReplicaCountType = "Stalled" // pods that have been in a pending state for more than 15 mins (for up-to-date pods) + ReplicaCountUnknown ReplicaCountType = "Unknown" // pods that are in an unknown state (for up-to-date pods) +) + +var ReplicaCountTypes []ReplicaCountType = []ReplicaCountType{ + ReplicaCountRequested, ReplicaCountPending, ReplicaCountCreating, + ReplicaCountNotReady, ReplicaCountReady, ReplicaCountReadyOutOfDate, + ReplicaCountErrImagePull, ReplicaCountTerminating, ReplicaCountFailed, + ReplicaCountKilled, ReplicaCountKilledOOM, ReplicaCountStalled, + ReplicaCountUnknown, } -type SubReplicaCounts struct { - Pending int32 `json:"pending"` - Initializing int32 `json:"initializing"` - Ready int32 `json:"ready"` - ErrImagePull int32 `json:"err_image_pull"` - Terminating int32 `json:"terminating"` - Failed int32 `json:"failed"` - Killed int32 `json:"killed"` - KilledOOM int32 `json:"killed_oom"` - Stalled int32 `json:"stalled"` // pending for a long time - Unknown int32 `json:"unknown"` +type ReplicaCounts struct { + Requested int32 `json:"requested" yaml:"requested"` + Pending int32 `json:"pending" yaml:"pending"` + Creating int32 `json:"creating" yaml:"creating"` + NotReady int32 `json:"not_ready" yaml:"not_ready"` + Ready int32 `json:"ready" yaml:"ready"` + ReadyOutOfDate int32 `json:"ready_out_of_date" yaml:"ready_out_of_date"` + ErrImagePull int32 `json:"err_image_pull" yaml:"err_image_pull"` + Terminating int32 `json:"terminating" yaml:"terminating"` // includes up-to-date and out-of-date pods + Failed int32 `json:"failed" yaml:"failed"` + Killed int32 `json:"killed" yaml:"killed"` + KilledOOM int32 `json:"killed_oom" yaml:"killed_oom"` + Stalled int32 `json:"stalled" yaml:"stalled"` // pending for a long time + Unknown int32 `json:"unknown" yaml:"unknown"` } // Worker counts don't have as many failure variations because Jobs clean up dead pods, so counting different failure scenarios isn't interesting type WorkerCounts struct { - Pending int32 `json:"pending,omitempty"` - Initializing int32 `json:"initializing,omitempty"` - Running int32 `json:"running,omitempty"` - Succeeded int32 `json:"succeeded,omitempty"` - Failed int32 `json:"failed,omitempty"` - Stalled int32 `json:"stalled,omitempty"` // pending for a long time - Unknown int32 `json:"unknown,omitempty"` + Pending int32 `json:"pending,omitempty" yaml:"pending,omitempty"` + Creating int32 `json:"creating,omitempty" yaml:"creating,omitempty"` + NotReady int32 `json:"not_ready,omitempty" yaml:"not_ready,omitempty"` + Ready int32 `json:"ready,omitempty" yaml:"ready,omitempty"` + Succeeded int32 `json:"succeeded,omitempty" yaml:"succeeded,omitempty"` + ErrImagePull int32 `json:"err_image_pull,omitempty" yaml:"err_image_pull,omitempty"` + Terminating int32 `json:"terminating,omitempty" yaml:"terminating,omitempty"` + Failed int32 `json:"failed,omitempty" yaml:"failed,omitempty"` + Killed int32 `json:"killed,omitempty" yaml:"killed,omitempty"` + KilledOOM int32 `json:"killed_oom,omitempty" yaml:"killed_oom,omitempty"` + Stalled int32 `json:"stalled,omitempty" yaml:"stalled,omitempty"` // pending for a long time + Unknown int32 `json:"unknown,omitempty" yaml:"unknown,omitempty"` +} + +func FromDeployment(deployment *kapps.Deployment) *Status { + return &Status{ + Ready: deployment.Status.ReadyReplicas, + Requested: deployment.Status.Replicas, + UpToDate: deployment.Status.UpdatedReplicas, + } } -func (status *Status) Message() string { - return status.Code.Message() +func (counts *ReplicaCounts) GetCountBy(replicaType ReplicaCountType) int32 { + switch replicaType { + case ReplicaCountRequested: + return counts.Requested + case ReplicaCountPending: + return counts.Pending + case ReplicaCountCreating: + return counts.Creating + case ReplicaCountNotReady: + return counts.NotReady + case ReplicaCountReady: + return counts.Ready + case ReplicaCountReadyOutOfDate: + return counts.ReadyOutOfDate + case ReplicaCountErrImagePull: + return counts.ErrImagePull + case ReplicaCountTerminating: + return counts.Terminating + case ReplicaCountFailed: + return counts.Failed + case ReplicaCountKilled: + return counts.Killed + case ReplicaCountKilledOOM: + return counts.KilledOOM + case ReplicaCountStalled: + return counts.Stalled + } + return counts.Unknown } -func (src *SubReplicaCounts) TotalFailed() int32 { - return src.Failed + src.ErrImagePull + src.Killed + src.KilledOOM + src.Stalled +func (counts *ReplicaCounts) TotalFailed() int32 { + return counts.ErrImagePull + counts.Failed + counts.Killed + counts.KilledOOM + counts.Unknown } diff --git a/pkg/types/userconfig/api.go b/pkg/types/userconfig/api.go index c2f8585941..c524c599e0 100644 --- a/pkg/types/userconfig/api.go +++ b/pkg/types/userconfig/api.go @@ -155,6 +155,10 @@ func IdentifyAPI(filePath string, name string, kind Kind, index int) string { func (api *API) ToK8sAnnotations() map[string]string { annotations := map[string]string{} + if len(api.APIs) > 0 { + annotations[NumTrafficSplitterTargetsAnnotationKey] = s.Int32(int32(len(api.APIs))) + } + if api.Pod != nil && api.Kind == RealtimeAPIKind { annotations[MaxConcurrencyAnnotationKey] = s.Int64(api.Pod.MaxConcurrency) annotations[MaxQueueLengthAnnotationKey] = s.Int64(api.Pod.MaxQueueLength) @@ -245,6 +249,36 @@ func AutoscalingFromAnnotations(k8sObj kmeta.Object) (*Autoscaling, error) { return &a, nil } +func TrafficSplitterTargetsFromAnnotations(k8sObj kmeta.Object) (int32, error) { + targets, err := k8s.ParseInt32Annotation(k8sObj, NumTrafficSplitterTargetsAnnotationKey) + if err != nil { + return 0, err + } + return targets, nil +} + +func EndpointFromAnnotation(k8sObj kmeta.Object) (string, error) { + endpoint, err := k8s.GetAnnotation(k8sObj, EndpointAnnotationKey) + if err != nil { + return "", err + } + return endpoint, nil +} + +func ConcurrencyFromAnnotations(k8sObj kmeta.Object) (int, int, error) { + maxQueueLength, err := k8s.ParseIntAnnotation(k8sObj, MaxQueueLengthAnnotationKey) + if err != nil { + return 0, 0, err + } + + maxConcurrency, err := k8s.ParseIntAnnotation(k8sObj, MaxConcurrencyAnnotationKey) + if err != nil { + return 0, 0, err + } + + return maxQueueLength, maxConcurrency, nil +} + func (api *API) UserStr() string { var sb strings.Builder sb.WriteString(fmt.Sprintf("%s: %s\n", NameKey, api.Name)) diff --git a/pkg/types/userconfig/config_key.go b/pkg/types/userconfig/config_key.go index 826e144b05..5cbe3b2dda 100644 --- a/pkg/types/userconfig/config_key.go +++ b/pkg/types/userconfig/config_key.go @@ -91,6 +91,7 @@ const ( EndpointAnnotationKey = "networking.cortex.dev/endpoint" MaxConcurrencyAnnotationKey = "pod.cortex.dev/max-concurrency" MaxQueueLengthAnnotationKey = "pod.cortex.dev/max-queue-length" + NumTrafficSplitterTargetsAnnotationKey = "apis.cortex.dev/traffic-splitter-targets" MinReplicasAnnotationKey = "autoscaling.cortex.dev/min-replicas" MaxReplicasAnnotationKey = "autoscaling.cortex.dev/max-replicas" TargetInFlightAnnotationKey = "autoscaling.cortex.dev/target-in-flight"