diff --git a/test/integration/test/deck_test.go b/test/integration/test/deck_test.go
index 03270bab5..60572da99 100644
--- a/test/integration/test/deck_test.go
+++ b/test/integration/test/deck_test.go
@@ -22,7 +22,6 @@ import (
 	"io"
 	"net/http"
 	"sort"
-	"strconv"
 	"strings"
 	"testing"
 	"time"
@@ -358,7 +357,10 @@ func TestDeckTenantIDs(t *testing.T) {
 func TestRerun(t *testing.T) {
 	t.Parallel()
 	const rerunJobConfigFile = "rerun-test.yaml"
+	jobName := "rerun-test-job-" + RandomString(t)
+	prowJobSelector := labels.SelectorFromSet(map[string]string{kube.ProwJobAnnotation: jobName})
+
 	var rerunJobConfigTemplate = `periodics:
 - interval: 1h
   name: %s
@@ -379,137 +381,137 @@ func TestRerun(t *testing.T) {
 		t.Fatalf("Failed creating clients for cluster %q: %v", clusterContext, err)
 	}
 
-	rerunJobConfig := fmt.Sprintf(rerunJobConfigTemplate, jobName, "foo")
-	if err := updateJobConfig(context.Background(), kubeClient, rerunJobConfigFile, []byte(rerunJobConfig)); err != nil {
-		t.Fatalf("Failed update job config: %v", err)
-	}
+	ctx := context.Background()
 
-	// Now we are waiting on Horologium to create the first prow job so that we
-	// can rerun from.
-	// Horologium itself is pretty good at handling the configmap update, but
-	// not kubelet, according to
-	// https://github.com/kubernetes/kubernetes/issues/30189 kubelet syncs
-	// configmap updates on existing pods every minute, which is a long wait.
-	// The proposed fix in the issue was updating the deployment, which imo
-	// should be better handled by just refreshing pods.
-	// So here comes forcing restart of horologium pods.
-	if err := refreshProwPods(kubeClient, context.Background(), "horologium"); err != nil {
-		t.Fatalf("Failed refreshing horologium pods: %v", err)
-	}
-	// Same with deck
-	if err := refreshProwPods(kubeClient, context.Background(), "deck"); err != nil {
-		t.Fatalf("Failed refreshing deck pods: %v", err)
+	redeployJobConfig := func(jobConfig string) {
+		if err := updateJobConfig(ctx, kubeClient, rerunJobConfigFile, []byte(jobConfig)); err != nil {
+			t.Fatalf("Failed update job config: %v", err)
+		}
+
+		// Now we are waiting on Horologium to create the first prow job so that we
+		// can rerun from it.
+		// Horologium itself is pretty good at handling the configmap update, but
+		// not kubelet, according to
+		// https://github.com/kubernetes/kubernetes/issues/30189 kubelet syncs
+		// configmap updates on existing pods every minute, which is a long wait.
+		// It's quicker to roll out the affected Deployments.
+		if err := rolloutDeployment(t, ctx, kubeClient, "horologium"); err != nil {
+			t.Fatalf("Failed rolling out Horologium: %v", err)
+		}
+		// Same with deck
+		if err := rolloutDeployment(t, ctx, kubeClient, "deck"); err != nil {
+			t.Fatalf("Failed rolling out Deck: %v", err)
+		}
 	}
 
+	// Deploy the initial template with "foo" as the label.
+	redeployJobConfig(fmt.Sprintf(rerunJobConfigTemplate, jobName, "foo"))
+
 	t.Cleanup(func() {
-		if err := updateJobConfig(context.Background(), kubeClient, rerunJobConfigFile, []byte{}); err != nil {
+		if err := updateJobConfig(ctx, kubeClient, rerunJobConfigFile, []byte{}); err != nil {
 			t.Logf("ERROR CLEANUP: %v", err)
 		}
 
-		labels, err := labels.Parse("prow.k8s.io/job = " + jobName)
-		if err != nil {
-			t.Logf("Skip cleaning up jobs, as failed parsing label: %v", err)
-			return
+		// Prevent horologium from immediately creating the "missing" ProwJob after the
+		// DeleteAll call further down, because horologium still runs with the last
+		// non-empty configuration (foo=bar).
+		if err := rolloutDeployment(t, ctx, kubeClient, "horologium"); err != nil {
+			t.Logf("Failed rolling out Horologium: %v", err)
 		}
 
-		if err := kubeClient.DeleteAllOf(context.Background(), &prowjobv1.ProwJob{}, &ctrlruntimeclient.DeleteAllOfOptions{
-			ListOptions: ctrlruntimeclient.ListOptions{LabelSelector: labels},
+		if err := kubeClient.DeleteAllOf(ctx, &prowjobv1.ProwJob{}, &ctrlruntimeclient.DeleteAllOfOptions{
+			ListOptions: ctrlruntimeclient.ListOptions{
+				Namespace:     defaultNamespace,
+				LabelSelector: prowJobSelector,
+			},
 		}); err != nil {
 			t.Logf("ERROR CLEANUP: %v", err)
 		}
 	})
 
-	ctx := context.Background()
+
 	getLatestJob := func(t *testing.T, jobName string, lastRun *v1.Time) *prowjobv1.ProwJob {
 		var res *prowjobv1.ProwJob
 		if err := wait.PollUntilContextTimeout(ctx, time.Second, 90*time.Second, true, func(ctx context.Context) (bool, error) {
 			pjs := &prowjobv1.ProwJobList{}
-			err = kubeClient.List(ctx, pjs, &ctrlruntimeclient.ListOptions{
-				LabelSelector: labels.SelectorFromSet(map[string]string{kube.ProwJobAnnotation: jobName}),
+			err := kubeClient.List(ctx, pjs, &ctrlruntimeclient.ListOptions{
+				LabelSelector: prowJobSelector,
 				Namespace:     defaultNamespace,
 			})
 			if err != nil {
 				return false, fmt.Errorf("failed listing prow jobs: %w", err)
 			}
+
 			sort.Slice(pjs.Items, func(i, j int) bool {
-				revi, _ := strconv.Atoi(pjs.Items[i].ResourceVersion)
-				revj, _ := strconv.Atoi(pjs.Items[j].ResourceVersion)
-				return revi > revj
+				createdi := pjs.Items[i].CreationTimestamp
+				createdj := pjs.Items[j].CreationTimestamp
+				return createdj.Before(&createdi)
 			})
+
 			if len(pjs.Items) > 0 {
 				if lastRun != nil && pjs.Items[0].CreationTimestamp.Before(lastRun) {
 					return false, nil
 				}
 				res = &pjs.Items[0]
 			}
+
 			return res != nil, nil
 		}); err != nil {
 			t.Fatalf("Failed waiting for job %q: %v", jobName, err)
 		}
 		return res
 	}
 
-	rerun := func(t *testing.T, jobName string, mode string) {
-		req, err := http.NewRequest("POST", fmt.Sprintf("http://localhost/rerun?mode=%v&prowjob=%v", mode, jobName), nil)
-		if err != nil {
-			t.Fatalf("Could not generate a request %v", err)
-		}
-		// Deck might not have been informed about the job config update, retry
-		// for this case.
-		waitDur := time.Second * 5
-		var lastErr error
-		for i := 0; i < 3; i++ {
-			lastErr = nil
-			res, err := http.DefaultClient.Do(req)
-			if err != nil {
-				lastErr = fmt.Errorf("could not make post request %v", err)
-				res.Body.Close()
-				break
-			}
-			// The only retry condition is status not ok
-			if res.StatusCode != http.StatusOK {
-				lastErr = fmt.Errorf("status not expected: %d", res.StatusCode)
-				res.Body.Close()
-				waitDur *= 2
-				time.Sleep(waitDur)
-				continue
-			}
-			body, err := io.ReadAll(res.Body)
-			if err != nil {
-				lastErr = fmt.Errorf("could not read body response %v", err)
-				res.Body.Close()
-				break
-			}
-			t.Logf("Response body: %s", string(body))
-			break
-		}
-		if lastErr != nil {
-			t.Fatalf("Failed trigger rerun: %v", lastErr)
-		}
-	}
-	jobToRerun := getLatestJob(t, jobName, nil)
-	rerunJobConfig = fmt.Sprintf(rerunJobConfigTemplate, jobName, "bar")
-	if err := updateJobConfig(context.Background(), kubeClient, rerunJobConfigFile, []byte(rerunJobConfig)); err != nil {
-		t.Fatalf("Failed update job config: %v", err)
-	}
-	var passed bool
-	// It may take some time for the new ProwJob to show up, so we will
-	// check every 30s interval three times for it to appear
-	latestRun := jobToRerun
-	for i := 0; i < 3; i++ {
-		time.Sleep(30 * time.Second)
-		rerun(t, jobToRerun.Name, "latest")
-		if latestRun = getLatestJob(t, jobName, &latestRun.CreationTimestamp); latestRun.Labels["foo"] == "bar" {
-			passed = true
-			break
-		}
-	}
-	if !passed {
-		t.Fatal("Expected updated job.")
+	// Wait for the first job to be created by horologium.
+	initialJob := getLatestJob(t, jobName, nil)
+
+	// Update the job configuration with a new label.
+	redeployJobConfig(fmt.Sprintf(rerunJobConfigTemplate, jobName, "bar"))
+
+	// Rerun the job using the latest config.
+	rerunJob(t, ctx, initialJob.Name, "latest")
+
+	// Wait until the desired ProwJob shows up.
+	latestJob := getLatestJob(t, jobName, &initialJob.CreationTimestamp)
+	if latestJob.Labels["foo"] != "bar" {
+		t.Fatalf("Failed waiting for ProwJob %q using latest config with foo=bar.", jobName)
 	}
 
+	// Prevent Deck from being too fast and creating the next rerun in the same second
+	// as the previous job (CreationTimestamp only has second granularity).
+	time.Sleep(1 * time.Second)
+
 	// Deck scheduled job from latest configuration, rerun with "original"
 	// should still go with original configuration.
-	rerun(t, jobToRerun.Name, "original")
-	if latestRun := getLatestJob(t, jobName, &latestRun.CreationTimestamp); latestRun.Labels["foo"] != "foo" {
-		t.Fatalf("Job label mismatch. Want: 'foo', got: '%s'", latestRun.Labels["foo"])
+	rerunJob(t, ctx, initialJob.Name, "original")
+
+	originalJob := getLatestJob(t, jobName, &latestJob.CreationTimestamp)
+	if originalJob.Labels["foo"] != "foo" {
+		t.Fatalf("Failed waiting for ProwJob %q using original config with foo=foo.", jobName)
+	}
+}
+
+func rerunJob(t *testing.T, ctx context.Context, jobName string, mode string) {
+	req, err := http.NewRequestWithContext(ctx, "POST", fmt.Sprintf("http://localhost/rerun?mode=%v&prowjob=%v", mode, jobName), nil)
+	if err != nil {
+		t.Fatalf("Could not generate a request %v", err)
+	}
+
+	// Deck might not be fully ready yet, so we must retry.
+	if err := wait.PollUntilContextTimeout(ctx, time.Second, 10*time.Second, true, func(ctx context.Context) (bool, error) {
+		res, err := http.DefaultClient.Do(req)
+		if err != nil {
+			return false, fmt.Errorf("could not make post request: %w", err)
+		}
+		defer res.Body.Close()
+
+		body, err := io.ReadAll(res.Body)
+		if err != nil {
+			t.Logf("Failed to read response body: %v", err)
+			return false, nil
+		}
+		t.Logf("Response body: %s", string(body))
+
+		return res.StatusCode == http.StatusOK, nil
+	}); err != nil {
+		t.Fatalf("Failed to rerun job %q with %s config: %v", jobName, mode, err)
 	}
 }
diff --git a/test/integration/test/horologium_test.go b/test/integration/test/horologium_test.go
index ca6861ea5..0fbe517ba 100644
--- a/test/integration/test/horologium_test.go
+++ b/test/integration/test/horologium_test.go
@@ -66,10 +66,8 @@ func TestLaunchProwJob(t *testing.T) {
 	// not kubelet, according to
 	// https://github.com/kubernetes/kubernetes/issues/30189 kubelet syncs
 	// configmap updates on existing pods every minute, which is a long wait.
-	// The proposed fix in the issue was updating the deployment, which imo
-	// should be better handled by just refreshing pods.
-	// So here comes forcing restart of horologium pods.
-	if err := refreshProwPods(kubeClient, context.Background(), "horologium"); err != nil {
+	// It's quicker to roll out the affected Deployments.
+	if err := rolloutDeployment(t, context.Background(), kubeClient, "horologium"); err != nil {
 		t.Fatalf("Failed refreshing horologium pods: %v", err)
 	}
 
diff --git a/test/integration/test/setup.go b/test/integration/test/setup.go
index e40e34819..4667437ef 100644
--- a/test/integration/test/setup.go
+++ b/test/integration/test/setup.go
@@ -21,14 +21,18 @@ import (
 	"context"
 	"crypto/rand"
 	"crypto/sha256"
+	"errors"
 	"flag"
 	"fmt"
 	"io"
 	"sync"
 	"testing"
+	"time"
 
+	appsv1 "k8s.io/api/apps/v1"
 	coreapi "k8s.io/api/core/v1"
-	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/kubernetes/scheme"
 	_ "k8s.io/client-go/plugin/pkg/client/auth/oidc"
@@ -106,20 +110,55 @@ func getPodLogs(clientset *kubernetes.Clientset, namespace, podName string, opts
 	return str, nil
 }
 
-func refreshProwPods(client ctrlruntimeclient.Client, ctx context.Context, name string) error {
+func rolloutDeployment(t *testing.T, ctx context.Context, client ctrlruntimeclient.Client, name string) error {
 	prowComponentsMux.Lock()
 	defer prowComponentsMux.Unlock()
 
-	var pods coreapi.PodList
-	labels, _ := labels.Parse("app = " + name)
-	if err := client.List(ctx, &pods, &ctrlruntimeclient.ListOptions{LabelSelector: labels}); err != nil {
-		return err
+	var depl appsv1.Deployment
+	if err := client.Get(ctx, types.NamespacedName{Name: name, Namespace: defaultNamespace}, &depl); err != nil {
+		return fmt.Errorf("failed to get Deployment: %w", err)
+	}
+
+	if replicas := depl.Spec.Replicas; replicas == nil || *replicas < 1 {
+		return errors.New("cannot restart a Deployment with zero replicas.")
+	}
+
+	labels := depl.Spec.Template.Labels
+	if labels == nil {
+		// This should never happen.
+		labels = map[string]string{}
+	}
+	labels["restart"] = RandomString(t)
+
+	t.Logf("Restarting %s...", name)
+	if err := client.Update(ctx, &depl); err != nil {
+		return fmt.Errorf("failed to update Deployment: %w", err)
 	}
-	for _, pod := range pods.Items {
-		if err := client.Delete(ctx, &pod); err != nil {
-			return err
+
+	timeout := 30 * time.Second
+	if err := wait.PollUntilContextTimeout(ctx, time.Second, timeout, false, func(ctx context.Context) (bool, error) {
+		var current appsv1.Deployment
+		if err := client.Get(ctx, ctrlruntimeclient.ObjectKeyFromObject(&depl), &current); err != nil {
+			return false, fmt.Errorf("failed to get current Deployment: %w", err)
+		}
+
+		replicas := current.Spec.Replicas
+		if replicas == nil || *replicas < 1 {
+			// This should never happen.
+			return false, errors.New("Deployment has no replicas defined")
 		}
+
+		ready := true &&
+			current.Status.AvailableReplicas == *replicas &&
+			current.Status.ReadyReplicas == *replicas &&
+			current.Status.UpdatedReplicas == *replicas &&
+			current.Status.UnavailableReplicas == 0
+
+		return ready, nil
+	}); err != nil {
+		return fmt.Errorf("Deployment did not fully roll out after %v: %w", timeout, err)
 	}
+
 	return nil
 }
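Note on the rollout approach (not part of the patch): `rolloutDeployment` forces a new ReplicaSet by mutating the pod template (here via a `restart` label) and then polls the Deployment status until the new pods are available, so tests only continue once the component actually runs with the updated ConfigMap. This is essentially what `kubectl rollout restart` followed by `kubectl rollout status` does. Below is a minimal sketch of the annotation-based trigger that kubectl itself uses, assuming the same controller-runtime client and `defaultNamespace` from `setup.go`; the helper name `triggerRolloutRestart` is hypothetical and not part of this change.

```go
// Sketch only: trigger a Deployment rollout the way `kubectl rollout restart`
// does, by bumping a pod-template annotation instead of a label.
func triggerRolloutRestart(ctx context.Context, client ctrlruntimeclient.Client, name string) error {
	var depl appsv1.Deployment
	if err := client.Get(ctx, types.NamespacedName{Name: name, Namespace: defaultNamespace}, &depl); err != nil {
		return fmt.Errorf("failed to get Deployment: %w", err)
	}

	if depl.Spec.Template.Annotations == nil {
		depl.Spec.Template.Annotations = map[string]string{}
	}
	// Any change to the pod template creates a new ReplicaSet; kubectl sets this
	// annotation to the current time for the same effect.
	depl.Spec.Template.Annotations["kubectl.kubernetes.io/restartedAt"] = time.Now().Format(time.RFC3339)

	return client.Update(ctx, &depl)
}
```

Either trigger works; the patch pairs the label bump with an explicit status poll so callers block until the rollout has actually converged.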