diff --git a/pkg/cvo/metrics.go b/pkg/cvo/metrics.go index db0332869..3c2be6976 100644 --- a/pkg/cvo/metrics.go +++ b/pkg/cvo/metrics.go @@ -1,14 +1,19 @@ package cvo import ( + "context" + "net" + "net/http" "time" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/tools/cache" + "k8s.io/klog" configv1 "github.com/openshift/api/config/v1" "github.com/openshift/cluster-version-operator/lib/resourcemerge" @@ -86,6 +91,67 @@ version for 'cluster', or empty for 'initial'. } } +// RunMetrics launches an server bound to listenAddress serving +// Prometheus metrics at /metrics over HTTP. Continues serving until +// runContext.Done() and then attempts a clean shutdown limited by +// shutdownContext.Done(). Assumes runContext.Done() occurs before or +// simultaneously with shutdownContext.Done(). +func RunMetrics(runContext context.Context, shutdownContext context.Context, listenAddress string) error { + handler := http.NewServeMux() + handler.Handle("/metrics", promhttp.Handler()) + server := &http.Server{ + Handler: handler, + } + + errorChannel := make(chan error, 1) + errorChannelCount := 1 + go func() { + tcpListener, err := net.Listen("tcp", listenAddress) + if err != nil { + errorChannel <- err + return + } + + klog.Infof("Metrics port listening for HTTP on %v", listenAddress) + + errorChannel <- server.Serve(tcpListener) + }() + + shutdown := false + var loopError error + for errorChannelCount > 0 { + if shutdown { + err := <-errorChannel + errorChannelCount-- + if err != nil && err != http.ErrServerClosed { + if loopError == nil { + loopError = err + } else if err != nil { // log the error we are discarding + klog.Errorf("Failed to gracefully shut down metrics server: %s", err) + } + } + } else { + select { + case <-runContext.Done(): // clean shutdown + case err := <-errorChannel: // crashed before a shutdown was requested + errorChannelCount-- + if err != nil && err != http.ErrServerClosed { + loopError = err + } + } + shutdown = true + shutdownError := server.Shutdown(shutdownContext) + if loopError == nil { + loopError = shutdownError + } else if shutdownError != nil { // log the error we are discarding + klog.Errorf("Failed to gracefully shut down metrics server: %s", shutdownError) + } + } + } + + return loopError +} + type conditionKey struct { Name string Type string diff --git a/pkg/start/start.go b/pkg/start/start.go index f79480640..76fb4b508 100644 --- a/pkg/start/start.go +++ b/pkg/start/start.go @@ -6,7 +6,6 @@ import ( "context" "fmt" "math/rand" - "net/http" "os" "os/signal" "sync" @@ -14,7 +13,6 @@ import ( "time" "github.com/google/uuid" - "github.com/prometheus/client_golang/prometheus/promhttp" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/informers" @@ -152,14 +150,14 @@ func (o *Options) Run() error { } func (o *Options) run(ctx context.Context, controllerCtx *Context, lock *resourcelock.ConfigMapLock) { - // listen on metrics + runContext, runCancel := context.WithCancel(ctx) + shutdownContext, shutdownCancel := context.WithCancel(ctx) + errorChannel := make(chan error, 1) + errorChannelCount := 0 if o.ListenAddr != "" { - mux := http.NewServeMux() - mux.Handle("/metrics", promhttp.Handler()) + errorChannelCount++ go func() { - if err := http.ListenAndServe(o.ListenAddr, mux); err != nil { - klog.Fatalf("Unable to start metrics server: %v", err) - } + errorChannel <- cvo.RunMetrics(runContext, shutdownContext, o.ListenAddr) }() } @@ -176,9 +174,9 @@ func (o *Options) run(ctx context.Context, controllerCtx *Context, lock *resourc RetryPeriod: retryPeriod, Callbacks: leaderelection.LeaderCallbacks{ OnStartedLeading: func(localCtx context.Context) { - controllerCtx.Start(ctx) + controllerCtx.Start(runContext) select { - case <-ctx.Done(): + case <-runContext.Done(): // WARNING: this is not completely safe until we have Kube 1.14 and ReleaseOnCancel // and client-go ContextCancelable, which allows us to block new API requests before // we step down. However, the CVO isn't that sensitive to races and can tolerate @@ -207,6 +205,34 @@ func (o *Options) run(ctx context.Context, controllerCtx *Context, lock *resourc }, }) + for errorChannelCount > 0 { + var shutdownTimer *time.Timer + if shutdownTimer == nil { // running + select { + case <-runContext.Done(): + shutdownTimer = time.NewTimer(2 * time.Minute) + case err := <-errorChannel: + errorChannelCount-- + if err != nil { + klog.Error(err) + runCancel() // this will cause shutdownTimer initialization in the next loop + } + } + } else { // shutting down + select { + case <-shutdownTimer.C: // never triggers after the channel is stopped, although it would not matter much if it did because subsequent cancel calls do nothing. + shutdownCancel() + shutdownTimer.Stop() + case err := <-errorChannel: + errorChannelCount-- + if err != nil { + klog.Error(err) + runCancel() + } + } + } + } + <-exit }