Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Pending PipelineRun status (TEP-0015) #3522

Merged
merged 1 commit into from
Jan 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions docs/pipelineruns.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ weight: 4
- [Configuring a failure timeout](#configuring-a-failure-timeout)
- [Monitoring execution status](#monitoring-execution-status)
- [Cancelling a `PipelineRun`](#cancelling-a-pipelinerun)
- [Pending `PipelineRuns`](#pending-pipelineruns)
- [Events](events.md#pipelineruns)


Expand Down Expand Up @@ -528,6 +529,26 @@ spec:
status: "PipelineRunCancelled"
```

## Pending `PipelineRuns`

A `PipelineRun` can be created as a "pending" `PipelineRun` meaning that it will not actually be started until the pending status is cleared.

Note that a `PipelineRun` can only be marked "pending" before it has started, this setting is invalid after the `PipelineRun` has been started.

To mark a `PipelineRun` as pending, set `.spec.status` to `PipelineRunPending` when creating it:

```yaml
apiVersion: tekton.dev/v1beta1
kind: PipelineRun
metadata:
name: go-example-git
spec:
# […]
status: "PipelineRunPending"
```

To start the PipelineRun, clear the `.spec.status` field. Alternatively, update the value to `PipelineRunCancelled` to cancel it.

---

Except as otherwise noted, the content of this page is licensed under the
Expand Down
7 changes: 6 additions & 1 deletion internal/builder/v1beta1/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,16 @@ func PipelineDescription(desc string) PipelineSpecOp {
}
}

// PipelineRunCancelled sets the status to cancel to the TaskRunSpec.
// PipelineRunCancelled sets the status to cancel the PipelineRunSpec.
func PipelineRunCancelled(spec *v1beta1.PipelineRunSpec) {
spec.Status = v1beta1.PipelineRunSpecStatusCancelled
}

// PipelineRunPending sets the status to pending to the PipelineRunSpec.
func PipelineRunPending(spec *v1beta1.PipelineRunSpec) {
spec.Status = v1beta1.PipelineRunSpecStatusPending
}

// PipelineDeclaredResource adds a resource declaration to the Pipeline Spec,
// with the specified name and type.
func PipelineDeclaredResource(name string, t v1beta1.PipelineResourceType) PipelineSpecOp {
Expand Down
9 changes: 9 additions & 0 deletions pkg/apis/pipeline/v1beta1/pipelinerun_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ func (pr *PipelineRun) GetTimeout(ctx context.Context) time.Duration {
return pr.Spec.Timeout.Duration
}

// IsPending returns true if the PipelineRun's spec status is set to Pending state
func (pr *PipelineRun) IsPending() bool {
return pr.Spec.Status == PipelineRunSpecStatusPending
}

// GetNamespacedName returns a k8s namespaced name that identifies this PipelineRun
func (pr *PipelineRun) GetNamespacedName() types.NamespacedName {
return types.NamespacedName{Namespace: pr.Namespace, Name: pr.Name}
Expand Down Expand Up @@ -202,6 +207,8 @@ const (
// PipelineRunSpecStatusCancelled indicates that the user wants to cancel the task,
// if not already cancelled or terminated
PipelineRunSpecStatusCancelled = "PipelineRunCancelled"

PipelineRunSpecStatusPending = "PipelineRunPending"
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I think it's worth adding a comment here similar to PipelineRunSpecStatusCancelled describing its purpose. Something like:

// PipelineRunSpecStatusPending indicates that the user wants to postpone starting a PipelineRun
// until some condition is met

)

// PipelineRef can be used to refer to a specific instance of a Pipeline.
Expand Down Expand Up @@ -243,6 +250,8 @@ const (
// This reason may be found with a corev1.ConditionFalse status, if the cancellation was processed successfully
// This reason may be found with a corev1.ConditionUnknown status, if the cancellation is being processed or failed
PipelineRunReasonCancelled PipelineRunReason = "Cancelled"
// PipelineRunReasonPending is the reason set when the PipelineRun is in the pending state
PipelineRunReasonPending PipelineRunReason = "PipelineRunPending"
// PipelineRunReasonTimedOut is the reason set when the PipelineRun has timed out
PipelineRunReasonTimedOut PipelineRunReason = "PipelineRunTimeout"
// PipelineRunReasonStopping indicates that no new Tasks will be scheduled by the controller, and the
Expand Down
9 changes: 7 additions & 2 deletions pkg/apis/pipeline/v1beta1/pipelinerun_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ var _ apis.Validatable = (*PipelineRun)(nil)
// Validate pipelinerun
func (pr *PipelineRun) Validate(ctx context.Context) *apis.FieldError {
errs := validate.ObjectMetadata(pr.GetObjectMeta()).ViaField("metadata")

if pr.IsPending() && pr.HasStarted() {
errs = errs.Also(apis.ErrInvalidValue("PipelineRun cannot be Pending after it is started", "spec.status"))
}

return errs.Also(pr.Spec.Validate(apis.WithinSpec(ctx)).ViaField("spec"))
}

Expand Down Expand Up @@ -78,8 +83,8 @@ func (ps *PipelineRunSpec) Validate(ctx context.Context) (errs *apis.FieldError)
}

if ps.Status != "" {
if ps.Status != PipelineRunSpecStatusCancelled {
errs = errs.Also(apis.ErrInvalidValue(fmt.Sprintf("%s should be %s", ps.Status, PipelineRunSpecStatusCancelled), "status"))
if ps.Status != PipelineRunSpecStatusCancelled && ps.Status != PipelineRunSpecStatusPending {
errs = errs.Also(apis.ErrInvalidValue(fmt.Sprintf("%s should be %s or %s", ps.Status, PipelineRunSpecStatusCancelled, PipelineRunSpecStatusPending), "status"))
}
}

Expand Down
38 changes: 37 additions & 1 deletion pkg/apis/pipeline/v1beta1/pipelinerun_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func TestPipelineRun_Invalidate(t *testing.T) {
Status: "PipelineRunCancell",
},
},
want: apis.ErrInvalidValue("PipelineRunCancell should be PipelineRunCancelled", "spec.status"),
want: apis.ErrInvalidValue("PipelineRunCancell should be PipelineRunCancelled or PipelineRunPending", "spec.status"),
jbarrick-mesosphere marked this conversation as resolved.
Show resolved Hide resolved
}, {
name: "use of bundle without the feature flag set",
pr: v1beta1.PipelineRun{
Expand Down Expand Up @@ -138,6 +138,29 @@ func TestPipelineRun_Invalidate(t *testing.T) {
want: apis.ErrInvalidValue("invalid bundle reference (could not parse reference: not a valid reference)", "spec.pipelineref.bundle"),
wc: enableTektonOCIBundles(t),
},
{
name: "pipelinerun pending while running",
pr: v1beta1.PipelineRun{
ObjectMeta: metav1.ObjectMeta{
Name: "pipelinerunname",
},
Spec: v1beta1.PipelineRunSpec{
Status: v1beta1.PipelineRunSpecStatusPending,
PipelineRef: &v1beta1.PipelineRef{
Name: "prname",
},
},
Status: v1beta1.PipelineRunStatus{
PipelineRunStatusFields: v1beta1.PipelineRunStatusFields{
StartTime: &metav1.Time{time.Now()},
},
},
},
want: &apis.FieldError{
Message: "invalid value: PipelineRun cannot be Pending after it is started",
Paths: []string{"spec.status"},
},
},
}

for _, tc := range tests {
Expand Down Expand Up @@ -221,6 +244,19 @@ func TestPipelineRun_Validate(t *testing.T) {
},
},
},
}, {
name: "pipelinerun pending",
pr: v1beta1.PipelineRun{
ObjectMeta: metav1.ObjectMeta{
Name: "pipelinerunname",
},
Spec: v1beta1.PipelineRunSpec{
Status: v1beta1.PipelineRunSpecStatusPending,
PipelineRef: &v1beta1.PipelineRef{
Name: "prname",
},
},
},
}}

for _, ts := range tests {
Expand Down
15 changes: 14 additions & 1 deletion pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ const (
ReasonInvalidGraph = "PipelineInvalidGraph"
// ReasonCancelled indicates that a PipelineRun was cancelled.
ReasonCancelled = "PipelineRunCancelled"
// ReasonPending indicates that a PipelineRun is pending.
ReasonPending = "PipelineRunPending"
// ReasonCouldntCancel indicates that a PipelineRun was cancelled but attempting to update
// all of the running TaskRuns as cancelled failed.
ReasonCouldntCancel = "PipelineRunCouldntCancel"
Expand Down Expand Up @@ -142,7 +144,7 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, pr *v1beta1.PipelineRun)
// Read the initial condition
before := pr.Status.GetCondition(apis.ConditionSucceeded)

if !pr.HasStarted() {
if !pr.HasStarted() && !pr.IsPending() {
pr.Status.InitializeConditions()
// In case node time was not synchronized, when controller has been scheduled to other nodes.
if pr.Status.StartTime.Sub(pr.CreationTimestamp.Time) < 0 {
Expand Down Expand Up @@ -325,6 +327,17 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun, get
// and may not have had all of the assumed default specified.
pr.SetDefaults(contexts.WithUpgradeViaDefaulting(ctx))

// When pipeline run is pending, return to avoid creating the task
if pr.IsPending() {
pr.Status.SetCondition(&apis.Condition{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionUnknown,
Reason: ReasonPending,
Message: fmt.Sprintf("PipelineRun %q is pending", pr.Name),
})
return nil
}

pipelineMeta, pipelineSpec, err := resources.GetPipelineData(ctx, pr, getPipelineFunc)
if err != nil {
logger.Errorf("Failed to determine Pipeline spec to use for pipelinerun %s: %v", pr.Name, err)
Expand Down
37 changes: 37 additions & 0 deletions pkg/reconciler/pipelinerun/pipelinerun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1161,6 +1161,43 @@ func TestReconcileOnCancelledPipelineRun(t *testing.T) {
}
}

func TestReconcileOnPendingPipelineRun(t *testing.T) {
// TestReconcileOnPendingPipelineRun runs "Reconcile" on a PipelineRun that is pending.
// It verifies that reconcile is successful, the pipeline status updated and events generated.
prs := []*v1beta1.PipelineRun{tb.PipelineRun("test-pipeline-run-pending",
tb.PipelineRunNamespace("foo"),
tb.PipelineRunSpec("test-pipeline", tb.PipelineRunServiceAccountName("test-sa"),
tb.PipelineRunPending,
),
)}
ps := []*v1beta1.Pipeline{tb.Pipeline("test-pipeline", tb.PipelineNamespace("foo"), tb.PipelineSpec(
tb.PipelineTask("hello-world", "hello-world"),
))}
ts := []*v1beta1.Task{}
trs := []*v1beta1.TaskRun{}

d := test.Data{
PipelineRuns: prs,
Pipelines: ps,
Tasks: ts,
TaskRuns: trs,
jbarrick-mesosphere marked this conversation as resolved.
Show resolved Hide resolved
}
prt := NewPipelineRunTest(d, t)
defer prt.Cancel()

wantEvents := []string{}
reconciledRun, _ := prt.reconcileRun("foo", "test-pipeline-run-pending", wantEvents, false)

condition := reconciledRun.Status.GetCondition(apis.ConditionSucceeded)
if !condition.IsUnknown() || condition.Reason != ReasonPending {
t.Errorf("Expected PipelineRun condition to indicate the pending failed but reason was %s", condition.Reason)
}

if reconciledRun.Status.StartTime != nil {
t.Errorf("Start time should be nil, not: %s", reconciledRun.Status.StartTime)
}
}

func TestReconcileWithTimeout(t *testing.T) {
// TestReconcileWithTimeout runs "Reconcile" on a PipelineRun that has timed out.
// It verifies that reconcile is successful, the pipeline status updated and events generated.
Expand Down
81 changes: 81 additions & 0 deletions test/pipelinerun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,87 @@ func getHelloWorldPipelineWithSingularTask(suffix int, namespace string) *v1beta
}
}

// TestPipelineRunPending tests that a Pending PipelineRun is not run until the pending
// status is cleared. This is separate from the TestPipelineRun suite because it has to
// transition PipelineRun states during the test, which the TestPipelineRun suite does not
// support.
func TestPipelineRunPending(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
c, namespace := setup(ctx, t)

knativetest.CleanupOnInterrupt(func() { tearDown(ctx, t, c, namespace) }, t.Logf)
defer tearDown(ctx, t, c, namespace)

prName := "pending-pipelinerun-test"

t.Logf("Creating Task, Pipeline, and Pending PipelineRun %s in namespace %s", prName, namespace)

if _, err := c.TaskClient.Create(ctx, &v1beta1.Task{
ObjectMeta: metav1.ObjectMeta{Name: prName, Namespace: namespace},
Spec: v1beta1.TaskSpec{
Steps: []v1beta1.Step{{Container: corev1.Container{
Image: "ubuntu",
Command: []string{"/bin/bash"},
Args: []string{"-c", "echo hello, world"},
}}},
},
}, metav1.CreateOptions{}); err != nil {
t.Fatalf("Failed to create Task `%s`: %s", prName, err)
}

if _, err := c.PipelineClient.Create(ctx, &v1beta1.Pipeline{
ObjectMeta: metav1.ObjectMeta{Name: prName, Namespace: namespace},
Spec: v1beta1.PipelineSpec{
Tasks: []v1beta1.PipelineTask{{
Name: "task",
TaskRef: &v1beta1.TaskRef{Name: prName},
}},
},
}, metav1.CreateOptions{}); err != nil {
t.Fatalf("Failed to create Pipeline `%s`: %s", prName, err)
}

pipelineRun, err := c.PipelineRunClient.Create(ctx, &v1beta1.PipelineRun{
ObjectMeta: metav1.ObjectMeta{Name: prName, Namespace: namespace},
Spec: v1beta1.PipelineRunSpec{
PipelineRef: &v1beta1.PipelineRef{Name: prName},
Status: v1beta1.PipelineRunSpecStatusPending,
},
}, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Failed to create PipelineRun `%s`: %s", prName, err)
}

t.Logf("Waiting for PipelineRun %s in namespace %s to be marked pending", prName, namespace)
if err := WaitForPipelineRunState(ctx, c, prName, pipelineRunTimeout, PipelineRunPending(prName), "PipelineRunPending"); err != nil {
t.Fatalf("Error waiting for PipelineRun %s to be marked pending: %s", prName, err)
}

t.Logf("Clearing pending status on PipelineRun %s", prName)

pipelineRun, err = c.PipelineRunClient.Get(ctx, prName, metav1.GetOptions{})
if err != nil {
t.Fatalf("Error getting PipelineRun %s: %s", prName, err)
}

if pipelineRun.Status.StartTime != nil {
t.Fatalf("Error start time must be nil, not: %s", pipelineRun.Status.StartTime)
}

pipelineRun.Spec.Status = ""

if _, err := c.PipelineRunClient.Update(ctx, pipelineRun, metav1.UpdateOptions{}); err != nil {
t.Fatalf("Error clearing pending status on PipelineRun %s: %s", prName, err)
}

t.Logf("Waiting for PipelineRun %s in namespace %s to complete", prName, namespace)
if err := WaitForPipelineRunState(ctx, c, prName, pipelineRunTimeout, PipelineRunSucceed(prName), "PipelineRunSuccess"); err != nil {
t.Fatalf("Error waiting for PipelineRun %s to finish: %s", prName, err)
}
}

func getFanInFanOutTasks(namespace string) []*v1beta1.Task {
workspaceResource := v1beta1.TaskResource{ResourceDeclaration: v1beta1.ResourceDeclaration{
Name: "workspace",
Expand Down
26 changes: 26 additions & 0 deletions test/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"strings"
"time"

"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
"go.opencensus.io/trace"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -278,6 +279,31 @@ func PipelineRunFailed(name string) ConditionAccessorFn {
return Failed(name)
}

// PipelineRunPending provides a poll condition function that checks if the PipelineRun
// has been marked pending by the Tekton controller.
func PipelineRunPending(name string) ConditionAccessorFn {
running := Running(name)

return func(ca apis.ConditionAccessor) (bool, error) {
c := ca.GetCondition(apis.ConditionSucceeded)
if c != nil {
if c.Status == corev1.ConditionUnknown && c.Reason == string(v1beta1.PipelineRunReasonPending) {
return true, nil
}
}
status, err := running(ca)
if status {
reason := ""
// c _should_ never be nil if we get here, but we have this check just in case.
if c != nil {
reason = c.Reason
}
return false, fmt.Errorf("status should be %s, but it is %s", v1beta1.PipelineRunReasonPending, reason)
}
return status, err
}
}

// Chain allows multiple ConditionAccessorFns to be chained together, checking the condition of each in order.
func Chain(fns ...ConditionAccessorFn) ConditionAccessorFn {
return func(ca apis.ConditionAccessor) (bool, error) {
Expand Down