Skip to content

Commit

Permalink
Add taskRef remote resolution support
Browse files Browse the repository at this point in the history
Followup to tektoncd#4596, needed for tektoncd#4710.

remote resolution, both in explicitly created `TaskRun`s and in `PipelineRun`s' `PipelineTask`s,
from public git repositories using tektoncd/resolution.

This is still in alpha.

Signed-off-by: Andrew Bayer <andrew.bayer@gmail.com>
  • Loading branch information
abayer committed May 11, 2022
1 parent 1c8447d commit 0abd1b0
Show file tree
Hide file tree
Showing 14 changed files with 552 additions and 88 deletions.
4 changes: 1 addition & 3 deletions docs/taskruns.md
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,6 @@ cli *(coming soon)*.

**([alpha only](https://github.com/tektoncd/pipeline/blob/main/docs/install.md#alpha-features))**

**Warning: This feature is still in very early stage of development and is not yet functional. Do not use it.**

A `taskRef` field may specify a Task in a remote location such as git.
Support for specific types of remote will depend on the Resolvers your
cluster's operator has installed. The below example demonstrates
Expand All @@ -179,7 +177,7 @@ spec:
taskRef:
resolver: git
resource:
- name: repo
- name: url
value: https://github.com/tektoncd/catalog.git
- name: commit
value: abc123
Expand Down
21 changes: 11 additions & 10 deletions pkg/apis/pipeline/v1beta1/pipeline_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ func (pt PipelineTask) validateBundle() (errs *apis.FieldError) {

// validateTask validates a pipeline task or a final task for taskRef and taskSpec
func (pt PipelineTask) validateTask(ctx context.Context) (errs *apis.FieldError) {
cfg := config.FromContextOrDefaults(ctx)
// Validate TaskSpec if it's present
if pt.TaskSpec != nil {
errs = errs.Also(pt.TaskSpec.Validate(ctx).ViaField("taskSpec"))
Expand All @@ -287,21 +288,21 @@ func (pt PipelineTask) validateTask(ctx context.Context) (errs *apis.FieldError)
if errSlice := validation.IsQualifiedName(pt.TaskRef.Name); len(errSlice) != 0 {
errs = errs.Also(apis.ErrInvalidValue(strings.Join(errSlice, ","), "name"))
}
} else {
} else if pt.TaskRef.Resolver == "" {
errs = errs.Also(apis.ErrInvalidValue("taskRef must specify name", "taskRef.name"))
}
// fail if bundle is present when EnableTektonOCIBundles feature flag is off (as it won't be allowed nor used)
if pt.TaskRef.Bundle != "" {
if !cfg.FeatureFlags.EnableTektonOCIBundles && pt.TaskRef.Bundle != "" {
errs = errs.Also(apis.ErrDisallowedFields("taskref.bundle"))
}
// fail if resolver or resource are present regardless
// of enabled api fields because remote resolution is
// not implemented yet for PipelineTasks.
if pt.TaskRef.Resolver != "" {
errs = errs.Also(apis.ErrDisallowedFields("taskref.resolver"))
}
if len(pt.TaskRef.Resource) > 0 {
errs = errs.Also(apis.ErrDisallowedFields("taskref.resource"))
if cfg.FeatureFlags.EnableAPIFields != config.AlphaAPIFields {
// fail if resolver or resource are present when enable-api-fields is false.
if pt.TaskRef.Resolver != "" {
errs = errs.Also(apis.ErrDisallowedFields("taskref.resolver"))
}
if len(pt.TaskRef.Resource) > 0 {
errs = errs.Also(apis.ErrDisallowedFields("taskref.resource"))
}
}
}
return errs
Expand Down
4 changes: 2 additions & 2 deletions pkg/apis/pipeline/v1beta1/pipeline_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,14 +311,14 @@ func TestPipelineTask_ValidateRegularTask_Failure(t *testing.T) {
},
expectedError: *apis.ErrDisallowedFields("taskref.bundle"),
}, {
name: "pipeline task - use of resolver",
name: "pipeline task - use of resolver without the feature flag set",
task: PipelineTask{
Name: "foo",
TaskRef: &TaskRef{Name: "boo", ResolverRef: ResolverRef{Resolver: "bar"}},
},
expectedError: *apis.ErrDisallowedFields("taskref.resolver"),
}, {
name: "pipeline task - use of resource",
name: "pipeline task - use of resource without the feature flag set",
task: PipelineTask{
Name: "foo",
TaskRef: &TaskRef{Name: "boo", ResolverRef: ResolverRef{Resource: []ResolverParam{{}}}},
Expand Down
3 changes: 3 additions & 0 deletions pkg/apis/pipeline/v1beta1/taskrun_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,9 @@ const (
TaskRunReasonCancelled TaskRunReason = "TaskRunCancelled"
// TaskRunReasonTimedOut is the reason set when the Taskrun has timed out
TaskRunReasonTimedOut TaskRunReason = "TaskRunTimeout"
// TaskRunReasonResolvingTaskRef indicates that the TaskRun is waiting for
// its taskRef to be asynchronously resolved.
TaskRunReasonResolvingTaskRef = "ResolvingTaskRef"
)

func (t TaskRunReason) String() string {
Expand Down
25 changes: 23 additions & 2 deletions pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,19 @@ func (c *Reconciler) resolvePipelineState(
pst := resources.PipelineRunState{}
// Resolve each task individually because they each could have a different reference context (remote or local).
for _, task := range tasks {
fn, err := tresources.GetTaskFunc(ctx, c.KubeClientSet, c.PipelineClientSet, task.TaskRef, pr.Namespace, pr.Spec.ServiceAccountName)
// Create a dummy TaskRun for remote resolution.
// TODO(abayer): Not sure if this is the right way to deal with this - resolution.NewResolver expects a kmeta.OwnerRefable, but just uses its name and namespace...
trName := resources.GetTaskRunName(pr.Status.TaskRuns, pr.Status.ChildReferences, task.Name, pr.Name)
trForRemoteResolution := &v1beta1.TaskRun{
ObjectMeta: metav1.ObjectMeta{
Name: trName,
Namespace: pr.Namespace,
},
Spec: v1beta1.TaskRunSpec{
TaskRef: task.TaskRef,
},
}
fn, err := tresources.GetTaskFunc(ctx, c.KubeClientSet, c.PipelineClientSet, c.resolutionRequester, trForRemoteResolution, pr.Namespace, pr.Spec.ServiceAccountName)
if err != nil {
// This Run has failed, so we need to mark it as failed and stop reconciling it
pr.Status.MarkFailed(ReasonCouldntGetTask, "Pipeline %s/%s can't be Run; task %s could not be fetched: %s",
Expand All @@ -303,6 +315,9 @@ func (c *Reconciler) resolvePipelineState(
if tresources.IsGetTaskErrTransient(err) {
return nil, err
}
if errors.Is(err, remote.ErrorRequestInProgress) {
return nil, err
}
switch err := err.(type) {
case *resources.TaskNotFoundError:
pr.Status.MarkFailed(ReasonCouldntGetTask,
Expand Down Expand Up @@ -467,8 +482,14 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun, get
tasks = append(tasks, pipelineSpec.Finally...)
}
pipelineRunState, err := c.resolvePipelineState(ctx, tasks, pipelineMeta, pr, providedResources)
if err != nil {
switch {
case errors.Is(err, remote.ErrorRequestInProgress):
message := fmt.Sprintf("PipelineRun %s/%s awaiting remote resource", pr.Namespace, pr.Name)
pr.Status.MarkRunning(v1beta1.TaskRunReasonResolvingTaskRef, message)
return nil
case err != nil:
return err
default:
}

// Build PipelineRunFacts with a list of resolved pipeline tasks,
Expand Down
78 changes: 78 additions & 0 deletions pkg/reconciler/pipelinerun/pipelinerun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7128,6 +7128,84 @@ spec:
checkPipelineRunConditionStatusAndReason(t, updatedPipelineRun, corev1.ConditionUnknown, v1beta1.PipelineRunReasonRunning.String())
}

// TestReconcileWithTaskResolver checks that a PipelineRun with a populated Resolver
// field for a Task creates a ResolutionRequest object for that Resolver's type, and
// that when the request is successfully resolved the PipelineRun begins running.
func TestReconcileWithTaskResolver(t *testing.T) {
resolverName := "foobar"
pr := parse.MustParsePipelineRun(t, `
metadata:
name: pr
namespace: default
spec:
pipelineSpec:
tasks:
- name: some-task
taskRef:
resolver: foobar
serviceAccountName: default
`)

cms := []*corev1.ConfigMap{withEnabledAlphaAPIFields(newFeatureFlagsConfigMap())}

d := test.Data{
ConfigMaps: cms,
PipelineRuns: []*v1beta1.PipelineRun{pr},
ServiceAccounts: []*corev1.ServiceAccount{{
ObjectMeta: metav1.ObjectMeta{Name: pr.Spec.ServiceAccountName, Namespace: "foo"},
}},
}

prt := newPipelineRunTest(d, t)
defer prt.Cancel()

wantEvents := []string(nil)
pipelinerun, _ := prt.reconcileRun(pr.Namespace, pr.Name, wantEvents, false)
checkPipelineRunConditionStatusAndReason(t, pipelinerun, corev1.ConditionUnknown, v1beta1.TaskRunReasonResolvingTaskRef)

client := prt.TestAssets.Clients.ResolutionRequests.ResolutionV1alpha1().ResolutionRequests("default")
resolutionrequests, err := client.List(prt.TestAssets.Ctx, metav1.ListOptions{})
if err != nil {
t.Fatalf("unexpected error listing resource requests: %v", err)
}
numResolutionRequests := len(resolutionrequests.Items)
if numResolutionRequests != 1 {
t.Fatalf("expected exactly 1 resource request but found %d", numResolutionRequests)
}

resreq := &resolutionrequests.Items[0]
resolutionRequestType := resreq.ObjectMeta.Labels["resolution.tekton.dev/type"]
if resolutionRequestType != resolverName {
t.Fatalf("expected resource request type %q but saw %q", resolutionRequestType, resolverName)
}

taskBytes := []byte(`
kind: Task
apiVersion: tekton.dev/v1beta1
metadata:
name: foo
spec:
steps:
- name: step1
image: ubuntu
script: |
echo "hello world!"
`)

resreq.Status.ResolutionRequestStatusFields.Data = base64.StdEncoding.Strict().EncodeToString(taskBytes)
resreq.Status.MarkSucceeded()
resreq, err = client.UpdateStatus(prt.TestAssets.Ctx, resreq, metav1.UpdateOptions{})
if err != nil {
t.Fatalf("unexpected error updating resource request with resolved pipeline data: %v", err)
}

// Check that the resolved pipeline was recognized by the
// PipelineRun reconciler and that the PipelineRun has now
// started executing.
updatedPipelineRun, _ := prt.reconcileRun("default", "pr", nil, false)
checkPipelineRunConditionStatusAndReason(t, updatedPipelineRun, corev1.ConditionUnknown, v1beta1.PipelineRunReasonRunning.String())
}

func getTaskRunWithTaskSpec(tr, pr, p, t string, labels, annotations map[string]string) *v1beta1.TaskRun {
om := taskRunObjectMeta(tr, "foo", pr, p, t, false)
for k, v := range labels {
Expand Down
23 changes: 14 additions & 9 deletions pkg/reconciler/pipelinerun/resources/pipelinerunresolution.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,20 @@ package resources

import (
"context"
"errors"
"fmt"
"strconv"

"knative.dev/pkg/kmeta"

"github.com/tektoncd/pipeline/pkg/apis/config"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
resourcev1alpha1 "github.com/tektoncd/pipeline/pkg/apis/resource/v1alpha1"
"github.com/tektoncd/pipeline/pkg/list"
"github.com/tektoncd/pipeline/pkg/reconciler/taskrun/resources"
"k8s.io/apimachinery/pkg/api/errors"
"github.com/tektoncd/pipeline/pkg/remote"
kerrors "k8s.io/apimachinery/pkg/api/errors"
"knative.dev/pkg/apis"
"knative.dev/pkg/kmeta"
)

const (
Expand Down Expand Up @@ -482,7 +483,7 @@ func ResolvePipelineRunTask(
if rprt.IsCustomTask() {
rprt.RunName = getRunName(pipelineRun.Status.Runs, pipelineRun.Status.ChildReferences, task.Name, pipelineRun.Name)
run, err := getRun(rprt.RunName)
if err != nil && !errors.IsNotFound(err) {
if err != nil && !kerrors.IsNotFound(err) {
return nil, fmt.Errorf("error retrieving Run %s: %w", rprt.RunName, err)
}
rprt.Run = run
Expand All @@ -500,7 +501,7 @@ func ResolvePipelineRunTask(

taskRun, err := getTaskRun(rprt.TaskRunName)
if err != nil {
if !errors.IsNotFound(err) {
if !kerrors.IsNotFound(err) {
return nil, fmt.Errorf("error retrieving TaskRun %s: %w", rprt.TaskRunName, err)
}
}
Expand All @@ -515,14 +516,18 @@ func ResolvePipelineRunTask(
taskName = task.TaskRef.Name
} else {
t, err = getTask(ctx, task.TaskRef.Name)
if err != nil {
switch {
case errors.Is(err, remote.ErrorRequestInProgress):
return nil, err
case err != nil:
return nil, &TaskNotFoundError{
Name: task.TaskRef.Name,
Msg: err.Error(),
}
default:
spec = t.TaskSpec()
taskName = t.TaskMetadata().Name
}
spec = t.TaskSpec()
taskName = t.TaskMetadata().Name
}
kind = task.TaskRef.Kind
} else {
Expand Down Expand Up @@ -623,7 +628,7 @@ func resolveConditionChecks(pt *v1beta1.PipelineTask, taskRunStatus map[string]*
// TODO(#3133): Also handle Custom Task Runs (getRun here)
cctr, err := getTaskRun(conditionCheckName)
if err != nil {
if !errors.IsNotFound(err) {
if !kerrors.IsNotFound(err) {
return nil, fmt.Errorf("error retrieving ConditionCheck %s for taskRun name %s : %w", conditionCheckName, taskRunName, err)
}
}
Expand Down
27 changes: 16 additions & 11 deletions pkg/reconciler/taskrun/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ import (
cloudeventclient "github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent"
"github.com/tektoncd/pipeline/pkg/reconciler/volumeclaim"
"github.com/tektoncd/pipeline/pkg/taskrunmetrics"
resolutionclient "github.com/tektoncd/resolution/pkg/client/injection/client"
resolutioninformer "github.com/tektoncd/resolution/pkg/client/injection/informers/resolution/v1alpha1/resolutionrequest"
resolution "github.com/tektoncd/resolution/pkg/resource"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/client-go/tools/cache"
kubeclient "knative.dev/pkg/client/injection/kube/client"
Expand All @@ -50,6 +53,7 @@ func NewController(opts *pipeline.Options, clock clock.PassiveClock) func(contex
podInformer := filteredpodinformer.Get(ctx, v1beta1.ManagedByLabelKey)
resourceInformer := resourceinformer.Get(ctx)
limitrangeInformer := limitrangeinformer.Get(ctx)
resolutionInformer := resolutioninformer.Get(ctx)
configStore := config.NewStore(logger.Named("config-store"), taskrunmetrics.MetricsOnStore(logger))
configStore.WatchConfigs(cmw)

Expand All @@ -59,17 +63,18 @@ func NewController(opts *pipeline.Options, clock clock.PassiveClock) func(contex
}

c := &Reconciler{
KubeClientSet: kubeclientset,
PipelineClientSet: pipelineclientset,
Images: opts.Images,
Clock: clock,
taskRunLister: taskRunInformer.Lister(),
resourceLister: resourceInformer.Lister(),
limitrangeLister: limitrangeInformer.Lister(),
cloudEventClient: cloudeventclient.Get(ctx),
metrics: taskrunmetrics.Get(ctx),
entrypointCache: entrypointCache,
pvcHandler: volumeclaim.NewPVCHandler(kubeclientset, logger),
KubeClientSet: kubeclientset,
PipelineClientSet: pipelineclientset,
Images: opts.Images,
Clock: clock,
taskRunLister: taskRunInformer.Lister(),
resourceLister: resourceInformer.Lister(),
limitrangeLister: limitrangeInformer.Lister(),
cloudEventClient: cloudeventclient.Get(ctx),
metrics: taskrunmetrics.Get(ctx),
entrypointCache: entrypointCache,
pvcHandler: volumeclaim.NewPVCHandler(kubeclientset, logger),
resolutionRequester: resolution.NewCRDRequester(resolutionclient.Get(ctx), resolutionInformer.Lister()),
}
impl := taskrunreconciler.NewImpl(ctx, c, func(impl *controller.Impl) controller.Options {
return controller.Options{
Expand Down
Loading

0 comments on commit 0abd1b0

Please sign in to comment.