Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use knative "adapter" to simplify event listener sink #1207

Merged
merged 4 commits into from
Sep 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
181 changes: 17 additions & 164 deletions cmd/eventlistenersink/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,200 +18,53 @@ package main

import (
"context"
"fmt"
"log"
"net/http"
"strings"
"sync"
"time"

"github.com/tektoncd/triggers/pkg/adapter"
dynamicClientset "github.com/tektoncd/triggers/pkg/client/dynamic/clientset"
"github.com/tektoncd/triggers/pkg/client/dynamic/clientset/tekton"
"github.com/tektoncd/triggers/pkg/client/informers/externalversions"
triggerLogging "github.com/tektoncd/triggers/pkg/logging"
"github.com/tektoncd/triggers/pkg/sink"
"github.com/tektoncd/triggers/pkg/system"
"go.uber.org/zap"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
cminformer "knative.dev/pkg/configmap/informer"
"knative.dev/pkg/controller"
evadapter "knative.dev/eventing/pkg/adapter/v2"
"knative.dev/pkg/injection"
"knative.dev/pkg/injection/sharedmain"
"knative.dev/pkg/logging"
"knative.dev/pkg/metrics"
"knative.dev/pkg/profiling"
"knative.dev/pkg/injection/clients/dynamicclient"
"knative.dev/pkg/signals"
)

const (
// EventListenerLogKey is the name of the logger for the eventlistener cmd
EventListenerLogKey = "eventlistener"
// ConfigName is the name of the ConfigMap that the logging config will be stored in
ConfigName = "config-logging-triggers"
)

var (
// CacheSyncTimeout is the amount of the time we will wait for the informer cache to sync
// before timing out
cacheSyncTimeout = 1 * time.Minute
)

func main() {
// set up signals so we handle the first shutdown signal gracefully
ctx := signals.NewContext()

clusterConfig, err := rest.InClusterConfig()
dibyom marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
log.Fatalf("Failed to get in cluster config: %v", err)
}
ctx, startInformers := injection.EnableInjectionOrDie(ctx, clusterConfig)
kubeClient, err := kubernetes.NewForConfig(clusterConfig)
if err != nil {
log.Fatalf("Failed to get the Kubernetes client set: %v", err)
}

dynamicClient, err := dynamic.NewForConfig(clusterConfig)
if err != nil {
log.Fatalf("Failed to get the dynamic client: %v", err)
}
dynamicCS := dynamicClientset.New(tekton.WithClient(dynamicClient))
configMapWatcher := cminformer.NewInformedWatcher(kubeClient, system.GetNamespace())

logger := triggerLogging.ConfigureLogging(EventListenerLogKey, ConfigName, ctx.Done(), kubeClient, configMapWatcher)
ctx = logging.WithLogger(ctx, logger)

profilingHandler := profiling.NewHandler(logger, false)
dibyom marked this conversation as resolved.
Show resolved Hide resolved
profilingServer := profiling.NewServer(profilingHandler)
metrics.MemStatsOrDie(ctx)

exp := metrics.ExporterOptions{
Component: strings.ReplaceAll(EventListenerLogKey, "-", "_"),
ConfigMap: nil,
Secrets: sharedmain.SecretFetcher(ctx),
}
cmData, err := metrics.UpdateExporterFromConfigMapWithOpts(ctx, exp, logger)
if err != nil {
logger.Fatalw("Error in update exporter from ConfigMap ", zap.Error(err))
}
if _, err := kubeClient.CoreV1().ConfigMaps(system.GetNamespace()).Get(ctx, metrics.ConfigMapName(),
metav1.GetOptions{}); err == nil {
configMapWatcher.Watch(metrics.ConfigMapName(),
cmData,
profilingHandler.UpdateFromConfigMap)
} else if !apierrors.IsNotFound(err) {
logger.Fatalw("Error reading ConfigMap "+metrics.ConfigMapName(), zap.Error(err))
}
cfg := injection.ParseAndGetRESTConfigOrDie()

logger.Info("Starting configuration manager...")
if err := configMapWatcher.Start(ctx.Done()); err != nil {
logger.Fatalw("Failed to start configuration manager", zap.Error(err))
}
defer func() {
err := logger.Sync()
if err != nil {
logger.Fatalf("Failed to sync the logger", zap.Error(err))
}
}()
dc := dynamic.NewForConfigOrDie(cfg)
dc = dynamicClientset.New(tekton.WithClient(dc))
ctx = context.WithValue(ctx, dynamicclient.Key{}, dc)

logger.Info("EventListener pod started")
// Set up ctx with the set of things based on the
// dynamic client we've set up above.
ctx = injection.Dynamic.SetupDynamic(ctx)

sinkArgs, err := sink.GetArgs()
if err != nil {
logger.Fatal(err)
log.Fatal(err.Error())
}

sinkClients, err := sink.ConfigureClients(clusterConfig)
sinkClients, err := sink.ConfigureClients(cfg)
if err != nil {
logger.Fatal(err)
}

// Create a sharedInformer factory so that we can cache API server calls
factory := externalversions.NewSharedInformerFactoryWithOptions(sinkClients.TriggersClient,
controller.DefaultResyncPeriod, externalversions.WithNamespace(sinkArgs.ElNamespace))
if sinkArgs.IsMultiNS {
factory = externalversions.NewSharedInformerFactory(sinkClients.TriggersClient,
controller.DefaultResyncPeriod)
log.Fatal(err.Error())
}

recorder, err := sink.NewRecorder()
if err != nil {
logger.Fatal(err)
}
// Create EventListener Sink
r := sink.Sink{
KubeClientSet: kubeClient,
DiscoveryClient: sinkClients.DiscoveryClient,
DynamicClient: dynamicCS,
TriggersClient: sinkClients.TriggersClient,
HTTPClient: http.DefaultClient,
EventListenerName: sinkArgs.ElName,
EventListenerNamespace: sinkArgs.ElNamespace,
PayloadValidation: sinkArgs.PayloadValidation,
Logger: logger,
Recorder: recorder,
Auth: sink.DefaultAuthOverride{},
WGProcessTriggers: &sync.WaitGroup{},
// Register all the listers we'll need
EventListenerLister: factory.Triggers().V1beta1().EventListeners().Lister(),
TriggerLister: factory.Triggers().V1beta1().Triggers().Lister(),
TriggerBindingLister: factory.Triggers().V1beta1().TriggerBindings().Lister(),
ClusterTriggerBindingLister: factory.Triggers().V1beta1().ClusterTriggerBindings().Lister(),
TriggerTemplateLister: factory.Triggers().V1beta1().TriggerTemplates().Lister(),
ClusterInterceptorLister: factory.Triggers().V1alpha1().ClusterInterceptors().Lister(),
log.Fatal(err.Error())
}

startInformers()
// Start and sync the informers before we start taking traffic
withTimeout, cancel := context.WithTimeout(ctx, cacheSyncTimeout)
defer cancel()
factory.Start(ctx.Done())
res := factory.WaitForCacheSync(withTimeout.Done())
for r, hasSynced := range res {
if !hasSynced {
logger.Fatalf("failed to sync informer for: %s", r)
}
if !sinkArgs.IsMultiNS {
ctx = injection.WithNamespaceScope(ctx, sinkArgs.ElNamespace)
mattmoor marked this conversation as resolved.
Show resolved Hide resolved
}
logger.Infof("Synced informers. Starting EventListener")

// Listen and serve
logger.Infof("Listen and serve on port %s", sinkArgs.Port)
mux := http.NewServeMux()
eventHandler := http.HandlerFunc(r.HandleEvent)
metricsRecorder := &sink.MetricsHandler{Handler: r.IsValidPayload(eventHandler)}

mux.HandleFunc("/", http.HandlerFunc(metricsRecorder.Intercept(r.NewMetricsRecorderInterceptor())))

// For handling Liveness Probe
// TODO(dibyom): Livness, metrics etc. should be on a separate port
mux.HandleFunc("/live", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
fmt.Fprint(w, "ok")
})

srv := &http.Server{
Addr: fmt.Sprintf(":%s", sinkArgs.Port),
ReadTimeout: sinkArgs.ELReadTimeOut * time.Second,
WriteTimeout: sinkArgs.ELWriteTimeOut * time.Second,
IdleTimeout: sinkArgs.ELIdleTimeOut * time.Second,
Handler: http.TimeoutHandler(mux,
sinkArgs.ELTimeOutHandler*time.Second, "EventListener Timeout!\n"),
}

if sinkArgs.Cert == "" && sinkArgs.Key == "" {
if err := srv.ListenAndServe(); err != nil {
logger.Fatalf("failed to start eventlistener sink: %v", err)
}
} else {
if err := srv.ListenAndServeTLS(sinkArgs.Cert, sinkArgs.Key); err != nil {
logger.Fatalf("failed to start eventlistener sink: %v", err)
}
}
err = profilingServer.Shutdown(context.Background())
if err != nil {
logger.Fatalf("failed to shutdown profiling server: %v", err)
}
evadapter.MainWithContext(ctx, EventListenerLogKey, adapter.NewEnvConfig, adapter.New(sinkArgs, sinkClients, recorder))
}
6 changes: 4 additions & 2 deletions docs/eventlisteners.md
Original file line number Diff line number Diff line change
Expand Up @@ -448,8 +448,10 @@ Where for each returned line, the column values are, from left to right:

## Configuring logging for `EventListeners`

You can configure logging for your `EventListener` using the `config-logging-triggers` `ConfigMap` located in the `EventListener's` namespace.
Tekton Triggers automatically creates and populates this `ConfigMap` with default values described in [config-logging.yaml](../config/config-logging.yaml).
You can configure logging for your `EventListener`s using the `config-logging-triggers`
`ConfigMap` located in the `tekton-pipelines` namespace ([config-logging.yaml](../config/config-logging.yaml)).
Tekton Triggers automatically reconciles this configmap into environment variables on your
event listener deployment.

To access your `EventListener` logs, query your cluster for Pods whose `eventlistener` label matches the name of your `EventListener` object. For example:

Expand Down
29 changes: 13 additions & 16 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,39 +4,36 @@ go 1.15

require (
github.com/GoogleCloudPlatform/cloud-builders/gcs-fetcher v0.0.0-20191203181535-308b93ad1f39
github.com/cloudevents/sdk-go/v2 v2.4.1
github.com/golang/protobuf v1.5.2
github.com/google/cel-go v0.7.3
github.com/google/go-cmp v0.5.6
github.com/google/go-containerregistry v0.5.2-0.20210709161016-b448abac9a70 // indirect
github.com/google/go-github/v31 v31.0.0
github.com/google/uuid v1.3.0
github.com/gorilla/mux v1.7.4
github.com/sirupsen/logrus v1.8.1
github.com/spf13/cobra v1.2.1
github.com/tektoncd/pipeline v0.27.1
github.com/tektoncd/pipeline v0.27.1-0.20210830154614-c8c729131d4a
github.com/tektoncd/plumbing v0.0.0-20210514044347-f8a9689d5bd5
github.com/tidwall/gjson v1.6.5 // indirect
github.com/tidwall/sjson v1.0.4
go.opencensus.io v0.23.0
go.uber.org/zap v1.18.1
golang.org/x/crypto v0.0.0-20210817164053-32db794688a5 // indirect
go.uber.org/zap v1.19.0
golang.org/x/mod v0.5.0 // indirect
golang.org/x/net v0.0.0-20210825183410-e898025ed96a // indirect
golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf // indirect
golang.org/x/time v0.0.0-20210611083556-38a9dc6acbc6 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1
google.golang.org/api v0.50.0 // indirect
google.golang.org/genproto v0.0.0-20210624195500-8bfb893ecb84
google.golang.org/grpc v1.39.0
google.golang.org/grpc v1.40.0
google.golang.org/protobuf v1.27.1
k8s.io/api v0.20.7
k8s.io/apiextensions-apiserver v0.20.7
k8s.io/apimachinery v0.20.7
k8s.io/client-go v0.20.7
k8s.io/code-generator v0.20.7
k8s.io/kube-openapi v0.0.0-20210113233702-8566a335510f
knative.dev/pkg v0.0.0-20210730172132-bb4aaf09c430
knative.dev/serving v0.24.0
k8s.io/api v0.21.4
k8s.io/apiextensions-apiserver v0.21.4
k8s.io/apimachinery v0.21.4
k8s.io/client-go v0.21.4
k8s.io/code-generator v0.21.4
k8s.io/kube-openapi v0.0.0-20210305001622-591a79e4bda7
knative.dev/eventing v0.25.0
knative.dev/pkg v0.0.0-20210827184538-2bd91f75571c
knative.dev/serving v0.25.0
sigs.k8s.io/yaml v1.2.0
)

Expand Down
Loading