diff --git a/CHANGELOG.md b/CHANGELOG.md index 7423c8df3f0..ff967ba0ce9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ * [ENHANCEMENT] Ingester: reduced the memory footprint of active series custom trackers. #2568 * [ENHANCEMENT] Distributor: Include `X-Scope-OrgId` header in requests forwarded to configured forwarding endpoint. #3283 * [ENHANCEMENT] Alertmanager: reduced memory utilization in Mimir clusters with a large number of tenants. #3309 +* [ENHANCEMENT] Add experimental flag `-shutdown-delay` to allow components to wait after receiving SIGTERM and before stopping. In this time the component returns 503 from /ready endpoint. #3298 * [BUGFIX] Flusher: Add `Overrides` as a dependency to prevent panics when starting with `-target=flusher`. #3151 * [BUGFIX] Updated `golang.org/x/text` dependency to fix CVE-2022-32149. #3285 * [BUGFIX] Query-frontend: properly close gRPC streams to the query-scheduler to stop memory and goroutines leak. #3302 diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index ba5647c575f..c4050d0e119 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -35,6 +35,17 @@ "fieldType": "string", "fieldCategory": "advanced" }, + { + "kind": "field", + "name": "shutdown_delay", + "required": false, + "desc": "How long to wait between SIGTERM and shutdown. After receiving SIGTERM, Mimir will report not-ready status via /ready endpoint.", + "fieldValue": null, + "fieldDefaultValue": 0, + "fieldFlag": "shutdown-delay", + "fieldType": "duration", + "fieldCategory": "experimental" + }, { "kind": "block", "name": "api", diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index 240579c9032..a001a486596 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -1871,6 +1871,8 @@ Usage of ./cmd/mimir/mimir: Comma-separated list of cipher suites to use. If blank, the default Go cipher suites is used. -server.tls-min-version string Minimum TLS version to use. Allowed values: VersionTLS10, VersionTLS11, VersionTLS12, VersionTLS13. If blank, the Go TLS minimum version is used. + -shutdown-delay duration + [experimental] How long to wait between SIGTERM and shutdown. After receiving SIGTERM, Mimir will report not-ready status via /ready endpoint. -store-gateway.sharding-ring.consul.acl-token string ACL Token used to interact with Consul. -store-gateway.sharding-ring.consul.cas-retry-delay duration diff --git a/docs/sources/operators-guide/configure/reference-configuration-parameters/index.md b/docs/sources/operators-guide/configure/reference-configuration-parameters/index.md index 737789dc5ac..da71f0827ec 100644 --- a/docs/sources/operators-guide/configure/reference-configuration-parameters/index.md +++ b/docs/sources/operators-guide/configure/reference-configuration-parameters/index.md @@ -112,6 +112,11 @@ where `default_value` is the value to use if the environment variable is undefin # CLI flag: -auth.no-auth-tenant [no_auth_tenant: | default = "anonymous"] +# (experimental) How long to wait between SIGTERM and shutdown. After receiving +# SIGTERM, Mimir will report not-ready status via /ready endpoint. +# CLI flag: -shutdown-delay +[shutdown_delay: | default = 0s] + api: # (advanced) Allows to skip label name validation via # X-Mimir-SkipLabelNameValidation header on the http write path. Use with diff --git a/go.mod b/go.mod index fa4458bdaab..4732eb481d4 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/golang/snappy v0.0.4 github.com/google/gopacket v1.1.19 github.com/gorilla/mux v1.8.0 - github.com/grafana/dskit v0.0.0-20221018134951-0d3fc3d6c266 + github.com/grafana/dskit v0.0.0-20221026142359-210cad87c563 github.com/grafana/e2e v0.1.1-0.20221018202458-cffd2bb71c7b github.com/hashicorp/golang-lru v0.5.4 github.com/json-iterator/go v1.1.12 diff --git a/go.sum b/go.sum index 3bfce163de5..858c43d1070 100644 --- a/go.sum +++ b/go.sum @@ -480,8 +480,8 @@ github.com/gosimple/slug v1.1.1 h1:fRu/digW+NMwBIP+RmviTK97Ho/bEj/C9swrCspN3D4= github.com/gosimple/slug v1.1.1/go.mod h1:ER78kgg1Mv0NQGlXiDe57DpCyfbNywXXZ9mIorhxAf0= github.com/grafana-tools/sdk v0.0.0-20211220201350-966b3088eec9 h1:LQAhgcUPnzdjU/OjCJaLlPQI7NmQCRlfjMPSA1VegvA= github.com/grafana-tools/sdk v0.0.0-20211220201350-966b3088eec9/go.mod h1:AHHlOEv1+GGQ3ktHMlhuTUwo3zljV3QJbC0+8o2kn+4= -github.com/grafana/dskit v0.0.0-20221018134951-0d3fc3d6c266 h1:NHL4FGXvIBZ23BVYCeWOHj0TONbgqIwQ/aawRBam0J0= -github.com/grafana/dskit v0.0.0-20221018134951-0d3fc3d6c266/go.mod h1:NTfOwhBMmR7TyG4E3RB4F1qhvk+cawoXacyN30yipVY= +github.com/grafana/dskit v0.0.0-20221026142359-210cad87c563 h1:CRkbcjG9nFSc42p1uNvAnkCCsiLb4reNL2r93tn+zzI= +github.com/grafana/dskit v0.0.0-20221026142359-210cad87c563/go.mod h1:NTfOwhBMmR7TyG4E3RB4F1qhvk+cawoXacyN30yipVY= github.com/grafana/e2e v0.1.1-0.20221018202458-cffd2bb71c7b h1:Ha+kSIoTutf4ytlVw/SaEclDUloYx0+FXDKJWKhNbE4= github.com/grafana/e2e v0.1.1-0.20221018202458-cffd2bb71c7b/go.mod h1:3UsooRp7yW5/NJQBlXcTsAHOoykEhNUYXkQ3r6ehEEY= github.com/grafana/gomemcache v0.0.0-20220812141943-44b6cde200bb h1:CqfZjjd8iK3G1TV8Wf0u7WTY+0RxIEbmcgxftt9qVtw= diff --git a/pkg/mimir/mimir.go b/pkg/mimir/mimir.go index dafe56f301d..ee0959a5a75 100644 --- a/pkg/mimir/mimir.go +++ b/pkg/mimir/mimir.go @@ -14,6 +14,7 @@ import ( "path/filepath" "strconv" "strings" + "time" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -33,6 +34,7 @@ import ( "github.com/weaveworks/common/server" "github.com/weaveworks/common/signals" "go.opentelemetry.io/otel" + "go.uber.org/atomic" "google.golang.org/grpc/health/grpc_health_v1" "gopkg.in/yaml.v3" @@ -95,6 +97,7 @@ type Config struct { Target flagext.StringSliceCSV `yaml:"target"` MultitenancyEnabled bool `yaml:"multitenancy_enabled"` NoAuthTenant string `yaml:"no_auth_tenant" category:"advanced"` + ShutdownDelay time.Duration `yaml:"shutdown_delay" category:"experimental"` PrintConfig bool `yaml:"-"` ApplicationName string `yaml:"-"` @@ -143,6 +146,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) { f.BoolVar(&c.MultitenancyEnabled, "auth.multitenancy-enabled", true, "When set to true, incoming HTTP requests must specify tenant ID in HTTP X-Scope-OrgId header. When set to false, tenant ID from -auth.no-auth-tenant is used instead.") f.StringVar(&c.NoAuthTenant, "auth.no-auth-tenant", "anonymous", "Tenant ID to use when multitenancy is disabled.") f.BoolVar(&c.PrintConfig, "print.config", false, "Print the config and exit.") + f.DurationVar(&c.ShutdownDelay, "shutdown-delay", 0, "How long to wait between SIGTERM and shutdown. After receiving SIGTERM, Mimir will report not-ready status via /ready endpoint.") c.API.RegisterFlags(f) c.registerServerFlagsWithChangedDefaultValues(f) @@ -756,10 +760,15 @@ func (t *Mimir) Run() error { return err } + // Used to delay shutdown but return "not ready" during this delay. + shutdownRequested := atomic.NewBool(false) + // before starting servers, register /ready handler and gRPC health check service. - // It should reflect entire Mimir. - t.Server.HTTP.Path("/ready").Handler(t.readyHandler(sm)) - grpc_health_v1.RegisterHealthServer(t.Server.GRPC, grpcutil.NewHealthCheck(sm)) + t.Server.HTTP.Path("/ready").Handler(t.readyHandler(sm, shutdownRequested)) + grpc_health_v1.RegisterHealthServer(t.Server.GRPC, grpcutil.NewHealthCheckFrom( + grpcutil.WithShutdownRequested(shutdownRequested), + grpcutil.WithManager(sm), + )) // Let's listen for events from this manager, and log them. healthy := func() { level.Info(util_log.Logger).Log("msg", "Application started") } @@ -785,10 +794,18 @@ func (t *Mimir) Run() error { sm.AddListener(services.NewManagerListener(healthy, stopped, serviceFailed)) - // Setup signal handler. If signal arrives, we stop the manager, which stops all the services. + // Setup signal handler to gracefully shutdown in response to SIGTERM or SIGINT handler := signals.NewHandler(t.Server.Log) go func() { handler.Loop() + + shutdownRequested.Store(true) + t.Server.HTTPServer.SetKeepAlivesEnabled(false) + + if t.Cfg.ShutdownDelay > 0 { + time.Sleep(t.Cfg.ShutdownDelay) + } + sm.StopAsync() }() @@ -818,8 +835,14 @@ func (t *Mimir) Run() error { return err } -func (t *Mimir) readyHandler(sm *services.Manager) http.HandlerFunc { +func (t *Mimir) readyHandler(sm *services.Manager, shutdownRequested *atomic.Bool) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { + if shutdownRequested.Load() { + level.Debug(util_log.Logger).Log("msg", "application is stopping") + http.Error(w, "Application is stopping", http.StatusServiceUnavailable) + return + } + if !sm.IsHealthy() { var serviceNamesStates []string for name, s := range t.ServiceMap { @@ -829,7 +852,6 @@ func (t *Mimir) readyHandler(sm *services.Manager) http.HandlerFunc { } level.Debug(util_log.Logger).Log("msg", "some services are not Running", "services", serviceNamesStates) - httpResponse := "Some services are not Running:\n" + strings.Join(serviceNamesStates, "\n") http.Error(w, httpResponse, http.StatusServiceUnavailable) return diff --git a/vendor/github.com/grafana/dskit/grpcutil/health_check.go b/vendor/github.com/grafana/dskit/grpcutil/health_check.go index 2b567b36804..44b5e15e765 100644 --- a/vendor/github.com/grafana/dskit/grpcutil/health_check.go +++ b/vendor/github.com/grafana/dskit/grpcutil/health_check.go @@ -4,29 +4,64 @@ import ( "context" "github.com/gogo/status" + "go.uber.org/atomic" "google.golang.org/grpc/codes" "google.golang.org/grpc/health/grpc_health_v1" "github.com/grafana/dskit/services" ) +// Check is a function that determines if this gRPC application is healthy. +type Check func(ctx context.Context) bool + +// WithManager returns a new Check that tests if the managed services are healthy. +func WithManager(manager *services.Manager) Check { + return func(ctx context.Context) bool { + states := manager.ServicesByState() + + // Given this is a health check endpoint for the whole instance, we should consider + // it healthy after all services have been started (running) and until all + // services are terminated. Some services, like ingesters, are still + // fully functioning while stopping. + if len(states[services.New]) > 0 || len(states[services.Starting]) > 0 || len(states[services.Failed]) > 0 { + return false + } + + return len(states[services.Running]) > 0 || len(states[services.Stopping]) > 0 + } +} + +// WithShutdownRequested returns a new Check that returns false when shutting down. +func WithShutdownRequested(requested *atomic.Bool) Check { + return func(ctx context.Context) bool { + return !requested.Load() + } +} + // HealthCheck fulfills the grpc_health_v1.HealthServer interface by ensuring -// the services being managed by the provided service manager are healthy. +// each of the provided Checks indicates the application is healthy. type HealthCheck struct { - sm *services.Manager + checks []Check } // NewHealthCheck returns a new HealthCheck for the provided service manager. func NewHealthCheck(sm *services.Manager) *HealthCheck { + return NewHealthCheckFrom(WithManager(sm)) +} + +// NewHealthCheckFrom returns a new HealthCheck that uses each of the provided Checks. +func NewHealthCheckFrom(checks ...Check) *HealthCheck { return &HealthCheck{ - sm: sm, + checks: checks, } } // Check implements the grpc healthcheck. -func (h *HealthCheck) Check(_ context.Context, _ *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) { - if !h.isHealthy() { - return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_NOT_SERVING}, nil +func (h *HealthCheck) Check(ctx context.Context, _ *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) { + for _, check := range h.checks { + if !check(ctx) { + return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_NOT_SERVING}, nil + } } return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_SERVING}, nil @@ -36,18 +71,3 @@ func (h *HealthCheck) Check(_ context.Context, _ *grpc_health_v1.HealthCheckRequ func (h *HealthCheck) Watch(_ *grpc_health_v1.HealthCheckRequest, _ grpc_health_v1.Health_WatchServer) error { return status.Error(codes.Unimplemented, "Watching is not supported") } - -// isHealthy returns whether the instance should be considered healthy. -func (h *HealthCheck) isHealthy() bool { - states := h.sm.ServicesByState() - - // Given this is an health check endpoint for the whole instance, we should consider - // it healthy after all services have been started (running) and until all - // services are terminated. Some services, like ingesters, are still - // fully functioning while stopping. - if len(states[services.New]) > 0 || len(states[services.Starting]) > 0 || len(states[services.Failed]) > 0 { - return false - } - - return len(states[services.Running]) > 0 || len(states[services.Stopping]) > 0 -} diff --git a/vendor/modules.txt b/vendor/modules.txt index 65142f18774..6b2087a862e 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -463,7 +463,7 @@ github.com/gosimple/slug # github.com/grafana-tools/sdk v0.0.0-20211220201350-966b3088eec9 ## explicit; go 1.13 github.com/grafana-tools/sdk -# github.com/grafana/dskit v0.0.0-20221018134951-0d3fc3d6c266 +# github.com/grafana/dskit v0.0.0-20221026142359-210cad87c563 ## explicit; go 1.18 github.com/grafana/dskit/backoff github.com/grafana/dskit/concurrency