Skip to content

Commit

Permalink
Forcing eventlistener sink to resolve eventlistener
Browse files Browse the repository at this point in the history
This means that the sink HTTP process (and readiness probe)
will not pass until the EventListenerLister is able to resolve
the EventListener from the API server.

This is especially useful in startup cases, but can also assist
if the pod is started without permission to read the EventListener
object. In this situation, given this change, the eventlistener
pod will restart with a logged error message about lack of access to
that specific API resource.
  • Loading branch information
jmcshane committed Mar 5, 2021
1 parent 6654c3d commit 4401def
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 0 deletions.
12 changes: 12 additions & 0 deletions cmd/eventlistenersink/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/tektoncd/triggers/pkg/client/informers/externalversions"
triggerLogging "github.com/tektoncd/triggers/pkg/logging"
"github.com/tektoncd/triggers/pkg/sink"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
Expand Down Expand Up @@ -114,6 +115,17 @@ func main() {
ClusterTriggerBindingLister: factory.Triggers().V1alpha1().ClusterTriggerBindings().Lister(),
TriggerTemplateLister: factory.Triggers().V1alpha1().TriggerTemplates().Lister(),
}
eventListenerBackoff := wait.Backoff{
Duration: 100 * time.Millisecond,
Factor: 2.5,
Jitter: 0.3,
Steps: 10,
Cap: 5 * time.Second,
}
err = r.WaitForEventListener(eventListenerBackoff)
if err != nil {
panic(err)
}

// Listen and serve
logger.Infof("Listen and serve on port %s", sinkArgs.Port)
Expand Down
19 changes: 19 additions & 0 deletions pkg/sink/initialization.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@ package sink

import (
"flag"
"fmt"
"time"

triggersclientset "github.com/tektoncd/triggers/pkg/client/clientset/versioned"
"golang.org/x/xerrors"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/wait"
discoveryclient "k8s.io/client-go/discovery"
kubeclientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
Expand Down Expand Up @@ -133,3 +136,19 @@ func ConfigureClients(clusterConfig *rest.Config) (Clients, error) {
TriggersClient: triggersClient,
}, nil
}

// WaitForEventListener waits for the Kubernetes eventlistener configuration
func (r *Sink) WaitForEventListener(backoff wait.Backoff) error {

if err := wait.ExponentialBackoff(backoff, func() (bool, error) {
if _, err := r.EventListenerLister.EventListeners(r.EventListenerNamespace).Get(r.EventListenerName); errors.IsNotFound(err) {
return false, nil
} else if err != nil {
return false, err
}
return true, nil
}); err != nil {
return fmt.Errorf("Unable to retrieve EventListener %s in Namespace %s: %s", r.EventListenerName, r.EventListenerNamespace, err)
}
return nil
}
57 changes: 57 additions & 0 deletions pkg/sink/initialization_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,24 @@ import (
"flag"
"strconv"
"testing"
"time"

triggersv1 "github.com/tektoncd/triggers/pkg/apis/triggers/v1alpha1"
eventlistenerinformer "github.com/tektoncd/triggers/pkg/client/injection/informers/triggers/v1alpha1/eventlistener"
"github.com/tektoncd/triggers/test"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
rtesting "knative.dev/pkg/reconciler/testing"
)

var testBackoff = wait.Backoff{
Duration: 50 * time.Millisecond,
Factor: 1.0,
Jitter: 0.1,
Steps: 1,
Cap: 100 * time.Millisecond,
}

func Test_GetArgs(t *testing.T) {
if err := flag.Set(name, "elname"); err != nil {
t.Errorf("Error setting flag el-name: %s", err)
Expand Down Expand Up @@ -101,3 +117,44 @@ func Test_GetArgs_error(t *testing.T) {
})
}
}

func TestWaitForEventlistener(t *testing.T) {
eventListenerName := "my-eventlistener"
namespace := "my-namespace"
eventListener := &triggersv1.EventListener{
ObjectMeta: metav1.ObjectMeta{
Name: eventListenerName,
Namespace: namespace,
},
Spec: triggersv1.EventListenerSpec{},
}
ctx, _ := rtesting.SetupFakeContext(t)
test.SeedResources(t, ctx, test.Resources{EventListeners: []*triggersv1.EventListener{eventListener}})
r := Sink{
EventListenerName: eventListenerName,
EventListenerNamespace: namespace,
EventListenerLister: eventlistenerinformer.Get(ctx).Lister(),
}

r.WaitForEventListener(testBackoff)
}

func TestWaitForEventlistener_Fatal(t *testing.T) {
eventListenerName := "my-eventlistener"
namespace := "my-namespace"
ctx, _ := rtesting.SetupFakeContext(t)
test.SeedResources(t, ctx, test.Resources{EventListeners: []*triggersv1.EventListener{}})
r := Sink{
EventListenerName: eventListenerName,
EventListenerNamespace: namespace,
EventListenerLister: eventlistenerinformer.Get(ctx).Lister(),
}
// will fail
err := r.WaitForEventListener(testBackoff)
if err == nil {
t.Fatalf("expected eventlistener wait to fail, instead succeeded")
}
if err.Error() != "Unable to retrieve EventListener my-eventlistener in Namespace my-namespace: timed out waiting for the condition" {
t.Fatalf("got incorrect error message, received: %s", err)
}
}

0 comments on commit 4401def

Please sign in to comment.