Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Get simple PipelineRun implementation working #128

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func main() {
logger, atomicLevel := logging.NewLoggerFromConfig(loggingConfig, logging.ControllerLogKey)
defer logger.Sync()

logger.Info("Starting the Build Controller")
logger.Info("Starting the Pipeline Controller")

// set up signals so we handle the first shutdown signal gracefully
stopCh := signals.SetupSignalHandler()
Expand Down
69 changes: 0 additions & 69 deletions pkg/apis/pipeline/v1alpha1/pipeline_test.go

This file was deleted.

22 changes: 0 additions & 22 deletions pkg/apis/pipeline/v1alpha1/pipeline_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package v1alpha1

import (
"fmt"
"github.com/knative/pkg/apis"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
Expand Down Expand Up @@ -133,24 +132,3 @@ type PipelineList struct {
metav1.ListMeta `json:"metadata,omitempty"`
Items []Pipeline `json:"items"`
}

// GetTask is a function that will retrieve the Task name from namespace.
type GetTask func(namespace, name string) (*Task, error)

// GetTasks retrieves all Tasks instances which the pipeline p references, getting
// instances from function g. If it is unable to retrieve an instance of a referenced
// Task, it will return an error, otherwise it returns a map from the name of the
// Task in the Pipeline to the name of the Task object itself.
func (p *Pipeline)GetTasks(g GetTask) (map[string]*Task, error) {
tasks := map[string]*Task{}
for _, pt := range p.Spec.Tasks {
t, err := g(p.Namespace, pt.TaskRef.Name)
if err != nil {
return nil, fmt.Errorf("failed to get tasks for Pipeline %q: Error getting task %q : %s",
fmt.Sprintf("%s/%s", p.Namespace, p.Name),
fmt.Sprintf("%s/%s", p.Namespace, pt.TaskRef.Name), err)
}
tasks[pt.Name] = t
}
return tasks, nil
}
49 changes: 17 additions & 32 deletions pkg/apis/pipeline/v1alpha1/pipelinerun_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ limitations under the License.
package v1alpha1

import (
corev1 "k8s.io/api/core/v1"
duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand Down Expand Up @@ -71,7 +71,22 @@ type PipelineRunStatus struct {
// If there is no version, that means use latest
// +optional
ResourceVersion []PipelineResourceVersion `json:"resourceVersion,omitempty"`
Conditions []PipelineRunCondition `json:"conditions"`
Conditions duckv1alpha1.Conditions `json:"conditions"`
}

var pipelineRunCondSet = duckv1alpha1.NewBatchConditionSet()
tejal29 marked this conversation as resolved.
Show resolved Hide resolved

// GetCondition returns the Condition matching the given type.
func (pr *PipelineRunStatus) GetCondition(t duckv1alpha1.ConditionType) *duckv1alpha1.Condition {
return pipelineRunCondSet.Manage(pr).GetCondition(t)
}

// SetCondition sets the condition, unsetting previous conditions with the same
// type as necessary.
func (pr *PipelineRunStatus) SetCondition(newCond *duckv1alpha1.Condition) {
if newCond != nil {
pipelineRunCondSet.Manage(pr).SetCondition(*newCond)
}
}

// +genclient
Expand Down Expand Up @@ -106,33 +121,3 @@ type PipelineRunList struct {
type PipelineTaskRun struct {
Name string `json:"name"`
}

// PipelineRunConditionType indicates the status of the execution of the PipelineRun.
type PipelineRunConditionType string

const (
// PipelineRunConditionTypeStarted indicates whether or not the PipelineRun
// has started actually executing.
PipelineRunConditionTypeStarted PipelineRunConditionType = "Started"

//PipelineRunConditionTypeCompleted indicates whether or not the PipelineRun
// has finished executing.
PipelineRunConditionTypeCompleted PipelineRunConditionType = "Completed"

// PipelineRunConditionTypeSucceeded indicates whether or not the PipelineRun
// was successful.
PipelineRunConditionTypeSucceeded PipelineRunConditionType = "Successful"
)

// PipelineRunCondition holds a Condition that the PipelineRun has entered into while being executed.
type PipelineRunCondition struct {
Type PipelineRunConditionType `json:"type"`

Status corev1.ConditionStatus `json:"status"`

LastTransitionTime metav1.Time `json:"lastTransitionTime"`
// +optional
Reason string `json:"reason,omitempty"`
// +optional
Message string `json:"message,omitempty"`
}
15 changes: 15 additions & 0 deletions pkg/apis/pipeline/v1alpha1/taskrun_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,21 @@ type TaskRunStatus struct {
Conditions duckv1alpha1.Conditions `json:"conditions,omitempty"`
}

var taskRunCondSet = duckv1alpha1.NewBatchConditionSet()

// GetCondition returns the Condition matching the given type.
func (tr *TaskRunStatus) GetCondition(t duckv1alpha1.ConditionType) *duckv1alpha1.Condition {
return taskRunCondSet.Manage(tr).GetCondition(t)
}

// SetCondition sets the condition, unsetting previous conditions with the same
// type as necessary.
func (bs *TaskRunStatus) SetCondition(newCond *duckv1alpha1.Condition) {
if newCond != nil {
taskRunCondSet.Manage(bs).SetCondition(*newCond)
}
}

// StepRun reports the results of running a step in the Task. Each
// task has the potential to succeed or fail (based on the exit code)
// and produces logs.
Expand Down
19 changes: 1 addition & 18 deletions pkg/apis/pipeline/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

65 changes: 47 additions & 18 deletions pkg/reconciler/v1alpha1/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@ import (
"context"
"fmt"
"reflect"
"time"

"github.com/knative/build-pipeline/pkg/apis/pipeline/v1alpha1"
"github.com/knative/build-pipeline/pkg/reconciler"
"github.com/knative/build-pipeline/pkg/reconciler/v1alpha1/pipelinerun/resources"
"github.com/knative/pkg/controller"

"github.com/knative/pkg/tracker"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -61,6 +64,7 @@ type Reconciler struct {
pipelineLister listers.PipelineLister
taskRunLister listers.TaskRunLister
taskLister listers.TaskLister
tracker tracker.Interface
}

// Check that our Reconciler implements controller.Reconciler
Expand Down Expand Up @@ -91,6 +95,11 @@ func NewController(
UpdateFunc: controller.PassNew(impl.Enqueue),
DeleteFunc: impl.Enqueue,
})

r.tracker = tracker.New(impl.EnqueueKey, 30*time.Minute)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

30 seconds?

taskRunInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: controller.PassNew(r.tracker.OnChanged),
})
return impl
}

Expand Down Expand Up @@ -118,6 +127,17 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error {
// Don't modify the informer's copy.
pr := original.DeepCopy()

taskRunRef := corev1.ObjectReference{
APIVersion: "build-pipeline.knative.dev/v1alpha1",
Kind: "TaskRun",
Namespace: pr.Namespace,
Name: pr.Name,
}
if err := c.tracker.Track(taskRunRef, pr); err != nil {
c.Logger.Errorf("Failed to create tracker for TaskRuns for PipelineRun %s: %v", pr.Name, err)
return err
}

// Reconcile this copy of the task run and then write back any status
// updates regardless of whether the reconciliation errored out.
err = c.reconcile(ctx, pr)
Expand All @@ -134,39 +154,43 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error {
}

func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) error {
// fetch the equivelant pipeline for this pipelinerun Run
p, err := c.pipelineLister.Pipelines(pr.Namespace).Get(pr.Spec.PipelineRef.Name)
if err != nil {
c.Logger.Errorf("%q failed to Get Pipeline: %q",
fmt.Sprintf("%s/%s", pr.Namespace, pr.Name),
fmt.Sprintf("%s/%s", pr.Namespace, pr.Spec.PipelineRef.Name))
return nil
}
pipelineTasks, err := p.GetTasks(func(namespace, name string) (*v1alpha1.Task, error) {
return c.taskLister.Tasks(namespace).Get(name)
})
if err != nil {
return fmt.Errorf("error getting Tasks for Pipeline %s, Pipeline is invalid!: %s", p.Name, err)
}
pipelineTaskName, trName, err := resources.GetNextPipelineRunTaskRun(
state, err := resources.GetPipelineState(
func(namespace, name string) (*v1alpha1.Task, error) {
return c.taskLister.Tasks(namespace).Get(name)
},
func(namespace, name string) (*v1alpha1.TaskRun, error) {
return c.taskRunLister.TaskRuns(namespace).Get(name)
},
p, pr.Name)
p, pr.Name,
)
if err != nil {
return fmt.Errorf("error getting next TaskRun to create for PipelineRun %s: %s", pr.Name, err)
if errors.IsNotFound(err) {
c.Logger.Infof("PipelineRun %s's Pipeline %s can't be Run; it contains Tasks that don't exist: %s",
fmt.Sprintf("%s/%s", p.Namespace, p.Name),
fmt.Sprintf("%s/%s", p.Namespace, pr.Name), err)
// The PipelineRun is Invalid so we want to stop trying to Reconcile it
return nil
}
return fmt.Errorf("error getting Tasks and/or TaskRuns for Pipeline %s, Pipeline may be invalid!: %s", p.Name, err)
}
if pipelineTaskName != "" {
_, err = c.createTaskRun(pipelineTasks[pipelineTaskName], trName, pr)
prtr := resources.GetNextTask(pr.Name, state, c.Logger)
if prtr != nil {
c.Logger.Infof("Creating a new TaskRun object %s", prtr.TaskRunName)
prtr.TaskRun, err = c.createTaskRun(prtr.Task, prtr.TaskRunName, pr)
if err != nil {
return fmt.Errorf("error creating TaskRun called %s for PipelineTask %s from PipelineRun %s: %s", trName, pipelineTaskName, pr.Name, err)
return fmt.Errorf("error creating TaskRun called %s for PipelineTask %s from PipelineRun %s: %s", prtr.TaskRunName, prtr.PipelineTask.Name, pr.Name, err)
}
}

// TODO fetch the taskruns status for this pipeline run.

// TODO check status of tasks and update status of PipelineRuns

pr.Status.SetCondition(resources.GetPipelineConditionStatus(pr.Name, state, c.Logger))
c.Logger.Infof("PipelineRun %s status is being set to %s", pr.Name, pr.Status)
return nil
}

Expand All @@ -180,14 +204,19 @@ func (c *Reconciler) createTaskRun(t *v1alpha1.Task, trName string, pr *v1alpha1
*metav1.NewControllerRef(pr, groupVersionKind),
},
},
Spec: v1alpha1.TaskRunSpec{
TaskRef: v1alpha1.TaskRef{
Name: t.Name,
},
},
}
return c.PipelineClientSet.PipelineV1alpha1().TaskRuns(t.Namespace).Create(tr)
}

func (c *Reconciler) updateStatus(pr *v1alpha1.PipelineRun) (*v1alpha1.PipelineRun, error) {
newPr, err := c.pipelineRunLister.PipelineRuns(pr.Namespace).Get(pr.Name)
if err != nil {
return nil, err
return nil, fmt.Errorf("Error getting PipelineRun %s when updating status: %s", pr.Name, err)
}
if !reflect.DeepEqual(newPr.Status, pr.Status) {
newPr.Status = pr.Status
Expand Down
Loading