diff --git a/pkg/skaffold/deploy/resource.go b/pkg/skaffold/deploy/resource.go new file mode 100644 index 00000000000..3faed70df22 --- /dev/null +++ b/pkg/skaffold/deploy/resource.go @@ -0,0 +1,39 @@ +/* +Copyright 2019 The Skaffold Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package deploy + +import ( + "context" + "time" + + "github.com/GoogleContainerTools/skaffold/pkg/skaffold/runner/runcontext" +) + +type Resource interface { + + // UpdateStatus updates the resource status + UpdateStatus(string, error) + + // IsStatusCheckComplete returns if the resource status check is complele + IsStatusCheckComplete() bool + + // Deadline returns the deadline for the resource + Deadline() time.Duration + + // CheckStatus checks resource status + CheckStatus(context.Context, *runcontext.RunContext) +} diff --git a/pkg/skaffold/deploy/resource/deployment.go b/pkg/skaffold/deploy/resource/deployment.go index b6b854d064a..ffd56a92bfd 100644 --- a/pkg/skaffold/deploy/resource/deployment.go +++ b/pkg/skaffold/deploy/resource/deployment.go @@ -17,12 +17,26 @@ limitations under the License. package resource import ( + "context" + "errors" "fmt" + "strings" "time" + + "github.com/GoogleContainerTools/skaffold/pkg/skaffold/kubectl" + "github.com/GoogleContainerTools/skaffold/pkg/skaffold/runner/runcontext" ) const ( - deploymentType = "deployment" + deploymentType = "deployment" + rollOutSuccess = "successfully rolled out" + connectionErrMsg = "Unable to connect to the server" + killedErrMsg = "signal: killed" +) + +var ( + errKubectlKilled = errors.New("kubectl rollout status command killed") + ErrKubectlConnection = errors.New("kubectl connection error") ) type Deployment struct { @@ -54,17 +68,16 @@ func (d *Deployment) UpdateStatus(details string, err error) { updated := newStatus(details, err) if !d.status.Equal(updated) { d.status = updated + if strings.Contains(details, rollOutSuccess) || isErrAndNotRetryAble(err) { + d.done = true + } } } -func (d *Deployment) IsDone() bool { +func (d *Deployment) IsStatusCheckComplete() bool { return d.done } -func (d *Deployment) MarkDone() { - d.done = true -} - func (d *Deployment) ReportSinceLastUpdated() string { if d.status.reported { return "" @@ -73,6 +86,13 @@ func (d *Deployment) ReportSinceLastUpdated() string { return fmt.Sprintf("%s %s", d, d.status) } +func (d *Deployment) CheckStatus(ctx context.Context, runCtx *runcontext.RunContext) { + cli := kubectl.NewFromRunContext(runCtx) + b, err := cli.RunOut(ctx, "rollout", "status", "deployment", d.name, "--namespace", d.namespace, "--watch=false") + err = parseKubectlRolloutError(err) + d.UpdateStatus(string(b), err) +} + func NewDeployment(name string, ns string, deadline time.Duration) *Deployment { return &Deployment{ name: name, @@ -82,3 +102,23 @@ func NewDeployment(name string, ns string, deadline time.Duration) *Deployment { status: newStatus("", nil), } } + +func parseKubectlRolloutError(err error) error { + if err == nil { + return err + } + if strings.Contains(err.Error(), connectionErrMsg) { + return ErrKubectlConnection + } + if strings.Contains(err.Error(), killedErrMsg) { + return errKubectlKilled + } + return err +} + +func isErrAndNotRetryAble(err error) bool { + if err == nil { + return false + } + return err != ErrKubectlConnection +} diff --git a/pkg/skaffold/deploy/resource/deployment_test.go b/pkg/skaffold/deploy/resource/deployment_test.go index b40c4253c51..334518bb3de 100644 --- a/pkg/skaffold/deploy/resource/deployment_test.go +++ b/pkg/skaffold/deploy/resource/deployment_test.go @@ -17,8 +17,11 @@ limitations under the License. package resource import ( + "context" "testing" + "github.com/GoogleContainerTools/skaffold/pkg/skaffold/runner/runcontext" + "github.com/GoogleContainerTools/skaffold/pkg/skaffold/util" "github.com/GoogleContainerTools/skaffold/testutil" "github.com/pkg/errors" ) @@ -82,3 +85,158 @@ func TestReportSinceLastUpdatedMultipleTimes(t *testing.T) { }) } } + +func TestDeploymentCheckStatus(t *testing.T) { + rolloutCmd := "kubectl --context kubecontext rollout status deployment dep --namespace test --watch=false" + tests := []struct { + description string + commands util.Command + expectedErr string + expectedDetails string + complete bool + }{ + { + description: "rollout status success", + commands: testutil.CmdRunOut( + rolloutCmd, + "deployment dep successfully rolled out", + ), + expectedDetails: "deployment dep successfully rolled out", + complete: true, + }, + { + description: "resource not complete", + commands: testutil.CmdRunOut( + rolloutCmd, + "Waiting for replicas to be available", + ), + expectedDetails: "Waiting for replicas to be available", + }, + { + description: "no output", + commands: testutil.CmdRunOut( + rolloutCmd, + "", + ), + }, + { + description: "rollout status error", + commands: testutil.CmdRunOutErr( + rolloutCmd, + "", + errors.New("error"), + ), + expectedErr: "error", + complete: true, + }, + { + description: "rollout kubectl client connection error", + commands: testutil.CmdRunOutErr( + rolloutCmd, + "", + errors.New("Unable to connect to the server"), + ), + expectedErr: ErrKubectlConnection.Error(), + }, + } + + for _, test := range tests { + testutil.Run(t, test.description, func(t *testutil.T) { + t.Override(&util.DefaultExecCommand, test.commands) + r := Deployment{namespace: "test", name: "dep"} + runCtx := &runcontext.RunContext{ + KubeContext: "kubecontext", + } + + r.CheckStatus(context.Background(), runCtx) + t.CheckDeepEqual(test.complete, r.IsStatusCheckComplete()) + if test.expectedErr != "" { + t.CheckErrorContains(test.expectedErr, r.Status().Error()) + } else { + t.CheckDeepEqual(r.status.details, test.expectedDetails) + } + }) + } +} + +func TestParseKubectlError(t *testing.T) { + tests := []struct { + description string + err error + expected string + shouldErr bool + }{ + { + description: "rollout status connection error", + err: errors.New("Unable to connect to the server"), + expected: ErrKubectlConnection.Error(), + shouldErr: true, + }, + { + description: "rollout status kubectl command killed", + err: errors.New("signal: killed"), + expected: errKubectlKilled.Error(), + shouldErr: true, + }, + { + description: "rollout status random error", + err: errors.New("deployment test not found"), + expected: "deployment test not found", + shouldErr: true, + }, + { + description: "rollout status nil error", + }, + } + for _, test := range tests { + testutil.Run(t, test.description, func(t *testutil.T) { + actual := parseKubectlRolloutError(test.err) + t.CheckError(test.shouldErr, actual) + if test.shouldErr { + t.CheckErrorContains(test.expected, actual) + } + }) + } +} + +func TestIsErrAndNotRetriable(t *testing.T) { + tests := []struct { + description string + err error + expected bool + }{ + { + description: "rollout status connection error", + err: ErrKubectlConnection, + }, + { + description: "rollout status kubectl command killed", + err: errKubectlKilled, + expected: true, + }, + { + description: "rollout status random error", + err: errors.New("deployment test not found"), + expected: true, + }, + { + description: "rollout status parent context cancelled", + err: context.Canceled, + expected: true, + }, + { + description: "rollout status parent conetct timed out", + err: context.DeadlineExceeded, + expected: true, + }, + { + description: "rollout status nil error", + }, + } + for _, test := range tests { + testutil.Run(t, test.description, func(t *testutil.T) { + actual := isErrAndNotRetryAble(test.err) + t.CheckDeepEqual(test.expected, actual) + }) + } +} diff --git a/pkg/skaffold/deploy/status_check.go b/pkg/skaffold/deploy/status_check.go index fb16cd826d9..8d2f4ef475f 100644 --- a/pkg/skaffold/deploy/status_check.go +++ b/pkg/skaffold/deploy/status_check.go @@ -32,7 +32,6 @@ import ( "github.com/GoogleContainerTools/skaffold/pkg/skaffold/color" "github.com/GoogleContainerTools/skaffold/pkg/skaffold/deploy/resource" - "github.com/GoogleContainerTools/skaffold/pkg/skaffold/kubectl" pkgkubernetes "github.com/GoogleContainerTools/skaffold/pkg/skaffold/kubernetes" "github.com/GoogleContainerTools/skaffold/pkg/skaffold/runner/runcontext" ) @@ -45,9 +44,6 @@ var ( // report resource status for pending resources 0.5 second. reportStatusTime = 500 * time.Millisecond - - // For testing - executeRolloutStatus = getRollOutStatus ) const ( @@ -79,7 +75,7 @@ func StatusCheck(ctx context.Context, defaultLabeller *DefaultLabeller, runCtx * wg.Add(1) go func(d *resource.Deployment) { defer wg.Done() - pollDeploymentRolloutStatus(ctx, kubectl.NewFromRunContext(runCtx), d) + pollResourceStatus(ctx, runCtx, d) pending := c.markProcessed() printStatusCheckSummary(out, d, pending, c.total) }(d) @@ -117,24 +113,20 @@ func getDeployments(client kubernetes.Interface, ns string, l *DefaultLabeller, return deployments, nil } -func pollDeploymentRolloutStatus(ctx context.Context, k *kubectl.CLI, d *resource.Deployment) { +func pollResourceStatus(ctx context.Context, runCtx *runcontext.RunContext, r Resource) { pollDuration := time.Duration(defaultPollPeriodInMilliseconds) * time.Millisecond // Add poll duration to account for one last attempt after progressDeadlineSeconds. - timeoutContext, cancel := context.WithTimeout(ctx, d.Deadline()+pollDuration) - logrus.Debugf("checking rollout status %s", d.String()) + timeoutContext, cancel := context.WithTimeout(ctx, r.Deadline()+pollDuration) + logrus.Debugf("checking status %s", r) defer cancel() for { select { case <-timeoutContext.Done(): - err := errors.Wrap(timeoutContext.Err(), fmt.Sprintf("deployment rollout status could not be fetched within %v", d.Deadline())) - d.UpdateStatus(err.Error(), err) - d.MarkDone() + r.UpdateStatus(timeoutContext.Err().Error(), timeoutContext.Err()) return case <-time.After(pollDuration): - status, err := executeRolloutStatus(timeoutContext, k, d.Name()) - d.UpdateStatus(status, err) - if err != nil || strings.Contains(status, "successfully rolled out") { - d.MarkDone() + r.CheckStatus(timeoutContext, runCtx) + if r.IsStatusCheckComplete() { return } } @@ -154,11 +146,6 @@ func getSkaffoldDeployStatus(deployments []*resource.Deployment) error { return fmt.Errorf("following deployments are not stable:\n%s", strings.Join(errorStrings, "\n")) } -func getRollOutStatus(ctx context.Context, k *kubectl.CLI, dName string) (string, error) { - b, err := k.RunOut(ctx, "rollout", "status", "deployment", dName, "--watch=false") - return string(b), err -} - func getDeadline(d int) time.Duration { if d > 0 { return time.Duration(d) * time.Second @@ -201,7 +188,7 @@ func printResourceStatus(ctx context.Context, out io.Writer, deps []*resource.De func printStatus(deps []*resource.Deployment, out io.Writer) bool { allResourcesCheckComplete := true for _, d := range deps { - if d.IsDone() { + if d.IsStatusCheckComplete() { continue } allResourcesCheckComplete = false diff --git a/pkg/skaffold/deploy/status_check_test.go b/pkg/skaffold/deploy/status_check_test.go index 04592269fd9..a94fff0bfc6 100644 --- a/pkg/skaffold/deploy/status_check_test.go +++ b/pkg/skaffold/deploy/status_check_test.go @@ -25,6 +25,7 @@ import ( "time" "github.com/GoogleContainerTools/skaffold/pkg/skaffold/deploy/resource" + "github.com/GoogleContainerTools/skaffold/pkg/skaffold/runner/runcontext" "github.com/google/go-cmp/cmp" appsv1 "k8s.io/api/apps/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -32,8 +33,6 @@ import ( fakekubeclientset "k8s.io/client-go/kubernetes/fake" utilpointer "k8s.io/utils/pointer" - "github.com/GoogleContainerTools/skaffold/pkg/skaffold/kubectl" - "github.com/GoogleContainerTools/skaffold/pkg/skaffold/util" "github.com/GoogleContainerTools/skaffold/testutil" ) @@ -202,50 +201,52 @@ func TestGetDeployments(t *testing.T) { } } -func TestPollDeploymentRolloutStatus(t *testing.T) { - rolloutCmd := "kubectl --context kubecontext --namespace test rollout status deployment dep --watch=false" +type mockResource struct { + inErr bool + done bool +} + +func (m *mockResource) UpdateStatus(s string, err error) { + if err == context.DeadlineExceeded { + m.inErr = true + } +} + +func (m *mockResource) Deadline() time.Duration { + return 5 +} + +func (m *mockResource) CheckStatus(context.Context, *runcontext.RunContext) { +} + +func (m *mockResource) IsStatusCheckComplete() bool { + return m.done +} + +func TestPollResourceStatus(t *testing.T) { tests := []struct { - description string - commands util.Command - duration int - shouldErr bool + description string + dummyResource *mockResource + isInErr bool }{ { - description: "rollout returns success", - commands: testutil.CmdRunOut( - rolloutCmd, - "dep successfully rolled out", - ), - duration: 50, - }, { - description: "rollout returns error in the first attempt", - commands: testutil.CmdRunOutErr( - rolloutCmd, "could not find", - errors.New("deployment.apps/dep could not be found"), - ), - shouldErr: true, - duration: 50, - }, { - description: "rollout returns did not stabilize within the given timeout", - commands: testutil. - CmdRunOut(rolloutCmd, "Waiting for rollout to finish: 1 of 3 updated replicas are available..."). - AndRunOut(rolloutCmd, "Waiting for rollout to finish: 1 of 3 updated replicas are available..."). - AndRunOut(rolloutCmd, "Waiting for rollout to finish: 2 of 3 updated replicas are available..."), - duration: 20, - shouldErr: true, + description: "resource never stabilize within deadline", + dummyResource: &mockResource{}, + isInErr: true, + }, + { + description: "resource stabilizes", + dummyResource: &mockResource{done: true}, }, } for _, test := range tests { testutil.Run(t, test.description, func(t *testutil.T) { t.Override(&defaultPollPeriodInMilliseconds, 10) - t.Override(&util.DefaultExecCommand, test.commands) - - cli := &kubectl.CLI{KubeContext: testKubeContext, Namespace: "test"} - d := resource.NewDeployment("dep", "test", time.Duration(test.duration)*time.Millisecond) - pollDeploymentRolloutStatus(context.Background(), cli, d) - t.CheckError(test.shouldErr, d.Status().Error()) + pollResourceStatus(context.Background(), nil, test.dummyResource) + t.CheckDeepEqual(test.dummyResource.inErr, test.isInErr) }) } + } func TestGetDeployStatus(t *testing.T) { @@ -322,52 +323,6 @@ func TestGetDeployStatus(t *testing.T) { } } -func TestGetRollOutStatus(t *testing.T) { - rolloutCmd := "kubectl --context kubecontext --namespace test rollout status deployment dep --watch=false" - tests := []struct { - description string - commands util.Command - expected string - shouldErr bool - }{ - { - description: "some output", - commands: testutil.CmdRunOut( - rolloutCmd, - "Waiting for replicas to be available", - ), - expected: "Waiting for replicas to be available", - }, - { - description: "no output", - commands: testutil.CmdRunOut( - rolloutCmd, - "", - ), - }, - { - description: "rollout status error", - commands: testutil.CmdRunOutErr( - rolloutCmd, - "", - errors.New("error"), - ), - shouldErr: true, - }, - } - - for _, test := range tests { - testutil.Run(t, test.description, func(t *testutil.T) { - t.Override(&util.DefaultExecCommand, test.commands) - - cli := &kubectl.CLI{KubeContext: testKubeContext, Namespace: "test"} - actual, err := getRollOutStatus(context.Background(), cli, "dep") - - t.CheckErrorAndDeepEqual(test.shouldErr, err, test.expected, actual) - }) - } -} - func TestPrintSummaryStatus(t *testing.T) { tests := []struct { description string @@ -425,9 +380,9 @@ func TestPrintStatus(t *testing.T) { { description: "single resource successful marked complete - skip print", rs: []*resource.Deployment{ - withDone( + withStatus( resource.NewDeployment("r1", "test", 1), - "success", + "deployment successfully rolled out", nil, ), }, @@ -436,7 +391,7 @@ func TestPrintStatus(t *testing.T) { { description: "single resource in error marked complete -skip print", rs: []*resource.Deployment{ - withDone( + withStatus( resource.NewDeployment("r1", "test", 1), "error", fmt.Errorf("error"), @@ -447,9 +402,9 @@ func TestPrintStatus(t *testing.T) { { description: "multiple resources 1 not complete", rs: []*resource.Deployment{ - withDone( + withStatus( resource.NewDeployment("r1", "test", 1), - "succes", + "deployment successfully rolled out", nil, ), withStatus( @@ -461,20 +416,20 @@ func TestPrintStatus(t *testing.T) { expectedOut: " - test:deployment/r2 pending\n", }, { - description: "multiple resources 1 not complete and in error", + description: "multiple resources 1 not complete and retry-able error", rs: []*resource.Deployment{ - withDone( + withStatus( resource.NewDeployment("r1", "test", 1), - "succes", + "deployment successfully rolled out", nil, ), withStatus( resource.NewDeployment("r2", "test", 1), "", - fmt.Errorf("context deadline expired"), + resource.ErrKubectlConnection, ), }, - expectedOut: " - test:deployment/r2 context deadline expired\n", + expectedOut: " - test:deployment/r2 kubectl connection error\n", }, } @@ -488,12 +443,6 @@ func TestPrintStatus(t *testing.T) { } } -func withDone(d *resource.Deployment, details string, err error) *resource.Deployment { - d.UpdateStatus(details, err) - d.MarkDone() - return d -} - func withStatus(d *resource.Deployment, details string, err error) *resource.Deployment { d.UpdateStatus(details, err) return d