Skip to content

Commit

Permalink
.*: Introduce graceful shutdown for gRPC Servers (#1687)
Browse files Browse the repository at this point in the history
* Inroduce graceful shutdown for gRPC

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* Add missed cancel branch

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* Remove stutter from server structs

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* Close servers immediately if grace period is not specified

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* Update CHANGELOG

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* Rename TLS methods, clarify log messages

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* Document public functions

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* Fix review issues

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* Update bucket docs

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* trigger checks

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* trigger checks

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>
  • Loading branch information
kakkoyun authored and bwplotka committed Nov 1, 2019
1 parent c7e787d commit 9d4d0bf
Show file tree
Hide file tree
Showing 27 changed files with 566 additions and 408 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel

### Added

- [#1687](https://github.com/thanos-io/thanos/pull/1687) Add a new `--grpc-grace-period` CLI option to components which serve gRPC to set how long to wait until gRPC Server shuts down.
- [#1660](https://github.com/thanos-io/thanos/pull/1660) Add a new `--prometheus.ready_timeout` CLI option to the sidecar to set how long to wait until Prometheus starts up.
- [#1573](https://github.com/thanos-io/thanos/pull/1573) `AliYun OSS` object storage, see [documents](docs/storage.md#aliyun-oss) for further information.
- [#1680](https://github.com/thanos-io/thanos/pull/1680) Add a new `--http-grace-period` CLI option to components which serve HTTP to set how long to wait until HTTP Server shuts down.
Expand Down
29 changes: 14 additions & 15 deletions cmd/thanos/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,15 @@ import (
"text/template"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/oklog/run"
"github.com/oklog/ulid"
"github.com/olekukonko/tablewriter"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/tsdb/labels"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact"
Expand All @@ -20,19 +29,9 @@ import (
"github.com/thanos-io/thanos/pkg/objstore/client"
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/server"
httpserver "github.com/thanos-io/thanos/pkg/server/http"
"github.com/thanos-io/thanos/pkg/ui"
"github.com/thanos-io/thanos/pkg/verifier"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/oklog/run"
"github.com/oklog/ulid"
"github.com/olekukonko/tablewriter"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/tsdb/labels"
"golang.org/x/text/language"
"golang.org/x/text/message"
kingpin "gopkg.in/alecthomas/kingpin.v2"
Expand Down Expand Up @@ -311,7 +310,7 @@ func registerBucketInspect(m map[string]setupFunc, root *kingpin.CmdClause, name
func registerBucketWeb(m map[string]setupFunc, root *kingpin.CmdClause, name string, objStoreConfig *extflag.PathOrContent) {
cmd := root.Command("web", "Web interface for remote storage bucket")
bind := cmd.Flag("listen", "HTTP host:port to listen on").Default("0.0.0.0:8080").String()
httpGracePeriod := regHTTPGracePeriodFlag(cmd)
_, httpGracePeriod := regHTTPFlags(cmd)
interval := cmd.Flag("refresh", "Refresh interval to download metadata from remote storage").Default("30m").Duration()
timeout := cmd.Flag("timeout", "Timeout to download metadata from remote storage").Default("5m").Duration()
label := cmd.Flag("label", "Prometheus label to use as timeline title").String()
Expand All @@ -321,9 +320,9 @@ func registerBucketWeb(m map[string]setupFunc, root *kingpin.CmdClause, name str

statusProber := prober.NewProber(component.Bucket, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
// Initiate HTTP listener providing metrics endpoint and readiness/liveness probes.
srv := server.NewHTTP(logger, reg, component.Bucket, statusProber,
server.WithListen(*bind),
server.WithGracePeriod(time.Duration(*httpGracePeriod)),
srv := httpserver.New(logger, reg, component.Bucket, statusProber,
httpserver.WithListen(*bind),
httpserver.WithGracePeriod(time.Duration(*httpGracePeriod)),
)

bucketUI := ui.NewBucketUI(logger, *label)
Expand Down
14 changes: 6 additions & 8 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,6 @@ import (
"strings"
"time"

"github.com/thanos-io/thanos/pkg/extflag"
"github.com/thanos-io/thanos/pkg/server"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/oklog/run"
Expand All @@ -25,10 +22,12 @@ import (
"github.com/thanos-io/thanos/pkg/compact"
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/extflag"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/objstore/client"
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/runutil"
httpserver "github.com/thanos-io/thanos/pkg/server/http"
"gopkg.in/alecthomas/kingpin.v2"
)

Expand Down Expand Up @@ -79,8 +78,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) {
"Compaction index verification will ignore out of order label names.").
Hidden().Default("false").Bool()

httpAddr := regHTTPAddrFlag(cmd)
httpGracePeriod := regHTTPGracePeriodFlag(cmd)
httpAddr, httpGracePeriod := regHTTPFlags(cmd)

dataDir := cmd.Flag("data-dir", "Data directory in which to cache blocks and process compactions.").
Default("./data").String()
Expand Down Expand Up @@ -179,9 +177,9 @@ func runCompact(

statusProber := prober.NewProber(component, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
// Initiate HTTP listener providing metrics endpoint and readiness/liveness probes.
srv := server.NewHTTP(logger, reg, component, statusProber,
server.WithListen(httpBindAddr),
server.WithGracePeriod(httpGracePeriod),
srv := httpserver.New(logger, reg, component, statusProber,
httpserver.WithListen(httpBindAddr),
httpserver.WithGracePeriod(httpGracePeriod),
)

g.Add(srv.ListenAndServe, srv.Shutdown)
Expand Down
14 changes: 6 additions & 8 deletions cmd/thanos/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@ import (
"path/filepath"
"time"

"github.com/thanos-io/thanos/pkg/extflag"
"github.com/thanos-io/thanos/pkg/server"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/oklog/run"
Expand All @@ -23,19 +20,20 @@ import (
"github.com/thanos-io/thanos/pkg/compact"
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/extflag"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/objstore/client"
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/runutil"
httpserver "github.com/thanos-io/thanos/pkg/server/http"
kingpin "gopkg.in/alecthomas/kingpin.v2"
)

func registerDownsample(m map[string]setupFunc, app *kingpin.Application) {
comp := component.Downsample
cmd := app.Command(comp.String(), "continuously downsamples blocks in an object store bucket")

httpAddr := regHTTPAddrFlag(cmd)
httpGracePeriod := regHTTPGracePeriodFlag(cmd)
httpAddr, httpGracePeriod := regHTTPFlags(cmd)

dataDir := cmd.Flag("data-dir", "Data directory in which to cache blocks and process downsamplings.").
Default("./data").String()
Expand Down Expand Up @@ -126,9 +124,9 @@ func runDownsample(
}

// Initiate HTTP listener providing metrics endpoint and readiness/liveness probes.
srv := server.NewHTTP(logger, reg, comp, statusProber,
server.WithListen(httpBindAddr),
server.WithGracePeriod(httpGracePeriod),
srv := httpserver.New(logger, reg, comp, statusProber,
httpserver.WithListen(httpBindAddr),
httpserver.WithGracePeriod(httpGracePeriod),
)
g.Add(srv.ListenAndServe, srv.Shutdown)

Expand Down
12 changes: 7 additions & 5 deletions cmd/thanos/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,29 +19,31 @@ func modelDuration(flags *kingpin.FlagClause) *model.Duration {

func regGRPCFlags(cmd *kingpin.CmdClause) (
grpcBindAddr *string,
grpcGracePeriod *model.Duration,
grpcTLSSrvCert *string,
grpcTLSSrvKey *string,
grpcTLSSrvClientCA *string,
) {
grpcBindAddr = cmd.Flag("grpc-address", "Listen ip:port address for gRPC endpoints (StoreAPI). Make sure this address is routable from other components.").
Default("0.0.0.0:10901").String()
grpcGracePeriod = modelDuration(cmd.Flag("grpc-grace-period", "Time to wait after an interrupt received for GRPC Server.").Default("2m")) // by default it's the same as query.timeout.

grpcTLSSrvCert = cmd.Flag("grpc-server-tls-cert", "TLS Certificate for gRPC server, leave blank to disable TLS").Default("").String()
grpcTLSSrvKey = cmd.Flag("grpc-server-tls-key", "TLS Key for the gRPC server, leave blank to disable TLS").Default("").String()
grpcTLSSrvClientCA = cmd.Flag("grpc-server-tls-client-ca", "TLS CA to verify clients against. If no client CA is specified, there is no client verification on server side. (tls.NoClientCert)").Default("").String()

return grpcBindAddr,
grpcGracePeriod,
grpcTLSSrvCert,
grpcTLSSrvKey,
grpcTLSSrvClientCA
}

func regHTTPAddrFlag(cmd *kingpin.CmdClause) *string {
return cmd.Flag("http-address", "Listen host:port for HTTP endpoints.").Default("0.0.0.0:10902").String()
}
func regHTTPFlags(cmd *kingpin.CmdClause) (httpBindAddr *string, httpGracePeriod *model.Duration) {
httpBindAddr = cmd.Flag("http-address", "Listen host:port for HTTP endpoints.").Default("0.0.0.0:10902").String()
httpGracePeriod = modelDuration(cmd.Flag("http-grace-period", "Time to wait after an interrupt received for HTTP Server.").Default("2m")) // by default it's the same as query.timeout.

func regHTTPGracePeriodFlag(cmd *kingpin.CmdClause) *model.Duration {
return modelDuration(cmd.Flag("http-grace-period", "Time to wait after an interrupt received for HTTP Server.").Default("5s"))
return httpBindAddr, httpGracePeriod
}

func regCommonObjStoreFlags(cmd *kingpin.CmdClause, suffix string, required bool, extraDesc ...string) *extflag.PathOrContent {
Expand Down
156 changes: 0 additions & 156 deletions cmd/thanos/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,39 +2,25 @@ package main

import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"io"
"io/ioutil"
"math"
"os"
"os/signal"
"path/filepath"
"runtime"
"runtime/debug"
"syscall"

gmetrics "github.com/armon/go-metrics"
gprom "github.com/armon/go-metrics/prometheus"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/oklog/run"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/version"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/tracing"
"github.com/thanos-io/thanos/pkg/tracing/client"
"go.uber.org/automaxprocs/maxprocs"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/status"
"gopkg.in/alecthomas/kingpin.v2"
)

Expand Down Expand Up @@ -222,145 +208,3 @@ func interrupt(logger log.Logger, cancel <-chan struct{}) error {
return errors.New("canceled")
}
}

func defaultGRPCTLSServerOpts(logger log.Logger, cert, key, clientCA string) ([]grpc.ServerOption, error) {
opts := []grpc.ServerOption{}
tlsCfg, err := defaultTLSServerOpts(log.With(logger, "protocol", "gRPC"), cert, key, clientCA)
if err != nil {
return opts, err
}
if tlsCfg != nil {
opts = append(opts, grpc.Creds(credentials.NewTLS(tlsCfg)))
}
return opts, nil
}

func defaultTLSServerOpts(logger log.Logger, cert, key, clientCA string) (*tls.Config, error) {
if key == "" && cert == "" {
if clientCA != "" {
return nil, errors.New("when a client CA is used a server key and certificate must also be provided")
}

level.Info(logger).Log("msg", "disabled TLS, key and cert must be set to enable")
return nil, nil
}

level.Info(logger).Log("msg", "enabling server side TLS")

if key == "" || cert == "" {
return nil, errors.New("both server key and certificate must be provided")
}

tlsCfg := &tls.Config{
MinVersion: tls.VersionTLS12,
}

tlsCert, err := tls.LoadX509KeyPair(cert, key)
if err != nil {
return nil, errors.Wrap(err, "server credentials")
}

tlsCfg.Certificates = []tls.Certificate{tlsCert}

if clientCA != "" {
caPEM, err := ioutil.ReadFile(clientCA)
if err != nil {
return nil, errors.Wrap(err, "reading client CA")
}

certPool := x509.NewCertPool()
if !certPool.AppendCertsFromPEM(caPEM) {
return nil, errors.Wrap(err, "building client CA")
}
tlsCfg.ClientCAs = certPool
tlsCfg.ClientAuth = tls.RequireAndVerifyClientCert

level.Info(logger).Log("msg", "server TLS client verification enabled")
}

return tlsCfg, nil
}

func defaultTLSClientOpts(logger log.Logger, cert, key, caCert, serverName string) (*tls.Config, error) {
var certPool *x509.CertPool
if caCert != "" {
caPEM, err := ioutil.ReadFile(caCert)
if err != nil {
return nil, errors.Wrap(err, "reading client CA")
}

certPool = x509.NewCertPool()
if !certPool.AppendCertsFromPEM(caPEM) {
return nil, errors.Wrap(err, "building client CA")
}
level.Info(logger).Log("msg", "TLS client using provided certificate pool")
} else {
var err error
certPool, err = x509.SystemCertPool()
if err != nil {
return nil, errors.Wrap(err, "reading system certificate pool")
}
level.Info(logger).Log("msg", "TLS client using system certificate pool")
}

tlsCfg := &tls.Config{
RootCAs: certPool,
}

if serverName != "" {
tlsCfg.ServerName = serverName
}

if (key != "") != (cert != "") {
return nil, errors.New("both client key and certificate must be provided")
}

if cert != "" {
cert, err := tls.LoadX509KeyPair(cert, key)
if err != nil {
return nil, errors.Wrap(err, "client credentials")
}
tlsCfg.Certificates = []tls.Certificate{cert}
level.Info(logger).Log("msg", "TLS client authentication enabled")
}
return tlsCfg, nil
}

func newStoreGRPCServer(logger log.Logger, reg prometheus.Registerer, tracer opentracing.Tracer, srv storepb.StoreServer, opts []grpc.ServerOption) *grpc.Server {
met := grpc_prometheus.NewServerMetrics()
met.EnableHandlingTimeHistogram(
grpc_prometheus.WithHistogramBuckets([]float64{
0.001, 0.01, 0.05, 0.1, 0.2, 0.4, 0.8, 1.6, 3.2, 6.4,
}),
)
panicsTotal := prometheus.NewCounter(prometheus.CounterOpts{
Name: "thanos_grpc_req_panics_recovered_total",
Help: "Total number of gRPC requests recovered from internal panic.",
})
reg.MustRegister(met, panicsTotal)

grpcPanicRecoveryHandler := func(p interface{}) (err error) {
panicsTotal.Inc()
level.Error(logger).Log("msg", "recovered from panic", "panic", p, "stack", debug.Stack())
return status.Errorf(codes.Internal, "%s", p)
}
opts = append(opts,
grpc.MaxSendMsgSize(math.MaxInt32),
grpc_middleware.WithUnaryServerChain(
met.UnaryServerInterceptor(),
tracing.UnaryServerInterceptor(tracer),
grpc_recovery.UnaryServerInterceptor(grpc_recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)),
),
grpc_middleware.WithStreamServerChain(
met.StreamServerInterceptor(),
tracing.StreamServerInterceptor(tracer),
grpc_recovery.StreamServerInterceptor(grpc_recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)),
),
)

s := grpc.NewServer(opts...)
storepb.RegisterStoreServer(s, srv)
met.InitializeMetrics(s)

return s
}
Loading

0 comments on commit 9d4d0bf

Please sign in to comment.