Skip to content

Commit

Permalink
pkg/cvo/metrics: Graceful server shutdown
Browse files Browse the repository at this point in the history
Somewhat like the example in [1].  This pushes the server management
down into a new RunMetrics method, which we then run in its own
goroutine.  This is initial groundwork; I expect we will port more of
our child goroutines to this framework in follow-up work.

Cherry-picked from b30aa0e (openshift#349), around conflicts due to the lack
of TLS metrics in 4.5.

[1]: https://golang.org/pkg/net/http/#Server.Shutdown
  • Loading branch information
wking committed Aug 26, 2020
1 parent 55ff603 commit d257c32
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 10 deletions.
66 changes: 66 additions & 0 deletions pkg/cvo/metrics.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
package cvo

import (
"context"
"net"
"net/http"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/cache"
"k8s.io/klog"

configv1 "github.com/openshift/api/config/v1"
"github.com/openshift/cluster-version-operator/lib/resourcemerge"
Expand Down Expand Up @@ -86,6 +91,67 @@ version for 'cluster', or empty for 'initial'.
}
}

// RunMetrics launches an server bound to listenAddress serving
// Prometheus metrics at /metrics over HTTP. Continues serving until
// runContext.Done() and then attempts a clean shutdown limited by
// shutdownContext.Done(). Assumes runContext.Done() occurs before or
// simultaneously with shutdownContext.Done().
func RunMetrics(runContext context.Context, shutdownContext context.Context, listenAddress string) error {
handler := http.NewServeMux()
handler.Handle("/metrics", promhttp.Handler())
server := &http.Server{
Handler: handler,
}

errorChannel := make(chan error, 1)
errorChannelCount := 1
go func() {
tcpListener, err := net.Listen("tcp", listenAddress)
if err != nil {
errorChannel <- err
return
}

klog.Infof("Metrics port listening for HTTP on %v", listenAddress)

errorChannel <- server.Serve(tcpListener)
}()

shutdown := false
var loopError error
for errorChannelCount > 0 {
if shutdown {
err := <-errorChannel
errorChannelCount--
if err != nil && err != http.ErrServerClosed {
if loopError == nil {
loopError = err
} else if err != nil { // log the error we are discarding
klog.Errorf("Failed to gracefully shut down metrics server: %s", err)
}
}
} else {
select {
case <-runContext.Done(): // clean shutdown
case err := <-errorChannel: // crashed before a shutdown was requested
errorChannelCount--
if err != nil && err != http.ErrServerClosed {
loopError = err
}
}
shutdown = true
shutdownError := server.Shutdown(shutdownContext)
if loopError == nil {
loopError = shutdownError
} else if shutdownError != nil { // log the error we are discarding
klog.Errorf("Failed to gracefully shut down metrics server: %s", shutdownError)
}
}
}

return loopError
}

type conditionKey struct {
Name string
Type string
Expand Down
46 changes: 36 additions & 10 deletions pkg/start/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,13 @@ import (
"context"
"fmt"
"math/rand"
"net/http"
"os"
"os/signal"
"sync"
"syscall"
"time"

"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus/promhttp"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
Expand Down Expand Up @@ -152,14 +150,14 @@ func (o *Options) Run() error {
}

func (o *Options) run(ctx context.Context, controllerCtx *Context, lock *resourcelock.ConfigMapLock) {
// listen on metrics
runContext, runCancel := context.WithCancel(ctx)
shutdownContext, shutdownCancel := context.WithCancel(ctx)
errorChannel := make(chan error, 1)
errorChannelCount := 0
if o.ListenAddr != "" {
mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.Handler())
errorChannelCount++
go func() {
if err := http.ListenAndServe(o.ListenAddr, mux); err != nil {
klog.Fatalf("Unable to start metrics server: %v", err)
}
errorChannel <- cvo.RunMetrics(runContext, shutdownContext, o.ListenAddr)
}()
}

Expand All @@ -176,9 +174,9 @@ func (o *Options) run(ctx context.Context, controllerCtx *Context, lock *resourc
RetryPeriod: retryPeriod,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(localCtx context.Context) {
controllerCtx.Start(ctx)
controllerCtx.Start(runContext)
select {
case <-ctx.Done():
case <-runContext.Done():
// WARNING: this is not completely safe until we have Kube 1.14 and ReleaseOnCancel
// and client-go ContextCancelable, which allows us to block new API requests before
// we step down. However, the CVO isn't that sensitive to races and can tolerate
Expand Down Expand Up @@ -207,6 +205,34 @@ func (o *Options) run(ctx context.Context, controllerCtx *Context, lock *resourc
},
})

for errorChannelCount > 0 {
var shutdownTimer *time.Timer
if shutdownTimer == nil { // running
select {
case <-runContext.Done():
shutdownTimer = time.NewTimer(2 * time.Minute)
case err := <-errorChannel:
errorChannelCount--
if err != nil {
klog.Error(err)
runCancel() // this will cause shutdownTimer initialization in the next loop
}
}
} else { // shutting down
select {
case <-shutdownTimer.C: // never triggers after the channel is stopped, although it would not matter much if it did because subsequent cancel calls do nothing.
shutdownCancel()
shutdownTimer.Stop()
case err := <-errorChannel:
errorChannelCount--
if err != nil {
klog.Error(err)
runCancel()
}
}
}
}

<-exit
}

Expand Down

0 comments on commit d257c32

Please sign in to comment.