Skip to content

Commit

Permalink
Feat: Add pending condition in pipeline run.
Browse files Browse the repository at this point in the history
Allows creation of Pending PipelineRuns which are not started until
the pending status is cleared.

Co-authored-by: Tianxin Dong <dongtianxin.dongtx@bytedance.com>
Co-authored-by: Justin Taylor-Barrick <jbarrick@d2iq.com>
  • Loading branch information
3 people committed Nov 18, 2020
1 parent 58ced47 commit 5986e9b
Show file tree
Hide file tree
Showing 9 changed files with 238 additions and 5 deletions.
21 changes: 21 additions & 0 deletions docs/pipelineruns.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ weight: 4
- [Configuring a failure timeout](#configuring-a-failure-timeout)
- [Monitoring execution status](#monitoring-execution-status)
- [Cancelling a `PipelineRun`](#cancelling-a-pipelinerun)
- [Pending `PipelineRuns`](#pending-pipelineruns)
- [Events](events.md#pipelineruns)


Expand Down Expand Up @@ -528,6 +529,26 @@ spec:
status: "PipelineRunCancelled"
```

## Pending `PipelineRuns`

A `PipelineRun` can be created as a "pending" `PipelineRun` meaning that it will not actually be started until the pending status is cleared.

Note that a `PipelineRun` can only be marked "pending" before it has started, this setting is invalid after the `PipelineRun` has been started.

To mark a `PipelineRun` as pending, set `.spec.status` to `PipelineRunPending` when creating it:

```yaml
apiVersion: tekton.dev/v1beta1
kind: PipelineRun
metadata:
name: go-example-git
spec:
# […]
status: "PipelineRunPending"
```

To start the PipelineRun, clear the `.spec.status` field. Alternatively, update the value to `PipelineRunCancelled` to cancel it.

---

Except as otherwise noted, the content of this page is licensed under the
Expand Down
7 changes: 6 additions & 1 deletion internal/builder/v1beta1/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,16 @@ func PipelineDescription(desc string) PipelineSpecOp {
}
}

// PipelineRunCancelled sets the status to cancel to the TaskRunSpec.
// PipelineRunCancelled sets the status to cancel the PipelineRunSpec.
func PipelineRunCancelled(spec *v1beta1.PipelineRunSpec) {
spec.Status = v1beta1.PipelineRunSpecStatusCancelled
}

// PipelineRunPending sets the status to pending to the PipelineRunSpec.
func PipelineRunPending(spec *v1beta1.PipelineRunSpec) {
spec.Status = v1beta1.PipelineRunSpecStatusPending
}

// PipelineDeclaredResource adds a resource declaration to the Pipeline Spec,
// with the specified name and type.
func PipelineDeclaredResource(name string, t v1beta1.PipelineResourceType) PipelineSpecOp {
Expand Down
9 changes: 9 additions & 0 deletions pkg/apis/pipeline/v1beta1/pipelinerun_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ func (pr *PipelineRun) GetTimeout(ctx context.Context) time.Duration {
return pr.Spec.Timeout.Duration
}

// IsPending returns true if the PipelineRun's spec status is set to Pending state
func (pr *PipelineRun) IsPending() bool {
return pr.Spec.Status == PipelineRunSpecStatusPending
}

// GetNamespacedName returns a k8s namespaced name that identifies this PipelineRun
func (pr *PipelineRun) GetNamespacedName() types.NamespacedName {
return types.NamespacedName{Namespace: pr.Namespace, Name: pr.Name}
Expand Down Expand Up @@ -202,6 +207,8 @@ const (
// PipelineRunSpecStatusCancelled indicates that the user wants to cancel the task,
// if not already cancelled or terminated
PipelineRunSpecStatusCancelled = "PipelineRunCancelled"

PipelineRunSpecStatusPending = "PipelineRunPending"
)

// PipelineRef can be used to refer to a specific instance of a Pipeline.
Expand Down Expand Up @@ -243,6 +250,8 @@ const (
// This reason may be found with a corev1.ConditionFalse status, if the cancellation was processed successfully
// This reason may be found with a corev1.ConditionUnknown status, if the cancellation is being processed or failed
PipelineRunReasonCancelled PipelineRunReason = "Cancelled"
// PipelineRunReasonPending is the reason set when the PipelineRun is in the pending state
PipelineRunReasonPending PipelineRunReason = "PipelineRunPending"
// PipelineRunReasonTimedOut is the reason set when the PipelineRun has timed out
PipelineRunReasonTimedOut PipelineRunReason = "PipelineRunTimeout"
// PipelineRunReasonStopping indicates that no new Tasks will be scheduled by the controller, and the
Expand Down
9 changes: 7 additions & 2 deletions pkg/apis/pipeline/v1beta1/pipelinerun_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ var _ apis.Validatable = (*PipelineRun)(nil)
// Validate pipelinerun
func (pr *PipelineRun) Validate(ctx context.Context) *apis.FieldError {
errs := validate.ObjectMetadata(pr.GetObjectMeta()).ViaField("metadata")

if pr.IsPending() && pr.HasStarted() {
errs = errs.Also(apis.ErrInvalidValue("PipelineRun cannot be Pending after it is started", "spec.status"))
}

return errs.Also(pr.Spec.Validate(apis.WithinSpec(ctx)).ViaField("spec"))
}

Expand Down Expand Up @@ -78,8 +83,8 @@ func (ps *PipelineRunSpec) Validate(ctx context.Context) (errs *apis.FieldError)
}

if ps.Status != "" {
if ps.Status != PipelineRunSpecStatusCancelled {
errs = errs.Also(apis.ErrInvalidValue(fmt.Sprintf("%s should be %s", ps.Status, PipelineRunSpecStatusCancelled), "status"))
if ps.Status != PipelineRunSpecStatusCancelled && ps.Status != PipelineRunSpecStatusPending {
errs = errs.Also(apis.ErrInvalidValue(fmt.Sprintf("%s should be %s or %s", ps.Status, PipelineRunSpecStatusCancelled, PipelineRunSpecStatusPending), "status"))
}
}

Expand Down
38 changes: 37 additions & 1 deletion pkg/apis/pipeline/v1beta1/pipelinerun_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func TestPipelineRun_Invalidate(t *testing.T) {
Status: "PipelineRunCancell",
},
},
want: apis.ErrInvalidValue("PipelineRunCancell should be PipelineRunCancelled", "spec.status"),
want: apis.ErrInvalidValue("PipelineRunCancell should be PipelineRunCancelled or PipelineRunPending", "spec.status"),
}, {
name: "use of bundle without the feature flag set",
pr: v1beta1.PipelineRun{
Expand Down Expand Up @@ -138,6 +138,29 @@ func TestPipelineRun_Invalidate(t *testing.T) {
want: apis.ErrInvalidValue("invalid bundle reference (could not parse reference: not a valid reference)", "spec.pipelineref.bundle"),
wc: enableTektonOCIBundles(t),
},
{
name: "pipelinerun pending while running",
pr: v1beta1.PipelineRun{
ObjectMeta: metav1.ObjectMeta{
Name: "pipelinerunname",
},
Spec: v1beta1.PipelineRunSpec{
Status: v1beta1.PipelineRunSpecStatusPending,
PipelineRef: &v1beta1.PipelineRef{
Name: "prname",
},
},
Status: v1beta1.PipelineRunStatus{
PipelineRunStatusFields: v1beta1.PipelineRunStatusFields{
StartTime: &metav1.Time{time.Now()},
},
},
},
want: &apis.FieldError{
Message: "invalid value: PipelineRun cannot be Pending after it is started",
Paths: []string{"spec.status"},
},
},
}

for _, tc := range tests {
Expand Down Expand Up @@ -221,6 +244,19 @@ func TestPipelineRun_Validate(t *testing.T) {
},
},
},
}, {
name: "pipelinerun pending",
pr: v1beta1.PipelineRun{
ObjectMeta: metav1.ObjectMeta{
Name: "pipelinerunname",
},
Spec: v1beta1.PipelineRunSpec{
Status: v1beta1.PipelineRunSpecStatusPending,
PipelineRef: &v1beta1.PipelineRef{
Name: "prname",
},
},
},
}}

for _, ts := range tests {
Expand Down
15 changes: 14 additions & 1 deletion pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ const (
ReasonInvalidGraph = "PipelineInvalidGraph"
// ReasonCancelled indicates that a PipelineRun was cancelled.
ReasonCancelled = "PipelineRunCancelled"
// ReasonPending indicates that a PipelineRun is pending.
ReasonPending = "PipelineRunPending"
// ReasonCouldntCancel indicates that a PipelineRun was cancelled but attempting to update
// all of the running TaskRuns as cancelled failed.
ReasonCouldntCancel = "PipelineRunCouldntCancel"
Expand Down Expand Up @@ -142,7 +144,7 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, pr *v1beta1.PipelineRun)
// Read the initial condition
before := pr.Status.GetCondition(apis.ConditionSucceeded)

if !pr.HasStarted() {
if !pr.HasStarted() && !pr.IsPending() {
pr.Status.InitializeConditions()
// In case node time was not synchronized, when controller has been scheduled to other nodes.
if pr.Status.StartTime.Sub(pr.CreationTimestamp.Time) < 0 {
Expand Down Expand Up @@ -325,6 +327,17 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun, get
// and may not have had all of the assumed default specified.
pr.SetDefaults(contexts.WithUpgradeViaDefaulting(ctx))

// When pipeline run is pending, return to avoid creating the task
if pr.IsPending() {
pr.Status.SetCondition(&apis.Condition{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionUnknown,
Reason: ReasonPending,
Message: fmt.Sprintf("PipelineRun %q is pending", pr.Name),
})
return nil
}

pipelineMeta, pipelineSpec, err := resources.GetPipelineData(ctx, pr, getPipelineFunc)
if err != nil {
logger.Errorf("Failed to determine Pipeline spec to use for pipelinerun %s: %v", pr.Name, err)
Expand Down
37 changes: 37 additions & 0 deletions pkg/reconciler/pipelinerun/pipelinerun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1161,6 +1161,43 @@ func TestReconcileOnCancelledPipelineRun(t *testing.T) {
}
}

func TestReconcileOnPendingPipelineRun(t *testing.T) {
// TestReconcileOnPendingPipelineRun runs "Reconcile" on a PipelineRun that is pending.
// It verifies that reconcile is successful, the pipeline status updated and events generated.
prs := []*v1beta1.PipelineRun{tb.PipelineRun("test-pipeline-run-pending",
tb.PipelineRunNamespace("foo"),
tb.PipelineRunSpec("test-pipeline", tb.PipelineRunServiceAccountName("test-sa"),
tb.PipelineRunPending,
),
)}
ps := []*v1beta1.Pipeline{tb.Pipeline("test-pipeline", tb.PipelineNamespace("foo"), tb.PipelineSpec(
tb.PipelineTask("hello-world", "hello-world"),
))}
ts := []*v1beta1.Task{}
trs := []*v1beta1.TaskRun{}

d := test.Data{
PipelineRuns: prs,
Pipelines: ps,
Tasks: ts,
TaskRuns: trs,
}
prt := NewPipelineRunTest(d, t)
defer prt.Cancel()

wantEvents := []string{}
reconciledRun, _ := prt.reconcileRun("foo", "test-pipeline-run-pending", wantEvents, false)

condition := reconciledRun.Status.GetCondition(apis.ConditionSucceeded)
if !condition.IsUnknown() || condition.Reason != ReasonPending {
t.Errorf("Expected PipelineRun condition to indicate the pending failed but reason was %s", condition.Reason)
}

if reconciledRun.Status.StartTime != nil {
t.Errorf("Start time should be nil, not: %s", reconciledRun.Status.StartTime)
}
}

func TestReconcileWithTimeout(t *testing.T) {
// TestReconcileWithTimeout runs "Reconcile" on a PipelineRun that has timed out.
// It verifies that reconcile is successful, the pipeline status updated and events generated.
Expand Down
81 changes: 81 additions & 0 deletions test/pipelinerun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,87 @@ func getHelloWorldPipelineWithSingularTask(suffix int, namespace string) *v1beta
}
}

// TestPipelineRunPending tests that a Pending PipelineRun is not run until the pending
// status is cleared. This is separate from the TestPipelineRun suite because it has to
// transition PipelineRun states during the test, which the TestPipelineRun suite does not
// support.
func TestPipelineRunPending(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
c, namespace := setup(ctx, t)

knativetest.CleanupOnInterrupt(func() { tearDown(ctx, t, c, namespace) }, t.Logf)
defer tearDown(ctx, t, c, namespace)

prName := "pending-pipelinerun-test"

t.Logf("Creating Task, Pipeline, and Pending PipelineRun %s in namespace %s", prName, namespace)

if _, err := c.TaskClient.Create(ctx, &v1beta1.Task{
ObjectMeta: metav1.ObjectMeta{Name: prName, Namespace: namespace},
Spec: v1beta1.TaskSpec{
Steps: []v1beta1.Step{{Container: corev1.Container{
Image: "ubuntu",
Command: []string{"/bin/bash"},
Args: []string{"-c", "echo hello, world"},
}}},
},
}, metav1.CreateOptions{}); err != nil {
t.Fatalf("Failed to create Task `%s`: %s", prName, err)
}

if _, err := c.PipelineClient.Create(ctx, &v1beta1.Pipeline{
ObjectMeta: metav1.ObjectMeta{Name: prName, Namespace: namespace},
Spec: v1beta1.PipelineSpec{
Tasks: []v1beta1.PipelineTask{{
Name: "task",
TaskRef: &v1beta1.TaskRef{Name: prName},
}},
},
}, metav1.CreateOptions{}); err != nil {
t.Fatalf("Failed to create Pipeline `%s`: %s", prName, err)
}

pipelineRun, err := c.PipelineRunClient.Create(ctx, &v1beta1.PipelineRun{
ObjectMeta: metav1.ObjectMeta{Name: prName, Namespace: namespace},
Spec: v1beta1.PipelineRunSpec{
PipelineRef: &v1beta1.PipelineRef{Name: prName},
Status: v1beta1.PipelineRunSpecStatusPending,
},
}, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Failed to create PipelineRun `%s`: %s", prName, err)
}

t.Logf("Waiting for PipelineRun %s in namespace %s to be marked pending", prName, namespace)
if err := WaitForPipelineRunState(ctx, c, prName, pipelineRunTimeout, PipelineRunPending(prName), "PipelineRunPending"); err != nil {
t.Fatalf("Error waiting for PipelineRun %s to be marked pending: %s", prName, err)
}

t.Logf("Clearing pending status on PipelineRun %s", prName)

pipelineRun, err = c.PipelineRunClient.Get(ctx, prName, metav1.GetOptions{})
if err != nil {
t.Fatalf("Error getting PipelineRun %s: %s", prName, err)
}

if pipelineRun.Status.StartTime != nil {
t.Fatalf("Error start time must be nil, not: %s", pipelineRun.Status.StartTime)
}

pipelineRun.Spec.Status = ""

if _, err := c.PipelineRunClient.Update(ctx, pipelineRun, metav1.UpdateOptions{}); err != nil {
t.Fatalf("Error clearing pending status on PipelineRun %s: %s", prName, err)
}

t.Logf("Waiting for PipelineRun %s in namespace %s to complete", prName, namespace)
if err := WaitForPipelineRunState(ctx, c, prName, pipelineRunTimeout, PipelineRunSucceed(prName), "PipelineRunSuccess"); err != nil {
t.Fatalf("Error waiting for PipelineRun %s to finish: %s", prName, err)
}
}

func getFanInFanOutTasks(namespace string) []*v1beta1.Task {
workspaceResource := v1beta1.TaskResource{ResourceDeclaration: v1beta1.ResourceDeclaration{
Name: "workspace",
Expand Down
26 changes: 26 additions & 0 deletions test/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"strings"
"time"

"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
"go.opencensus.io/trace"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -278,6 +279,31 @@ func PipelineRunFailed(name string) ConditionAccessorFn {
return Failed(name)
}

// PipelineRunPending provides a poll condition function that checks if the PipelineRun
// has been marked pending by the Tekton controller.
func PipelineRunPending(name string) ConditionAccessorFn {
running := Running(name)

return func(ca apis.ConditionAccessor) (bool, error) {
c := ca.GetCondition(apis.ConditionSucceeded)
if c != nil {
if c.Status == corev1.ConditionUnknown && c.Reason == string(v1beta1.PipelineRunReasonPending) {
return true, nil
}
}
status, err := running(ca)
if status {
reason := ""
// c _should_ never be nil if we get here, but we have this check just in case.
if c != nil {
reason = c.Reason
}
return false, fmt.Errorf("status should be %s, but it is %s", v1beta1.PipelineRunReasonPending, reason)
}
return status, err
}
}

// Chain allows multiple ConditionAccessorFns to be chained together, checking the condition of each in order.
func Chain(fns ...ConditionAccessorFn) ConditionAccessorFn {
return func(ca apis.ConditionAccessor) (bool, error) {
Expand Down

0 comments on commit 5986e9b

Please sign in to comment.