Add e2e test for pending PipelineRuns being processed
Add a new test verifying that when the watcher is down and a PR is
created, the PipelineRuns are kept in Pending state. When the watcher
comes back up, the PipelineRuns are restarted.

Some refactoring has been done to move some functions into their own
packages.

Signed-off-by: Chmouel Boudjnah <chmouel@redhat.com>
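
For context, the Pending state described above is Tekton's spec.status field on the PipelineRun; pipelines-as-code parks queued runs there, and the watcher clears it when a run may proceed. A minimal sketch of that mechanism, with illustrative function names that are not part of this commit:

package example

import (
	tektonv1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1"
)

// parkPipelineRun marks a PipelineRun as Pending so the Tekton controller
// will not schedule it; this is the state the new test expects while the
// watcher deployment is scaled down.
func parkPipelineRun(pr *tektonv1.PipelineRun) {
	pr.Spec.Status = tektonv1.PipelineRunSpecStatusPending // "PipelineRunPending"
}

// releasePipelineRun clears spec.status, which lets the controller start
// the run; the watcher does the equivalent once capacity is available.
func releasePipelineRun(pr *tektonv1.PipelineRun) {
	pr.Spec.Status = ""
}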
chmouel committed Oct 22, 2024
1 parent 612379b commit 5b742f4
Showing 3 changed files with 180 additions and 78 deletions.
186 changes: 108 additions & 78 deletions test/github_pullrequest_concurrency_test.go
@@ -7,53 +7,111 @@ import (
"context"
"fmt"
"net/http"
"os"
"strings"
"testing"
"time"

"github.com/google/go-github/v64/github"
"github.com/openshift-pipelines/pipelines-as-code/pkg/apis/pipelinesascode/v1alpha1"
"github.com/openshift-pipelines/pipelines-as-code/pkg/params"
"github.com/openshift-pipelines/pipelines-as-code/pkg/params/info"
"github.com/openshift-pipelines/pipelines-as-code/pkg/params/triggertype"
"github.com/openshift-pipelines/pipelines-as-code/pkg/sort"
"github.com/openshift-pipelines/pipelines-as-code/test/pkg/cctx"
tgithub "github.com/openshift-pipelines/pipelines-as-code/test/pkg/github"
tkubestuff "github.com/openshift-pipelines/pipelines-as-code/test/pkg/kubestuff"
"github.com/openshift-pipelines/pipelines-as-code/test/pkg/options"
"github.com/openshift-pipelines/pipelines-as-code/test/pkg/payload"
"github.com/openshift-pipelines/pipelines-as-code/test/pkg/repository"
trepository "github.com/openshift-pipelines/pipelines-as-code/test/pkg/repository"
"github.com/openshift-pipelines/pipelines-as-code/test/pkg/wait"
tektonv1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1"
"github.com/tektoncd/pipeline/pkg/names"
"gotest.tools/v3/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const pipelineRunFileNamePrefix = "prlongrunnning-"

func TestGithubSecondPullRequestConcurrency1by1(t *testing.T) {
ctx := context.Background()
label := "Github PullRequest Concurrent, sequentially one by one"
numberOfPipelineRuns := 5
maxNumberOfConcurrentPipelineRuns := 1
testGithubConcurrency(ctx, t, maxNumberOfConcurrentPipelineRuns, numberOfPipelineRuns, label, true, map[string]string{})
checkOrdering := true
yamlFiles := map[string]string{}
g := setupGithubConcurrency(ctx, t, maxNumberOfConcurrentPipelineRuns, numberOfPipelineRuns, label, yamlFiles)
defer g.TearDown(ctx, t)
testGithubConcurrency(ctx, t, g, numberOfPipelineRuns, checkOrdering)
}

// TestGithubSecondPullRequestConcurrencyRestartedWhenWatcherIsUp tests that
// when the watcher is down and a PR is created, the PipelineRuns are kept in
// Pending state. When the watcher is up again, the PipelineRuns are restarted.
func TestGithubSecondPullRequestConcurrencyRestartedWhenWatcherIsUp(t *testing.T) {
ctx := context.Background()
label := "Github PullRequest Concurrent, sequentially one by one"
numberOfPipelineRuns := 2
maxNumberOfConcurrentPipelineRuns := 1
checkOrdering := true
yamlFiles := map[string]string{}
ctx, runCnxS, _, _, err := tgithub.Setup(ctx, true, false)
assert.NilError(t, err)

tkubestuff.ScaleDeployment(ctx, t, runCnxS, 0, "pipelines-as-code-watcher", "pipelines-as-code")
time.Sleep(10 * time.Second)
defer tkubestuff.ScaleDeployment(ctx, t, runCnxS, 1, "pipelines-as-code-watcher", "pipelines-as-code")
g := setupGithubConcurrency(ctx, t, maxNumberOfConcurrentPipelineRuns, numberOfPipelineRuns, label, yamlFiles)
defer g.TearDown(ctx, t)

maxLoop := 30
allPipelineRunsStarted := true
for i := 0; i < maxLoop; i++ {
prs, err := g.Cnx.Clients.Tekton.TektonV1().PipelineRuns(g.TargetNamespace).List(ctx, metav1.ListOptions{})
assert.NilError(t, err)

assert.Assert(t, len(prs.Items) <= numberOfPipelineRuns, "Too many PipelineRuns have been created, expected: %d, got: %d", numberOfPipelineRuns, len(prs.Items))
if len(prs.Items) != numberOfPipelineRuns {
time.Sleep(10 * time.Second)
g.Cnx.Clients.Log.Infof("Waiting for %d PipelineRuns to be created", numberOfPipelineRuns)
allPipelineRunsStarted = false
continue
}
allPipelineRunsStarted = true
for _, pr := range prs.Items {
for _, condition := range pr.Status.GetConditions() {
assert.Assert(t, condition.GetReason() == tektonv1.PipelineRunSpecStatusPending, "PipelineRun %s is not in pending state", pr.GetName())
}
}
break
}
assert.Assert(t, allPipelineRunsStarted, "Not all PipelineRuns have been created, expected: %d", numberOfPipelineRuns)

g.Cnx.Clients.Log.Info("All PipelineRuns are Pending")
tkubestuff.ScaleDeployment(ctx, t, runCnxS, 1, "pipelines-as-code-watcher", "pipelines-as-code")
testGithubConcurrency(ctx, t, g, numberOfPipelineRuns, checkOrdering)
}

func TestGithubSecondPullRequestConcurrency3by3(t *testing.T) {
ctx := context.Background()
label := "Github PullRequest Concurrent three at time"
numberOfPipelineRuns := 10
maxNumberOfConcurrentPipelineRuns := 3
testGithubConcurrency(ctx, t, maxNumberOfConcurrentPipelineRuns, numberOfPipelineRuns, label, false, map[string]string{})
checkOrdering := false
yamlFiles := map[string]string{}

g := setupGithubConcurrency(ctx, t, maxNumberOfConcurrentPipelineRuns, numberOfPipelineRuns, label, yamlFiles)
defer g.TearDown(ctx, t)
testGithubConcurrency(ctx, t, g, numberOfPipelineRuns, checkOrdering)
}

func TestGithubSecondPullRequestConcurrency1by1WithError(t *testing.T) {
ctx := context.Background()
label := "Github PullRequest Concurrent, sequentially one by one with one bad apple"
numberOfPipelineRuns := 1
maxNumberOfConcurrentPipelineRuns := 1
testGithubConcurrency(ctx, t, maxNumberOfConcurrentPipelineRuns, numberOfPipelineRuns, label, true, map[string]string{
checkOrdering := true
yamlFiles := map[string]string{
".tekton/00-bad-apple.yaml": "testdata/failures/bad-runafter-task.yaml",
})
}

g := setupGithubConcurrency(ctx, t, maxNumberOfConcurrentPipelineRuns, numberOfPipelineRuns, label, yamlFiles)
defer g.TearDown(ctx, t)
testGithubConcurrency(ctx, t, g, numberOfPipelineRuns, checkOrdering)
}

func TestGithubGlobalRepoConcurrencyLimit(t *testing.T) {
@@ -66,25 +124,29 @@ func TestGithubGlobalRepoConcurrencyLimit(t *testing.T) {

func TestGithubGlobalAndLocalRepoConcurrencyLimit(t *testing.T) {
label := "Github PullRequest Concurrent Three at a Time Set by Local Repo"
testGlobalRepoConcurrency(t, label /* localRepoMaxConcurrentRuns */, 3)
testGlobalRepoConcurrency(t, label, 3)
}

func testGlobalRepoConcurrency(t *testing.T, label string, localRepoMaxConcurrentRuns int) {
ctx := context.Background()
// create global repo
ctx, globalNS, runcnx, err := createGlobalRepo(ctx)
ctx, globalNS, runcnx, err := trepository.CreateGlobalRepo(ctx)
assert.NilError(t, err)
defer (func() {
err = cleanUpGlobalRepo(runcnx, globalNS)
err = trepository.CleanUpGlobalRepo(runcnx, globalNS)
assert.NilError(t, err)
})()

numberOfPipelineRuns := 10
testGithubConcurrency(ctx, t, localRepoMaxConcurrentRuns, numberOfPipelineRuns, label, false, map[string]string{})
checkOrdering := false
yamlFiles := map[string]string{}

g := setupGithubConcurrency(ctx, t, localRepoMaxConcurrentRuns, numberOfPipelineRuns, label, yamlFiles)
defer g.TearDown(ctx, t)
testGithubConcurrency(ctx, t, g, numberOfPipelineRuns, checkOrdering)
}

func testGithubConcurrency(ctx context.Context, t *testing.T, maxNumberOfConcurrentPipelineRuns, numberOfPipelineRuns int, label string, checkOrdering bool, yamlFiles map[string]string) {
pipelineRunFileNamePrefix := "prlongrunnning-"
func setupGithubConcurrency(ctx context.Context, t *testing.T, maxNumberOfConcurrentPipelineRuns, numberOfPipelineRuns int, label string, yamlFiles map[string]string) tgithub.PRTest {
targetNS := names.SimpleNameGenerator.RestrictLengthWithRandomSuffix("pac-e2e-ns")
_, runcnx, opts, ghcnx, err := tgithub.Setup(ctx, true, false)
assert.NilError(t, err)
@@ -124,7 +186,8 @@ func testGithubConcurrency(ctx context.Context, t *testing.T, maxNumberOfConcurr
prNumber, err := tgithub.PRCreate(ctx, runcnx, ghcnx, opts.Organization,
opts.Repo, targetRefName, repoinfo.GetDefaultBranch(), logmsg)
assert.NilError(t, err)
g := tgithub.PRTest{

return tgithub.PRTest{
Cnx: runcnx,
Options: opts,
Provider: ghcnx,
@@ -134,98 +197,65 @@
SHA: sha,
Logger: runcnx.Clients.Log,
}
defer g.TearDown(ctx, t)
}

runcnx.Clients.Log.Info("waiting to let controller process the event")
func testGithubConcurrency(ctx context.Context, t *testing.T, g tgithub.PRTest, numberOfPipelineRuns int, checkOrdering bool) {
g.Cnx.Clients.Log.Info("waiting to let controller process the event")
time.Sleep(5 * time.Second)

waitOpts := wait.Opts{
RepoName: targetNS,
Namespace: targetNS,
RepoName: g.TargetNamespace,
Namespace: g.TargetNamespace,
MinNumberStatus: 1,
PollTimeout: wait.DefaultTimeout,
TargetSHA: sha,
TargetSHA: g.SHA,
}
assert.NilError(t, wait.UntilMinPRAppeared(ctx, g.Cnx.Clients, waitOpts, numberOfPipelineRuns))

waitForPipelineRunsHasStarted(ctx, t, g, numberOfPipelineRuns)

// sort all the PipelineRuns by their start time
if checkOrdering {
prs, err := g.Cnx.Clients.Tekton.TektonV1().PipelineRuns(g.TargetNamespace).List(ctx, metav1.ListOptions{})
assert.NilError(t, err)
sort.PipelineRunSortByStartTime(prs.Items)
for i := 0; i < numberOfPipelineRuns; i++ {
prExpectedName := fmt.Sprintf("%s%d", pipelineRunFileNamePrefix, len(prs.Items)-i)
prActualName := prs.Items[i].GetName()
assert.Assert(t, strings.HasPrefix(prActualName, prExpectedName), "prActualName: %s does not start with expected prefix %s; was it ordered properly by start time?", prActualName, prExpectedName)
}
}
assert.NilError(t, wait.UntilMinPRAppeared(ctx, runcnx.Clients, waitOpts, numberOfPipelineRuns))
}

func waitForPipelineRunsHasStarted(ctx context.Context, t *testing.T, g tgithub.PRTest, numberOfPipelineRuns int) {
finished := false
maxLoop := 30
for i := 0; i < maxLoop; i++ {
unsuccessful := 0
prs, err := runcnx.Clients.Tekton.TektonV1().PipelineRuns(targetNS).List(ctx, metav1.ListOptions{})
prs, err := g.Cnx.Clients.Tekton.TektonV1().PipelineRuns(g.TargetNamespace).List(ctx, metav1.ListOptions{})
assert.NilError(t, err)
for _, pr := range prs.Items {
if pr.Status.GetConditions() == nil {
unsuccessful++
continue
}
for _, condition := range pr.Status.GetConditions() {
if condition.Status == "Unknown" || condition.GetReason() == tektonv1.PipelineRunSpecStatusPending {
if condition.IsUnknown() || condition.IsFalse() || condition.GetReason() == tektonv1.PipelineRunSpecStatusPending {
unsuccessful++
continue
}
}
}
if unsuccessful == 0 {
runcnx.Clients.Log.Infof("the %d pipelineruns has successfully finished", numberOfPipelineRuns)
g.Cnx.Clients.Log.Infof("the %d pipelineruns has successfully finished", numberOfPipelineRuns)
finished = true
break
}
runcnx.Clients.Log.Infof("number of unsuccessful PR %d out of %d, waiting 10s more in the waiting loop: %d/%d", unsuccessful, numberOfPipelineRuns, i, maxLoop)
g.Cnx.Clients.Log.Infof("number of unsuccessful PR %d out of %d, waiting 10s more in the waiting loop: %d/%d", unsuccessful, numberOfPipelineRuns, i, maxLoop)
// it's high because it takes time to process on kind
time.Sleep(10 * time.Second)
}
if !finished {
t.Errorf("the %d pipelineruns has not successfully finished, some of them are still pending or it's abnormally slow to process the Q", numberOfPipelineRuns)
}

// sort all the PipelineRuns by their start time
if checkOrdering {
prs, err := runcnx.Clients.Tekton.TektonV1().PipelineRuns(targetNS).List(ctx, metav1.ListOptions{})
assert.NilError(t, err)
sort.PipelineRunSortByStartTime(prs.Items)
for i := 0; i < numberOfPipelineRuns; i++ {
prExpectedName := fmt.Sprintf("%s%d", pipelineRunFileNamePrefix, len(prs.Items)-i)
prActualName := prs.Items[i].GetName()
assert.Assert(t, strings.HasPrefix(prActualName, prExpectedName), "prActualName: %s does not start with expected prefix %s; was it ordered properly by start time?", prActualName, prExpectedName)
}
}
}

func createGlobalRepo(ctx context.Context) (context.Context, string, *params.Run, error) {
runcnx := params.New()
if err := runcnx.Clients.NewClients(ctx, &runcnx.Info); err != nil {
return ctx, "", nil, err
}

ctx, err := cctx.GetControllerCtxInfo(ctx, runcnx)
if err != nil {
return ctx, "", nil, err
}

globalNS := info.GetNS(ctx)

repo := &v1alpha1.Repository{
ObjectMeta: metav1.ObjectMeta{
Name: info.DefaultGlobalRepoName,
},
Spec: v1alpha1.RepositorySpec{
ConcurrencyLimit: github.Int(2),
},
}

if err := repository.CreateRepo(ctx, globalNS, runcnx, repo); err != nil {
return ctx, "", nil, err
}

return ctx, globalNS, runcnx, nil
}

func cleanUpGlobalRepo(runcnx *params.Run, globalNS string) error {
if os.Getenv("TEST_NOCLEANUP") != "true" {
runcnx.Clients.Log.Infof("Cleaning up global repo %s in %s", info.DefaultGlobalRepoName, globalNS)
return runcnx.Clients.PipelineAsCode.PipelinesascodeV1alpha1().Repositories(globalNS).Delete(
context.Background(), info.DefaultGlobalRepoName, metav1.DeleteOptions{})
}
return nil
}
21 changes: 21 additions & 0 deletions test/pkg/kubestuff/scale.go
@@ -0,0 +1,21 @@
package kubestuff

import (
"context"
"testing"
"time"

"github.com/openshift-pipelines/pipelines-as-code/pkg/params"
"gotest.tools/v3/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func ScaleDeployment(ctx context.Context, t *testing.T, runcnx *params.Run, replicas int32, deploymentName, namespace string) {
scale, err := runcnx.Clients.Kube.AppsV1().Deployments(namespace).GetScale(ctx, deploymentName, metav1.GetOptions{})
assert.NilError(t, err)
scale.Spec.Replicas = replicas
time.Sleep(5 * time.Second)
_, err = runcnx.Clients.Kube.AppsV1().Deployments(namespace).UpdateScale(ctx, deploymentName, scale, metav1.UpdateOptions{})
assert.NilError(t, err)
runcnx.Clients.Log.Infof("Deployment %s in namespace %s scaled to %d replicas", deploymentName, namespace, replicas)
}
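
For reference, the call pattern the new test uses with this helper: scale the watcher to zero before opening the PR, then restore it afterwards (taken from the test above; ctx, t, and runCnxS come from the test's setup):

// Stop the watcher so newly created PipelineRuns stay Pending.
tkubestuff.ScaleDeployment(ctx, t, runCnxS, 0, "pipelines-as-code-watcher", "pipelines-as-code")
// Restore it at the end of the test so the queue is processed again.
defer tkubestuff.ScaleDeployment(ctx, t, runCnxS, 1, "pipelines-as-code-watcher", "pipelines-as-code")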
51 changes: 51 additions & 0 deletions test/pkg/repository/global.go
@@ -0,0 +1,51 @@
package repository

import (
"context"
"os"

"github.com/google/go-github/v62/github"
"github.com/openshift-pipelines/pipelines-as-code/pkg/apis/pipelinesascode/v1alpha1"
"github.com/openshift-pipelines/pipelines-as-code/pkg/params"
"github.com/openshift-pipelines/pipelines-as-code/pkg/params/info"
"github.com/openshift-pipelines/pipelines-as-code/test/pkg/cctx"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func CreateGlobalRepo(ctx context.Context) (context.Context, string, *params.Run, error) {
runcnx := params.New()
if err := runcnx.Clients.NewClients(ctx, &runcnx.Info); err != nil {
return ctx, "", nil, err
}

ctx, err := cctx.GetControllerCtxInfo(ctx, runcnx)
if err != nil {
return ctx, "", nil, err
}

globalNS := info.GetNS(ctx)

repo := &v1alpha1.Repository{
ObjectMeta: metav1.ObjectMeta{
Name: info.DefaultGlobalRepoName,
},
Spec: v1alpha1.RepositorySpec{
ConcurrencyLimit: github.Int(2),
},
}

if err := CreateRepo(ctx, globalNS, runcnx, repo); err != nil {
return ctx, "", nil, err
}

return ctx, globalNS, runcnx, nil
}

func CleanUpGlobalRepo(runcnx *params.Run, globalNS string) error {
if os.Getenv("TEST_NOCLEANUP") != "true" {
runcnx.Clients.Log.Infof("Cleaning up global repo %s in %s", info.DefaultGlobalRepoName, globalNS)
return runcnx.Clients.PipelineAsCode.PipelinesascodeV1alpha1().Repositories(globalNS).Delete(
context.Background(), info.DefaultGlobalRepoName, metav1.DeleteOptions{})
}
return nil
}
