Skip to content

Commit

Permalink
sink: Sync lister informers before serving traffic
Browse files Browse the repository at this point in the history
Previously, we had a race where the EL would start serving traffic
before the lister caches were synced. This leads to the intermittent
resolution issues described in tektoncd#896. tektoncd#977 was an attempt to fix this for
the EventListener resource. This commit fixes it for all resource types
by first registering all the listers, and then syncing the cache before
serving traffic.

Tested by modifying the e2e test to run the intermittently failing test
10 times. Without this fix, the test fails; with it, the test passes (see tektoncd#1012).

Fixes tektoncd#896

Signed-off-by: Dibyo Mukherjee <dibyo@google.com>
  • Loading branch information
dibyom committed Mar 19, 2021
1 parent 02c3896 commit 126e1ef
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 110 deletions.
50 changes: 22 additions & 28 deletions cmd/eventlistenersink/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,17 @@ limitations under the License.
package main

import (
"context"
"fmt"
"log"
"net/http"
"time"

"go.uber.org/zap"

dynamicClientset "github.com/tektoncd/triggers/pkg/client/dynamic/clientset"
"github.com/tektoncd/triggers/pkg/client/dynamic/clientset/tekton"
"github.com/tektoncd/triggers/pkg/client/informers/externalversions"
triggerLogging "github.com/tektoncd/triggers/pkg/logging"
"github.com/tektoncd/triggers/pkg/sink"
"k8s.io/apimachinery/pkg/util/wait"
"go.uber.org/zap"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
Expand Down Expand Up @@ -86,47 +83,44 @@ func main() {
logger.Fatal(err)
}

// Create a sharedInformer factory so that we can cache API server calls
factory := externalversions.NewSharedInformerFactoryWithOptions(sinkClients.TriggersClient,
30*time.Second, externalversions.WithNamespace(sinkArgs.ElNamespace))
if sinkArgs.IsMultiNS {
factory = externalversions.NewSharedInformerFactory(sinkClients.TriggersClient,
30*time.Second)
}

go func(ctx context.Context) {
factory.Start(ctx.Done())
<-ctx.Done()
}(ctx)

// Create EventListener Sink
r := sink.Sink{
KubeClientSet: kubeClient,
DiscoveryClient: sinkClients.DiscoveryClient,
DynamicClient: dynamicCS,
TriggersClient: sinkClients.TriggersClient,
HTTPClient: http.DefaultClient,
EventListenerName: sinkArgs.ElName,
EventListenerNamespace: sinkArgs.ElNamespace,
Logger: logger,
Auth: sink.DefaultAuthOverride{},
KubeClientSet: kubeClient,
DiscoveryClient: sinkClients.DiscoveryClient,
DynamicClient: dynamicCS,
TriggersClient: sinkClients.TriggersClient,
HTTPClient: http.DefaultClient,
EventListenerName: sinkArgs.ElName,
EventListenerNamespace: sinkArgs.ElNamespace,
Logger: logger,
Auth: sink.DefaultAuthOverride{},
// Register all the listers we'll need
EventListenerLister: factory.Triggers().V1alpha1().EventListeners().Lister(),
TriggerLister: factory.Triggers().V1alpha1().Triggers().Lister(),
TriggerBindingLister: factory.Triggers().V1alpha1().TriggerBindings().Lister(),
ClusterTriggerBindingLister: factory.Triggers().V1alpha1().ClusterTriggerBindings().Lister(),
TriggerTemplateLister: factory.Triggers().V1alpha1().TriggerTemplates().Lister(),
ClusterInterceptorLister: factory.Triggers().V1alpha1().ClusterInterceptors().Lister(),
}
eventListenerBackoff := wait.Backoff{
Duration: 100 * time.Millisecond,
Factor: 2.5,
Jitter: 0.3,
Steps: 10,
Cap: 5 * time.Second,
}
err = r.WaitForEventListener(eventListenerBackoff)
if err != nil {
logger.Fatal(err)

// Start and sync the informers before we start taking traffic
// TODO: maybe we should have a timeout here?
factory.Start(ctx.Done())
res := factory.WaitForCacheSync(ctx.Done())
for r, hasSynced := range res {
if !hasSynced {
logger.Fatalf("failed to sync informer for: %s", r)
}
}
logger.Infof("Synced informers. Starting EventListener")

// Listen and serve
logger.Infof("Listen and serve on port %s", sinkArgs.Port)
Expand Down
19 changes: 0 additions & 19 deletions pkg/sink/initialization.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,10 @@ package sink

import (
"flag"
"fmt"
"time"

triggersclientset "github.com/tektoncd/triggers/pkg/client/clientset/versioned"
"golang.org/x/xerrors"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/wait"
discoveryclient "k8s.io/client-go/discovery"
kubeclientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
Expand Down Expand Up @@ -136,19 +133,3 @@ func ConfigureClients(clusterConfig *rest.Config) (Clients, error) {
TriggersClient: triggersClient,
}, nil
}

// WaitForEventListener waits until the EventListener identified by
// r.EventListenerName in r.EventListenerNamespace can be fetched through
// r.EventListenerLister, driving the lookups with the supplied backoff.
//
// It returns nil once a lookup succeeds. If the wait ends with an error, the
// returned error names the EventListener and namespace and includes the
// underlying error text.
func (r *Sink) WaitForEventListener(backoff wait.Backoff) error {
	// lookup is the condition polled by the backoff helper: it reports
	// whether the EventListener is currently visible through the lister.
	lookup := func() (bool, error) {
		_, getErr := r.EventListenerLister.EventListeners(r.EventListenerNamespace).Get(r.EventListenerName)
		switch {
		case getErr == nil:
			return true, nil
		case errors.IsNotFound(getErr):
			// Not visible yet: report "not done" without an error so the
			// condition can be evaluated again.
			return false, nil
		default:
			// Any other lister failure is handed back to the backoff helper.
			return false, getErr
		}
	}

	waitErr := wait.ExponentialBackoff(backoff, lookup)
	if waitErr == nil {
		return nil
	}
	return fmt.Errorf("Unable to retrieve EventListener %s in Namespace %s: %s", r.EventListenerName, r.EventListenerNamespace, waitErr)
}
60 changes: 0 additions & 60 deletions pkg/sink/initialization_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,8 @@ import (
"flag"
"strconv"
"testing"
"time"

triggersv1 "github.com/tektoncd/triggers/pkg/apis/triggers/v1alpha1"
eventlistenerinformer "github.com/tektoncd/triggers/pkg/client/injection/informers/triggers/v1alpha1/eventlistener"
"github.com/tektoncd/triggers/test"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
rtesting "knative.dev/pkg/reconciler/testing"
)

// testBackoff is the backoff configuration shared by the WaitForEventListener
// tests: a single step with short durations so the tests finish quickly.
var testBackoff = wait.Backoff{
	Steps:    1,
	Duration: 50 * time.Millisecond,
	Cap:      100 * time.Millisecond,
	Factor:   1.0,
	Jitter:   0.1,
}

func Test_GetArgs(t *testing.T) {
if err := flag.Set(name, "elname"); err != nil {
t.Errorf("Error setting flag el-name: %s", err)
Expand Down Expand Up @@ -117,47 +101,3 @@ func Test_GetArgs_error(t *testing.T) {
})
}
}

func TestWaitForEventlistener(t *testing.T) {
eventListenerName := "my-eventlistener"
namespace := "my-namespace"
eventListener := &triggersv1.EventListener{
ObjectMeta: metav1.ObjectMeta{
Name: eventListenerName,
Namespace: namespace,
},
Spec: triggersv1.EventListenerSpec{},
}
ctx, _ := rtesting.SetupFakeContext(t)
test.SeedResources(t, ctx, test.Resources{EventListeners: []*triggersv1.EventListener{eventListener}})
r := Sink{
EventListenerName: eventListenerName,
EventListenerNamespace: namespace,
EventListenerLister: eventlistenerinformer.Get(ctx).Lister(),
}

err := r.WaitForEventListener(testBackoff)
if err != nil {
t.Fatalf("Expected no error, received %s", err)
}
}

// TestWaitForEventlistener_Fatal checks that WaitForEventListener fails, with
// the expected error message, when no EventListener has been seeded.
func TestWaitForEventlistener_Fatal(t *testing.T) {
	const (
		elName      = "my-eventlistener"
		elNamespace = "my-namespace"
		wantMsg     = "Unable to retrieve EventListener my-eventlistener in Namespace my-namespace: timed out waiting for the condition"
	)

	ctx, _ := rtesting.SetupFakeContext(t)
	// Seed an empty EventListener list so the lookup cannot succeed.
	test.SeedResources(t, ctx, test.Resources{EventListeners: []*triggersv1.EventListener{}})

	s := Sink{
		EventListenerName:      elName,
		EventListenerNamespace: elNamespace,
		EventListenerLister:    eventlistenerinformer.Get(ctx).Lister(),
	}

	// The wait is expected to fail.
	err := s.WaitForEventListener(testBackoff)
	switch {
	case err == nil:
		t.Fatalf("expected eventlistener wait to fail, instead succeeded")
	case err.Error() != wantMsg:
		t.Fatalf("got incorrect error message, received: %s", err)
	}
}
2 changes: 1 addition & 1 deletion test/e2e-common.sh
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ function install_triggers_crd() {
ko apply -f config/ || fail_test "Tekton Triggers installation failed"

# Wait for the Interceptors CRD to be available before adding the core-interceptors
kubectl wait --for=condition=Established --timeout=30s crds/interceptors.triggers.tekton.dev
kubectl wait --for=condition=Established --timeout=30s crds/clusterinterceptors.triggers.tekton.dev
ko apply -f config/interceptors || fail_test "Core interceptors installation failed"

# Make sure that everything is cleaned up in the current namespace.
Expand Down
2 changes: 1 addition & 1 deletion test/eventlistener_scale_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func createServiceAccount(t *testing.T, c *clients, namespace, name string) {
ObjectMeta: metav1.ObjectMeta{Name: "sa-role"},
Rules: []rbacv1.PolicyRule{{
APIGroups: []string{triggersv1.GroupName},
Resources: []string{"eventlisteners", "triggerbindings", "triggertemplates", "triggers"},
Resources: []string{"clustertriggerbindings", "eventlisteners", "triggerbindings", "triggertemplates", "triggers"},
Verbs: []string{"get", "list", "watch"},
}, {
APIGroups: []string{""},
Expand Down
1 change: 0 additions & 1 deletion test/eventlistener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ func impersonateRBAC(t *testing.T, sa, namespace string, kubeClient kubernetes.I

func TestEventListenerCreate(t *testing.T) {
c, namespace := setup(t)
t.Parallel()
defer cleanup(t, c, namespace, "my-eventlistener")
knativetest.CleanupOnInterrupt(func() { cleanup(t, c, namespace, "my-eventlistener") }, t.Logf)

Expand Down

0 comments on commit 126e1ef

Please sign in to comment.