From 3284e0dc4a7ab844b0cff23b02c5fbc7cba0b6e1 Mon Sep 17 00:00:00 2001 From: Michael Hoffmann Date: Thu, 7 Nov 2024 14:46:34 +0100 Subject: [PATCH] query, rule: make endpoint discovery dynamically reloadable * Removed previously deprecated and hidden flags to configure endpoints (--rule, --target, ...) * Added new flags --endpoint.sd-config, --endpoint.sd-config-reload-interval to configure a dynamic SD file * Moved endpoint set construction into cmd/thanos/endpointset.go for a little cleanup The new config makes it possible to also set "strict" and "group" flags on the endpoint instead of only their addresses, making it possible to have file-based service discovery for endpoint groups too. Signed-off-by: Michael Hoffmann --- CHANGELOG.md | 1 + cmd/thanos/config.go | 38 +++ cmd/thanos/endpointset.go | 374 ++++++++++++++++++++++++++ cmd/thanos/query.go | 463 +++++---------------------------- cmd/thanos/rule.go | 46 +--- docs/components/query.md | 71 +++-- pkg/discovery/dns/grpc.go | 2 +- pkg/extkingpin/flags.go | 10 +- pkg/query/endpointset.go | 28 +- pkg/query/endpointset_test.go | 30 +-- test/e2e/e2ethanos/services.go | 29 ++- test/e2e/exemplars_api_test.go | 2 +- test/e2e/metadata_api_test.go | 2 +- 13 files changed, 575 insertions(+), 521 deletions(-) create mode 100644 cmd/thanos/endpointset.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 36d6e89aa7..4e4a6fd960 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -47,6 +47,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#7704](https://github.com/thanos-io/thanos/pull/7704) *: *breaking :warning:* remove Store gRPC Info function. This has been deprecated for 3 years, its time to remove it. 
- [#7741](https://github.com/thanos-io/thanos/pull/7741) Deps: Bump Objstore to `v0.0.0-20240913074259-63feed0da069` - [#7813](https://github.com/thanos-io/thanos/pull/7813) Receiver: enable initial TSDB compaction time randomization +- [#7890](https://github.com/thanos-io/thanos/pull/7890) Query,Ruler: *breaking :warning:* deprecated `--store.sd-files` and `--store.sd-interval` to be replaced with `--endpoint.sd-config` and `--endpoint.sd-config-reload-interval`; removed legacy flags to pass endpoints `--store`, `--metadata`, `--rule`, `--exemplar`, `--target`. ### Removed diff --git a/cmd/thanos/config.go b/cmd/thanos/config.go index f72d19fd79..f16f885737 100644 --- a/cmd/thanos/config.go +++ b/cmd/thanos/config.go @@ -14,11 +14,17 @@ import ( "github.com/KimMachineGun/automemlimit/memlimit" extflag "github.com/efficientgo/tools/extkingpin" + "github.com/go-kit/log" + "github.com/opentracing/opentracing-go" "github.com/pkg/errors" + "google.golang.org/grpc" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" + "github.com/thanos-io/thanos/pkg/extgrpc" + "github.com/thanos-io/thanos/pkg/extgrpc/snappy" "github.com/thanos-io/thanos/pkg/extkingpin" "github.com/thanos-io/thanos/pkg/shipper" ) @@ -58,6 +64,38 @@ func (gc *grpcConfig) registerFlag(cmd extkingpin.FlagClause) *grpcConfig { return gc } +type grpcClientConfig struct { + secure bool + skipVerify bool + cert, key, caCert string + serverName string + compression string +} + +func (gc *grpcClientConfig) registerFlag(cmd extkingpin.FlagClause) *grpcClientConfig { + cmd.Flag("grpc-client-tls-secure", "Use TLS when talking to the gRPC server").Default("false").BoolVar(&gc.secure) + cmd.Flag("grpc-client-tls-skip-verify", "Disable TLS certificate verification i.e self signed, signed by fake CA").Default("false").BoolVar(&gc.skipVerify) + cmd.Flag("grpc-client-tls-cert", "TLS Certificates to use to identify this client to the 
server").Default("").StringVar(&gc.cert) + cmd.Flag("grpc-client-tls-key", "TLS Key for the client's certificate").Default("").StringVar(&gc.key) + cmd.Flag("grpc-client-tls-ca", "TLS CA Certificates to use to verify gRPC servers").Default("").StringVar(&gc.caCert) + cmd.Flag("grpc-client-server-name", "Server name to verify the hostname on the returned gRPC certificates. See https://tools.ietf.org/html/rfc4366#section-3.1").Default("").StringVar(&gc.serverName) + compressionOptions := strings.Join([]string{snappy.Name, compressionNone}, ", ") + cmd.Flag("grpc-compression", "Compression algorithm to use for gRPC requests to other clients. Must be one of: "+compressionOptions).Default(compressionNone).EnumVar(&gc.compression, snappy.Name, compressionNone) + + return gc +} + +func (gc *grpcClientConfig) dialOptions(logger log.Logger, reg prometheus.Registerer, tracer opentracing.Tracer) ([]grpc.DialOption, error) { + dialOpts, err := extgrpc.StoreClientGRPCOpts(logger, reg, tracer, gc.secure, gc.skipVerify, gc.cert, gc.key, gc.caCert, gc.serverName) + if err != nil { + return nil, errors.Wrapf(err, "building gRPC client") + } + if gc.compression != compressionNone { + dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor(gc.compression))) + } + return dialOpts, nil +} + type httpConfig struct { bindAddress string tlsConfig string diff --git a/cmd/thanos/endpointset.go b/cmd/thanos/endpointset.go new file mode 100644 index 0000000000..04bb4deaa1 --- /dev/null +++ b/cmd/thanos/endpointset.go @@ -0,0 +1,374 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. 
+ +package main + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/oklog/run" + "google.golang.org/grpc" + "gopkg.in/yaml.v3" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/discovery" + "github.com/prometheus/prometheus/discovery/file" + "github.com/prometheus/prometheus/discovery/targetgroup" + + "github.com/thanos-io/thanos/pkg/component" + "github.com/thanos-io/thanos/pkg/discovery/cache" + "github.com/thanos-io/thanos/pkg/discovery/dns" + "github.com/thanos-io/thanos/pkg/errors" + "github.com/thanos-io/thanos/pkg/extgrpc" + "github.com/thanos-io/thanos/pkg/extkingpin" + "github.com/thanos-io/thanos/pkg/extprom" + "github.com/thanos-io/thanos/pkg/query" + "github.com/thanos-io/thanos/pkg/runutil" +) + +// fileContent is the interface of methods that we need from extkingpin.PathOrContent. +// We need to abstract it for now so we can implement a default if the user does not provide one. +type fileContent interface { + Content() ([]byte, error) + Path() string +} + +// emptyPathContent is a "fileContent" implementation to use when nothing was provided. +// We need it as a convenience while it is still possible to omit the `--endpoint.sd-config` +// flag and rely only on the static `--endpoint` flags. +// Once we decide to make this flag required and deprecate the "--endpoint" family of flags, +// this can be discarded again. 
+type emptyPathContent struct { +} + +func (t *emptyPathContent) Content() ([]byte, error) { + return nil, nil +} + +func (t *emptyPathContent) Path() string { + return "" +} + +type EndpointSpec struct { + Strict bool `yaml:"strict"` + Group bool `yaml:"group"` + Address string `yaml:"address"` +} + +type EndpointConfig struct { + Endpoints []EndpointSpec `yaml:"endpoints"` +} + +type endpointConfigProvider struct { + mu sync.Mutex + cfg EndpointConfig + + // statically defined endpoints from flags for backwards compatibility + endpoints []string + endpointGroups []string + strictEndpoints []string + strictEndpointGroups []string +} + +func (er *endpointConfigProvider) config() EndpointConfig { + er.mu.Lock() + defer er.mu.Unlock() + + res := EndpointConfig{Endpoints: make([]EndpointSpec, len(er.cfg.Endpoints))} + copy(res.Endpoints, er.cfg.Endpoints) + return res +} + +func (er *endpointConfigProvider) parse(configFile fileContent) (EndpointConfig, error) { + content, err := configFile.Content() + if err != nil { + return EndpointConfig{}, errors.Wrapf(err, "unable to load config content: %s", configFile.Path()) + } + var cfg EndpointConfig + if err := yaml.Unmarshal(content, &cfg); err != nil { + return EndpointConfig{}, errors.Wrapf(err, "unable to unmarshal config content: %s", configFile.Path()) + } + return cfg, nil +} + +func (er *endpointConfigProvider) addStaticEndpoints(cfg *EndpointConfig) { + for _, e := range er.endpoints { + cfg.Endpoints = append(cfg.Endpoints, EndpointSpec{ + Address: e, + }) + } + for _, e := range er.endpointGroups { + cfg.Endpoints = append(cfg.Endpoints, EndpointSpec{ + Address: e, + Group: true, + }) + } + for _, e := range er.strictEndpoints { + cfg.Endpoints = append(cfg.Endpoints, EndpointSpec{ + Address: e, + Strict: true, + }) + } + for _, e := range er.strictEndpointGroups { + cfg.Endpoints = append(cfg.Endpoints, EndpointSpec{ + Address: e, + Group: true, + Strict: true, + }) + } +} + +func validateEndpointConfig(cfg 
EndpointConfig) error { + for _, ecfg := range cfg.Endpoints { + if dns.IsDynamicNode(ecfg.Address) && ecfg.Strict { + return errors.Newf("%s is a dynamically specified endpoint i.e. it uses SD and that is not permitted under strict mode.", ecfg.Address) + } + if dns.IsDynamicNode(ecfg.Address) && ecfg.Group { + return errors.Newf("%s is a dynamically specified endpoint i.e. it uses SD and that is not permitted under group mode.", ecfg.Address) + } + } + return nil +} + +func newEndpointConfigProvider( + logger log.Logger, + configFile fileContent, + configReloadInterval time.Duration, + endpoints []string, + endpointGroups []string, + strictEndpoints []string, + strictEndpointGroups []string, +) (*endpointConfigProvider, error) { + res := &endpointConfigProvider{ + endpoints: endpoints, + endpointGroups: endpointGroups, + strictEndpoints: strictEndpoints, + strictEndpointGroups: strictEndpointGroups, + } + + if configFile == nil { + configFile = &emptyPathContent{} + } + + cfg, err := res.parse(configFile) + if err != nil { + return nil, errors.Wrapf(err, "unable to load config file") + } + res.addStaticEndpoints(&cfg) + res.cfg = cfg + + // only static endpoints + if len(configFile.Path()) == 0 { + return res, nil + } + + if err := extkingpin.PathContentReloader(context.Background(), configFile, logger, func() { + res.mu.Lock() + defer res.mu.Unlock() + + level.Info(logger).Log("msg", "reloading endpoint config") + cfg, err := res.parse(configFile) + if err != nil { + level.Error(logger).Log("msg", "failed to reload endpoint config", "err", err) + return + } + res.addStaticEndpoints(&cfg) + if err := validateEndpointConfig(cfg); err != nil { + level.Error(logger).Log("msg", "failed to validate endpoint config", "err", err) + return + } + res.cfg = cfg + }, configReloadInterval); err != nil { + return nil, errors.Wrapf(err, "unable to create config reloader") + } + return res, nil +} + +func setupEndpointset( + g *run.Group, + comp component.Component, + reg 
prometheus.Registerer, + logger log.Logger, + configFile fileContent, + configReloadInterval time.Duration, + // legacy SD File config + legacyFileSDFiles []string, + legacyFileSDInterval time.Duration, + // legacy Static config + endpoints []string, + endpointGroups []string, + strictEndpoints []string, + strictEndpointGroups []string, + dnsSDResolver string, + dnsSDInterval time.Duration, + unhealthyTimeout time.Duration, + endpointTimeout time.Duration, + dialOpts []grpc.DialOption, + queryConnMetricLabels ...string, +) (*query.EndpointSet, error) { + configProvider, err := newEndpointConfigProvider( + logger, + configFile, + configReloadInterval, + endpoints, + endpointGroups, + strictEndpoints, + strictEndpointGroups, + ) + if err != nil { + return nil, errors.Wrapf(err, "unable to load config initially") + } + // Register resolver for the "thanos:///" scheme for endpoint-groups + dns.RegisterGRPCResolver( + logger, + dns.NewProvider( + logger, + extprom.WrapRegistererWithPrefix(fmt.Sprintf("thanos_%s_endpoint_groups_", comp), reg), + dns.ResolverType(dnsSDResolver), + ), + dnsSDInterval, + ) + dnsEndpointProvider := dns.NewProvider( + logger, + extprom.WrapRegistererWithPrefix(fmt.Sprintf("thanos_%s_endpoints_", comp), reg), + dns.ResolverType(dnsSDResolver), + ) + duplicatedStores := promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: fmt.Sprintf("thanos_%s_duplicated_store_addresses_total", comp), + Help: "The number of times a duplicated store addresses is detected from the different configs", + }) + + removeDuplicateEndpointSpecs := func(specs []*query.GRPCEndpointSpec) []*query.GRPCEndpointSpec { + set := make(map[string]*query.GRPCEndpointSpec) + for _, spec := range specs { + addr := spec.Addr() + if _, ok := set[addr]; ok { + level.Warn(logger).Log("msg", "Duplicate store address is provided", "addr", addr) + duplicatedStores.Inc() + } + set[addr] = spec + } + deduplicated := make([]*query.GRPCEndpointSpec, 0, len(set)) + for _, value := 
range set { + deduplicated = append(deduplicated, value) + } + return deduplicated + } + legacyFileSDCache := cache.New() + { + // Setup Legacy File SD + var fileSD *file.Discovery + if len(legacyFileSDFiles) > 0 { + conf := &file.SDConfig{ + Files: legacyFileSDFiles, + RefreshInterval: model.Duration(legacyFileSDInterval), + } + var err error + if fileSD, err = file.NewDiscovery(conf, logger, conf.NewDiscovererMetrics(reg, discovery.NewRefreshMetrics(reg))); err != nil { + return nil, fmt.Errorf("unable to create new legacy file sd config: %w", err) + } + } + + if fileSD != nil { + fileSDUpdates := make(chan []*targetgroup.Group) + + ctxLegacyFileSD, cancelLegacyFileSD := context.WithCancel(context.Background()) + g.Add(func() error { + fileSD.Run(ctxLegacyFileSD, fileSDUpdates) + return nil + + }, func(err error) { + cancelLegacyFileSD() + }) + ctxUpdateLegacyFileSDUpdate, cancelLegacyFileSDUpdate := context.WithCancel(context.Background()) + g.Add(func() error { + for { + select { + case update := <-fileSDUpdates: + // Discoverers sometimes send nil updates so need to check for it to avoid panics. 
+ if update == nil { + continue + } + legacyFileSDCache.Update(update) + case <-ctxUpdateLegacyFileSDUpdate.Done(): + return nil + } + } + }, func(err error) { + cancelLegacyFileSDUpdate() + }) + } + } + + { + // Setup DNS Resolver + ctxDNSUpdate, cancelDNSUpdate := context.WithCancel(context.Background()) + g.Add(func() error { + return runutil.Repeat(dnsSDInterval, ctxDNSUpdate.Done(), func() error { + ctxUpdateIter, cancelUpdateIter := context.WithTimeout(ctxDNSUpdate, dnsSDInterval) + defer cancelUpdateIter() + + endpointConfig := configProvider.config() + + addresses := make([]string, 0, len(endpointConfig.Endpoints)) + for _, ecfg := range endpointConfig.Endpoints { + if addr := ecfg.Address; !ecfg.Group && !ecfg.Strict { + // originally only "--endpoint" addresses got resolved + addresses = append(addresses, addr) + } + } + addresses = append(addresses, legacyFileSDCache.Addresses()...) + if err := dnsEndpointProvider.Resolve(ctxUpdateIter, addresses, true); err != nil { + level.Error(logger).Log("msg", "failed to resolve addresses for endpoints", "err", err) + } + return nil + }) + }, func(error) { + cancelDNSUpdate() + }) + } + + // Setup Endpointset to consume resolved addresses and static addresses from config + endpointset := query.NewEndpointSet(time.Now, logger, reg, func() []*query.GRPCEndpointSpec { + endpointConfig := configProvider.config() + + specs := make([]*query.GRPCEndpointSpec, 0) + for _, ecfg := range endpointConfig.Endpoints { + strict, group, addr := ecfg.Strict, ecfg.Group, ecfg.Address + if dns.IsDynamicNode(addr) { + continue + } + if group { + specs = append(specs, query.NewGRPCEndpointSpec(fmt.Sprintf("thanos:///%s", addr), strict, append(dialOpts, extgrpc.EndpointGroupGRPCOpts()...)...)) + } else { + specs = append(specs, query.NewGRPCEndpointSpec(addr, strict, dialOpts...)) + } + } + for _, addr := range dnsEndpointProvider.Addresses() { + specs = append(specs, query.NewGRPCEndpointSpec(addr, false, dialOpts...)) + } + return 
removeDuplicateEndpointSpecs(specs) + }, unhealthyTimeout, endpointTimeout, queryConnMetricLabels...) + + ctxEndpointUpdate, cancelEndpointUpdate := context.WithCancel(context.Background()) + g.Add(func() error { + return runutil.Repeat(endpointTimeout, ctxEndpointUpdate.Done(), func() error { + ctxIter, cancelIter := context.WithTimeout(ctxEndpointUpdate, endpointTimeout) + defer cancelIter() + + endpointset.Update(ctxIter) + return nil + }) + }, func(error) { + cancelEndpointUpdate() + }) + + return endpointset, nil +} diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 69ffb8ea32..817a5d45b3 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -4,7 +4,6 @@ package main import ( - "context" "fmt" "math" "net/http" @@ -12,7 +11,6 @@ import ( "time" extflag "github.com/efficientgo/tools/extkingpin" - "google.golang.org/grpc" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -21,11 +19,7 @@ import ( "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/route" - "github.com/prometheus/prometheus/discovery" - "github.com/prometheus/prometheus/discovery/file" - "github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql" "github.com/thanos-io/promql-engine/api" @@ -35,11 +29,8 @@ import ( "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/compact/downsample" "github.com/thanos-io/thanos/pkg/component" - "github.com/thanos-io/thanos/pkg/discovery/cache" "github.com/thanos-io/thanos/pkg/discovery/dns" "github.com/thanos-io/thanos/pkg/exemplars" - "github.com/thanos-io/thanos/pkg/extgrpc" - "github.com/thanos-io/thanos/pkg/extgrpc/snappy" "github.com/thanos-io/thanos/pkg/extkingpin" "github.com/thanos-io/thanos/pkg/extprom" extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http" @@ -51,7 
+42,6 @@ import ( "github.com/thanos-io/thanos/pkg/prober" "github.com/thanos-io/thanos/pkg/query" "github.com/thanos-io/thanos/pkg/rules" - "github.com/thanos-io/thanos/pkg/runutil" grpcserver "github.com/thanos-io/thanos/pkg/server/grpc" httpserver "github.com/thanos-io/thanos/pkg/server/http" "github.com/thanos-io/thanos/pkg/store" @@ -86,14 +76,8 @@ func registerQuery(app *extkingpin.App) { var grpcServerConfig grpcConfig grpcServerConfig.registerFlag(cmd) - secure := cmd.Flag("grpc-client-tls-secure", "Use TLS when talking to the gRPC server").Default("false").Bool() - skipVerify := cmd.Flag("grpc-client-tls-skip-verify", "Disable TLS certificate verification i.e self signed, signed by fake CA").Default("false").Bool() - cert := cmd.Flag("grpc-client-tls-cert", "TLS Certificates to use to identify this client to the server").Default("").String() - key := cmd.Flag("grpc-client-tls-key", "TLS Key for the client's certificate").Default("").String() - caCert := cmd.Flag("grpc-client-tls-ca", "TLS CA Certificates to use to verify gRPC servers").Default("").String() - serverName := cmd.Flag("grpc-client-server-name", "Server name to verify the hostname on the returned gRPC certificates. See https://tools.ietf.org/html/rfc4366#section-3.1").Default("").String() - compressionOptions := strings.Join([]string{snappy.Name, compressionNone}, ", ") - grpcCompression := cmd.Flag("grpc-compression", "Compression algorithm to use for gRPC requests to other clients. Must be one of: "+compressionOptions).Default(compressionNone).Enum(snappy.Name, compressionNone) + var grpcClientConfig grpcClientConfig + grpcClientConfig.registerFlag(cmd) webRoutePrefix := cmd.Flag("web.route-prefix", "Prefix for API and UI endpoints. This allows thanos UI to be served on a sub-path. Defaults to the value of --web.external-prefix. 
This option is analogous to --web.route-prefix of Prometheus.").Default("").String() webExternalPrefix := cmd.Flag("web.external-prefix", "Static prefix for all HTML links and redirect URLs in the UI query web interface. Actual endpoints are still served on / or the web.route-prefix. This allows thanos UI to be served behind a reverse proxy that strips a URL sub-path.").Default("").String() @@ -134,55 +118,6 @@ func registerQuery(app *extkingpin.App) { selectorLabels := cmd.Flag("selector-label", "Query selector labels that will be exposed in info endpoint (repeated)."). PlaceHolder("=\"\"").Strings() - endpoints := extkingpin.Addrs(cmd.Flag("endpoint", "Addresses of statically configured Thanos API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect Thanos API servers through respective DNS lookups."). - PlaceHolder("")) - - endpointGroups := extkingpin.Addrs(cmd.Flag("endpoint-group", "Experimental: DNS name of statically configured Thanos API server groups (repeatable). Targets resolved from the DNS name will be queried in a round-robin, instead of a fanout manner. This flag should be used when connecting a Thanos Query to HA groups of Thanos components."). - PlaceHolder("")) - - stores := extkingpin.Addrs(cmd.Flag("store", "Deprecation Warning - This flag is deprecated and replaced with `endpoint`. Addresses of statically configured store API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect store API servers through respective DNS lookups."). - PlaceHolder("")) - - // TODO(bwplotka): Hidden because we plan to extract discovery to separate API: https://github.com/thanos-io/thanos/issues/2600. - ruleEndpoints := extkingpin.Addrs(cmd.Flag("rule", "Deprecation Warning - This flag is deprecated and replaced with `endpoint`. Experimental: Addresses of statically configured rules API servers (repeatable). 
The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect rule API servers through respective DNS lookups."). - Hidden().PlaceHolder("")) - - metadataEndpoints := extkingpin.Addrs(cmd.Flag("metadata", "Deprecation Warning - This flag is deprecated and replaced with `endpoint`. Experimental: Addresses of statically configured metadata API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect metadata API servers through respective DNS lookups."). - Hidden().PlaceHolder("")) - - exemplarEndpoints := extkingpin.Addrs(cmd.Flag("exemplar", "Deprecation Warning - This flag is deprecated and replaced with `endpoint`. Experimental: Addresses of statically configured exemplars API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect exemplars API servers through respective DNS lookups."). - Hidden().PlaceHolder("")) - - // TODO(atunik): Hidden because we plan to extract discovery to separate API: https://github.com/thanos-io/thanos/issues/2600. - targetEndpoints := extkingpin.Addrs(cmd.Flag("target", "Deprecation Warning - This flag is deprecated and replaced with `endpoint`. Experimental: Addresses of statically configured target API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect target API servers through respective DNS lookups."). - Hidden().PlaceHolder("")) - - strictStores := cmd.Flag("store-strict", "Deprecation Warning - This flag is deprecated and replaced with `endpoint-strict`. Addresses of only statically configured store API servers that are always used, even if the health check fails. Useful if you have a caching layer on top."). - PlaceHolder("").Strings() - - strictEndpoints := cmd.Flag("endpoint-strict", "Addresses of only statically configured Thanos API servers that are always used, even if the health check fails. Useful if you have a caching layer on top."). 
- PlaceHolder("").Strings() - - strictEndpointGroups := extkingpin.Addrs(cmd.Flag("endpoint-group-strict", "Experimental: DNS name of statically configured Thanos API server groups (repeatable) that are always used, even if the health check fails."). - PlaceHolder("")) - - fileSDFiles := cmd.Flag("store.sd-files", "Path to files that contain addresses of store API servers. The path can be a glob pattern (repeatable)."). - PlaceHolder("").Strings() - - fileSDInterval := extkingpin.ModelDuration(cmd.Flag("store.sd-interval", "Refresh interval to re-read file SD files. It is used as a resync fallback."). - Default("5m")) - - // TODO(bwplotka): Grab this from TTL at some point. - dnsSDInterval := extkingpin.ModelDuration(cmd.Flag("store.sd-dns-interval", "Interval between DNS resolutions."). - Default("30s")) - - dnsSDResolver := cmd.Flag("store.sd-dns-resolver", fmt.Sprintf("Resolver to use. Possible options: [%s, %s]", dns.GolangResolverType, dns.MiekgdnsResolverType)). - Default(string(dns.MiekgdnsResolverType)).Hidden().String() - - unhealthyStoreTimeout := extkingpin.ModelDuration(cmd.Flag("store.unhealthy-timeout", "Timeout before an unhealthy store is cleaned from the store UI page.").Default("5m")) - - endpointInfoTimeout := extkingpin.ModelDuration(cmd.Flag("endpoint.info-timeout", "Timeout of gRPC Info requests.").Default("5s").Hidden()) - enableAutodownsampling := cmd.Flag("query.auto-downsampling", "Enable automatic adjustment (step / 5) to what source of data should be used in store gateways if no max_source_resolution param is specified."). Default("false").Bool() @@ -233,6 +168,35 @@ func registerQuery(app *extkingpin.App) { tenantCertField := cmd.Flag("query.tenant-certificate-field", "Use TLS client's certificate field to determine tenant for write requests. Must be one of "+tenancy.CertificateFieldOrganization+", "+tenancy.CertificateFieldOrganizationalUnit+" or "+tenancy.CertificateFieldCommonName+". 
This setting will cause the query.tenant-header flag value to be ignored.").Default("").Enum("", tenancy.CertificateFieldOrganization, tenancy.CertificateFieldOrganizationalUnit, tenancy.CertificateFieldCommonName) enforceTenancy := cmd.Flag("query.enforce-tenancy", "Enforce tenancy on Query APIs. Responses are returned only if the label value of the configured tenant-label-name and the value of the tenant header matches.").Default("false").Bool() tenantLabel := cmd.Flag("query.tenant-label-name", "Label name to use when enforcing tenancy (if --query.enforce-tenancy is enabled).").Default(tenancy.DefaultTenantLabel).String() + // TODO(bwplotka): Grab this from TTL at some point. + dnsSDInterval := extkingpin.ModelDuration(cmd.Flag("store.sd-dns-interval", "Interval between DNS resolutions."). + Default("30s")) + + dnsSDResolver := cmd.Flag("store.sd-dns-resolver", fmt.Sprintf("Resolver to use. Possible options: [%s, %s]", dns.GolangResolverType, dns.MiekgdnsResolverType)). + Default(string(dns.MiekgdnsResolverType)).Hidden().String() + + unhealthyStoreTimeout := extkingpin.ModelDuration(cmd.Flag("store.unhealthy-timeout", "Timeout before an unhealthy store is cleaned from the store UI page.").Default("5m")) + + endpointInfoTimeout := extkingpin.ModelDuration(cmd.Flag("endpoint.info-timeout", "Timeout of gRPC Info requests.").Default("5s").Hidden()) + + endpointSetConfig := extflag.RegisterPathOrContent(cmd, "endpoint.sd-config", "Config File with endpoint definitions") + + endpointSetConfigReloadInterval := extkingpin.ModelDuration(cmd.Flag("endpoint.sd-config-reload-interval", "Interval between endpoint config refreshes").Default("5m")) + + legacyFileSDFiles := cmd.Flag("store.sd-files", "(Deprecated) Path to files that contain addresses of store API servers. The path can be a glob pattern (repeatable)."). 
+ PlaceHolder("").Strings() + + legacyFileSDInterval := extkingpin.ModelDuration(cmd.Flag("store.sd-interval", "(Deprecated) Refresh interval to re-read file SD files. It is used as a resync fallback."). + Default("5m")) + + endpoints := extkingpin.Addrs(cmd.Flag("endpoint", "(Deprecated): Addresses of statically configured Thanos API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect Thanos API servers through respective DNS lookups.").PlaceHolder("")) + + endpointGroups := extkingpin.Addrs(cmd.Flag("endpoint-group", "(Deprecated, Experimental): DNS name of statically configured Thanos API server groups (repeatable). Targets resolved from the DNS name will be queried in a round-robin, instead of a fanout manner. This flag should be used when connecting a Thanos Query to HA groups of Thanos components.").PlaceHolder("")) + + strictEndpoints := extkingpin.Addrs(cmd.Flag("endpoint-strict", "(Deprecated): Addresses of only statically configured Thanos API servers that are always used, even if the health check fails. Useful if you have a caching layer on top."). 
+ PlaceHolder("")) + + strictEndpointGroups := extkingpin.Addrs(cmd.Flag("endpoint-group-strict", "(Deprecated, Experimental): DNS name of statically configured Thanos API server groups (repeatable) that are always used, even if the health check fails.").PlaceHolder("")) var storeRateLimits store.SeriesSelectLimits storeRateLimits.RegisterFlags(cmd) @@ -266,18 +230,6 @@ func registerQuery(app *extkingpin.App) { return errors.Wrap(err, "error while parsing config for request logging") } - var fileSD *file.Discovery - if len(*fileSDFiles) > 0 { - conf := &file.SDConfig{ - Files: *fileSDFiles, - RefreshInterval: *fileSDInterval, - } - var err error - if fileSD, err = file.NewDiscovery(conf, logger, conf.NewDiscovererMetrics(reg, discovery.NewRefreshMetrics(reg))); err != nil { - return err - } - } - if *webRoutePrefix == "" { *webRoutePrefix = *webExternalPrefix } @@ -295,23 +247,46 @@ func registerQuery(app *extkingpin.App) { return err } + dialOpts, err := grpcClientConfig.dialOptions(logger, reg, tracer) + if err != nil { + return err + } + + endpointSet, err := setupEndpointset( + g, + comp, + reg, + logger, + endpointSetConfig, + time.Duration(*endpointSetConfigReloadInterval), + *legacyFileSDFiles, + time.Duration(*legacyFileSDInterval), + *endpoints, + *endpointGroups, + *strictEndpoints, + *strictEndpointGroups, + *dnsSDResolver, + time.Duration(*dnsSDInterval), + time.Duration(*unhealthyStoreTimeout), + time.Duration(*endpointInfoTimeout), + dialOpts, + *queryConnMetricLabels..., + ) + if err != nil { + return err + } + return runQuery( g, logger, debugLogging, + endpointSet, reg, tracer, httpLogOpts, grpcLogOpts, logFilterMethods, grpcServerConfig, - *grpcCompression, - *secure, - *skipVerify, - *cert, - *key, - *caCert, - *serverName, *httpBindAddr, *httpTLSConfig, time.Duration(*httpGracePeriod), @@ -326,18 +301,10 @@ func registerQuery(app *extkingpin.App) { *dynamicLookbackDelta, time.Duration(*defaultEvaluationInterval), 
time.Duration(*storeResponseTimeout), - *queryConnMetricLabels, *queryReplicaLabels, *queryPartitionLabels, selectorLset, getFlagsMap(cmd.Flags()), - *endpoints, - *endpointGroups, - *stores, - *ruleEndpoints, - *targetEndpoints, - *metadataEndpoints, - *exemplarEndpoints, *enableAutodownsampling, *enableQueryPartialResponse, *enableRulePartialResponse, @@ -345,16 +312,8 @@ func registerQuery(app *extkingpin.App) { *enableMetricMetadataPartialResponse, *enableExemplarPartialResponse, *activeQueryDir, - fileSD, - time.Duration(*dnsSDInterval), - *dnsSDResolver, - time.Duration(*unhealthyStoreTimeout), - time.Duration(*endpointInfoTimeout), time.Duration(*instantDefaultMaxSourceResolution), *defaultMetadataTimeRange, - *strictStores, - *strictEndpoints, - *strictEndpointGroups, *webDisableCORS, *alertQueryURL, *grpcProxyStrategy, @@ -381,19 +340,13 @@ func runQuery( g *run.Group, logger log.Logger, debugLogging bool, + endpointSet *query.EndpointSet, reg *prometheus.Registry, tracer opentracing.Tracer, httpLogOpts []logging.Option, grpcLogOpts []grpc_logging.Option, logFilterMethods []string, grpcServerConfig grpcConfig, - grpcCompression string, - secure bool, - skipVerify bool, - cert string, - key string, - caCert string, - serverName string, httpBindAddr string, httpTLSConfig string, httpGracePeriod time.Duration, @@ -408,18 +361,10 @@ func runQuery( dynamicLookbackDelta bool, defaultEvaluationInterval time.Duration, storeResponseTimeout time.Duration, - queryConnMetricLabels []string, queryReplicaLabels []string, queryPartitionLabels []string, selectorLset labels.Labels, flagsMap map[string]string, - endpointAddrs []string, - endpointGroupAddrs []string, - storeAddrs []string, - ruleAddrs []string, - targetAddrs []string, - metadataAddrs []string, - exemplarAddrs []string, enableAutodownsampling bool, enableQueryPartialResponse bool, enableRulePartialResponse bool, @@ -427,16 +372,8 @@ func runQuery( enableMetricMetadataPartialResponse bool, 
enableExemplarPartialResponse bool, activeQueryDir string, - fileSD *file.Discovery, - dnsSDInterval time.Duration, - dnsSDResolver string, - unhealthyStoreTimeout time.Duration, - endpointInfoTimeout time.Duration, instantDefaultMaxSourceResolution time.Duration, defaultMetadataTimeRange time.Duration, - strictStores []string, - strictEndpoints []string, - strictEndpointGroups []string, disableCORS bool, alertQueryURL string, grpcProxyStrategy string, @@ -462,79 +399,6 @@ func runQuery( } // NOTE(GiedriusS): default is set in config.ts. } - // TODO(bplotka in PR #513 review): Move arguments into struct. - duplicatedStores := promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "thanos_query_duplicated_store_addresses_total", - Help: "The number of times a duplicated store addresses is detected from the different configs in query", - }) - - dialOpts, err := extgrpc.StoreClientGRPCOpts(logger, reg, tracer, secure, skipVerify, cert, key, caCert, serverName) - if err != nil { - return errors.Wrap(err, "building gRPC client") - } - if grpcCompression != compressionNone { - dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor(grpcCompression))) - } - - fileSDCache := cache.New() - dnsStoreProvider := dns.NewProvider( - logger, - extprom.WrapRegistererWithPrefix("thanos_query_store_apis_", reg), - dns.ResolverType(dnsSDResolver), - ) - - for _, store := range strictStores { - if dns.IsDynamicNode(store) { - return errors.Errorf("%s is a dynamically specified store i.e. it uses SD and that is not permitted under strict mode. Use --store for this", store) - } - } - - for _, endpoint := range strictEndpoints { - if dns.IsDynamicNode(endpoint) { - return errors.Errorf("%s is a dynamically specified endpoint i.e. it uses SD and that is not permitted under strict mode. 
Use --endpoint for this", endpoint) - } - } - - // Register resolver for the "thanos:///" scheme for endpoint-groups - dns.RegisterGRPCResolver( - dns.NewProvider( - logger, - extprom.WrapRegistererWithPrefix("thanos_query_endpoint_groups_", reg), - dns.ResolverType(dnsSDResolver), - ), - dnsSDInterval, - logger, - ) - - dnsEndpointProvider := dns.NewProvider( - logger, - extprom.WrapRegistererWithPrefix("thanos_query_endpoints_", reg), - dns.ResolverType(dnsSDResolver), - ) - - dnsRuleProvider := dns.NewProvider( - logger, - extprom.WrapRegistererWithPrefix("thanos_query_rule_apis_", reg), - dns.ResolverType(dnsSDResolver), - ) - - dnsTargetProvider := dns.NewProvider( - logger, - extprom.WrapRegistererWithPrefix("thanos_query_target_apis_", reg), - dns.ResolverType(dnsSDResolver), - ) - - dnsMetadataProvider := dns.NewProvider( - logger, - extprom.WrapRegistererWithPrefix("thanos_query_metadata_apis_", reg), - dns.ResolverType(dnsSDResolver), - ) - - dnsExemplarProvider := dns.NewProvider( - logger, - extprom.WrapRegistererWithPrefix("thanos_query_exemplar_apis_", reg), - dns.ResolverType(dnsSDResolver), - ) options := []store.ProxyStoreOption{ store.WithTSDBSelector(tsdbSelector), @@ -545,35 +409,12 @@ func runQuery( queryReplicaLabels = strutil.ParseFlagLabels(queryReplicaLabels) var ( - endpoints = prepareEndpointSet( - g, - logger, - reg, - []*dns.Provider{ - dnsStoreProvider, - dnsRuleProvider, - dnsExemplarProvider, - dnsMetadataProvider, - dnsTargetProvider, - dnsEndpointProvider, - }, - duplicatedStores, - strictStores, - strictEndpoints, - endpointGroupAddrs, - strictEndpointGroups, - dialOpts, - unhealthyStoreTimeout, - endpointInfoTimeout, - queryConnMetricLabels..., - ) - - proxyStore = store.NewProxyStore(logger, reg, endpoints.GetStoreClients, component.Query, selectorLset, storeResponseTimeout, store.RetrievalStrategy(grpcProxyStrategy), options...) 
+ proxyStore = store.NewProxyStore(logger, reg, endpointSet.GetStoreClients, component.Query, selectorLset, storeResponseTimeout, store.RetrievalStrategy(grpcProxyStrategy), options...) seriesProxy = store.NewLimitedStoreServer(store.NewInstrumentedStoreServer(reg, proxyStore), reg, storeRateLimits) - rulesProxy = rules.NewProxy(logger, endpoints.GetRulesClients) - targetsProxy = targets.NewProxy(logger, endpoints.GetTargetsClients) - metadataProxy = metadata.NewProxy(logger, endpoints.GetMetricMetadataClients) - exemplarsProxy = exemplars.NewProxy(logger, endpoints.GetExemplarsStores, selectorLset) + rulesProxy = rules.NewProxy(logger, endpointSet.GetRulesClients) + targetsProxy = targets.NewProxy(logger, endpointSet.GetTargetsClients) + metadataProxy = metadata.NewProxy(logger, endpointSet.GetMetricMetadataClients) + exemplarsProxy = exemplars.NewProxy(logger, endpointSet.GetExemplarsStores, selectorLset) queryableCreator = query.NewQueryableCreator( logger, extprom.WrapRegistererWithPrefix("thanos_query_", reg), @@ -583,78 +424,6 @@ func runQuery( ) ) - // Run File Service Discovery and update the store set when the files are modified. - if fileSD != nil { - var fileSDUpdates chan []*targetgroup.Group - ctxRun, cancelRun := context.WithCancel(context.Background()) - - fileSDUpdates = make(chan []*targetgroup.Group) - - g.Add(func() error { - fileSD.Run(ctxRun, fileSDUpdates) - return nil - }, func(error) { - cancelRun() - }) - - ctxUpdate, cancelUpdate := context.WithCancel(context.Background()) - g.Add(func() error { - for { - select { - case update := <-fileSDUpdates: - // Discoverers sometimes send nil updates so need to check for it to avoid panics. 
- if update == nil { - continue - } - fileSDCache.Update(update) - endpoints.Update(ctxUpdate) - - if err := dnsStoreProvider.Resolve(ctxUpdate, append(fileSDCache.Addresses(), storeAddrs...), true); err != nil { - level.Error(logger).Log("msg", "failed to resolve addresses for storeAPIs", "err", err) - } - - // Rules apis do not support file service discovery as of now. - case <-ctxUpdate.Done(): - return nil - } - } - }, func(error) { - cancelUpdate() - }) - } - // Periodically update the addresses from static flags and file SD by resolving them using DNS SD if necessary. - { - ctx, cancel := context.WithCancel(context.Background()) - g.Add(func() error { - return runutil.Repeat(dnsSDInterval, ctx.Done(), func() error { - resolveCtx, resolveCancel := context.WithTimeout(ctx, dnsSDInterval) - defer resolveCancel() - if err := dnsStoreProvider.Resolve(resolveCtx, append(fileSDCache.Addresses(), storeAddrs...), true); err != nil { - level.Error(logger).Log("msg", "failed to resolve addresses for storeAPIs", "err", err) - } - if err := dnsRuleProvider.Resolve(resolveCtx, ruleAddrs, true); err != nil { - level.Error(logger).Log("msg", "failed to resolve addresses for rulesAPIs", "err", err) - } - if err := dnsTargetProvider.Resolve(ctx, targetAddrs, true); err != nil { - level.Error(logger).Log("msg", "failed to resolve addresses for targetsAPIs", "err", err) - } - if err := dnsMetadataProvider.Resolve(resolveCtx, metadataAddrs, true); err != nil { - level.Error(logger).Log("msg", "failed to resolve addresses for metadataAPIs", "err", err) - } - if err := dnsExemplarProvider.Resolve(resolveCtx, exemplarAddrs, true); err != nil { - level.Error(logger).Log("msg", "failed to resolve addresses for exemplarsAPI", "err", err) - } - if err := dnsEndpointProvider.Resolve(resolveCtx, endpointAddrs, true); err != nil { - level.Error(logger).Log("msg", "failed to resolve addresses passed using endpoint flag", "err", err) - - } - return nil - }) - }, func(error) { - cancel() - }) 
- } - grpcProbe := prober.NewGRPC() httpProbe := prober.NewHTTP() statusProber := prober.Combine( @@ -687,7 +456,7 @@ func runQuery( if queryMode != queryModeLocal { level.Info(logger).Log("msg", "Distributed query mode enabled, using Thanos as the default query engine.") defaultEngine = string(apiv1.PromqlEngineThanos) - remoteEngineEndpoints = query.NewRemoteEndpoints(logger, endpoints.GetQueryAPIClients, query.Opts{ + remoteEngineEndpoints = query.NewRemoteEndpoints(logger, endpointSet.GetQueryAPIClients, query.Opts{ AutoDownsample: enableAutodownsampling, ReplicaLabels: queryReplicaLabels, PartitionLabels: queryPartitionLabels, @@ -727,11 +496,11 @@ func runQuery( ins := extpromhttp.NewTenantInstrumentationMiddleware(tenantHeader, defaultTenant, reg, nil) // TODO(bplotka in PR #513 review): pass all flags, not only the flags needed by prefix rewriting. - ui.NewQueryUI(logger, endpoints, webExternalPrefix, webPrefixHeaderName, alertQueryURL, tenantHeader, defaultTenant, enforceTenancy).Register(router, ins) + ui.NewQueryUI(logger, endpointSet, webExternalPrefix, webPrefixHeaderName, alertQueryURL, tenantHeader, defaultTenant, enforceTenancy).Register(router, ins) api := apiv1.NewQueryAPI( logger, - endpoints.GetEndpointStatus, + endpointSet.GetEndpointStatus, engineFactory, apiv1.PromqlEngineType(defaultEngine), lookbackDeltaCreator, @@ -844,7 +613,7 @@ func runQuery( }, func(error) { statusProber.NotReady(err) s.Shutdown(err) - endpoints.Close() + endpointSet.Close() }) } @@ -852,96 +621,6 @@ func runQuery( return nil } -func removeDuplicateEndpointSpecs(logger log.Logger, duplicatedStores prometheus.Counter, specs []*query.GRPCEndpointSpec) []*query.GRPCEndpointSpec { - set := make(map[string]*query.GRPCEndpointSpec) - for _, spec := range specs { - addr := spec.Addr() - if _, ok := set[addr]; ok { - level.Warn(logger).Log("msg", "Duplicate store address is provided", "addr", addr) - duplicatedStores.Inc() - } - set[addr] = spec - } - deduplicated := 
make([]*query.GRPCEndpointSpec, 0, len(set)) - for _, value := range set { - deduplicated = append(deduplicated, value) - } - return deduplicated -} - -func prepareEndpointSet( - g *run.Group, - logger log.Logger, - reg *prometheus.Registry, - dnsProviders []*dns.Provider, - duplicatedStores prometheus.Counter, - strictStores []string, - strictEndpoints []string, - endpointGroupAddrs []string, - strictEndpointGroups []string, - dialOpts []grpc.DialOption, - unhealthyStoreTimeout time.Duration, - endpointInfoTimeout time.Duration, - queryConnMetricLabels ...string, -) *query.EndpointSet { - endpointSet := query.NewEndpointSet( - time.Now, - logger, - reg, - func() (specs []*query.GRPCEndpointSpec) { - // Add strict & static nodes. - for _, addr := range strictStores { - specs = append(specs, query.NewGRPCEndpointSpec(addr, true)) - } - - for _, addr := range strictEndpoints { - specs = append(specs, query.NewGRPCEndpointSpec(addr, true)) - } - - for _, dnsProvider := range dnsProviders { - var tmpSpecs []*query.GRPCEndpointSpec - - for _, addr := range dnsProvider.Addresses() { - tmpSpecs = append(tmpSpecs, query.NewGRPCEndpointSpec(addr, false)) - } - tmpSpecs = removeDuplicateEndpointSpecs(logger, duplicatedStores, tmpSpecs) - specs = append(specs, tmpSpecs...) - } - - for _, eg := range endpointGroupAddrs { - spec := query.NewGRPCEndpointSpec(fmt.Sprintf("thanos:///%s", eg), false, extgrpc.EndpointGroupGRPCOpts()...) - specs = append(specs, spec) - } - - for _, eg := range strictEndpointGroups { - spec := query.NewGRPCEndpointSpec(fmt.Sprintf("thanos:///%s", eg), true, extgrpc.EndpointGroupGRPCOpts()...) - specs = append(specs, spec) - } - - return specs - }, - dialOpts, - unhealthyStoreTimeout, - endpointInfoTimeout, - queryConnMetricLabels..., - ) - - // Periodically update the store set with the addresses we see in our cluster. 
- { - ctx, cancel := context.WithCancel(context.Background()) - g.Add(func() error { - return runutil.Repeat(5*time.Second, ctx.Done(), func() error { - endpointSet.Update(ctx) - return nil - }) - }, func(error) { - cancel() - }) - } - - return endpointSet -} - // LookbackDeltaFactory creates from 1 to 3 lookback deltas depending on // dynamicLookbackDelta and eo.LookbackDelta and returns a function // that returns appropriate lookback delta for given maxSourceResolutionMillis. diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index e0780452fd..7088e17857 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -79,8 +79,6 @@ import ( "github.com/thanos-io/thanos/pkg/ui" ) -const dnsSDResolver = "miekgdns" - type ruleConfig struct { http httpConfig grpc grpcConfig @@ -404,17 +402,6 @@ func runRule( } if len(grpcEndpoints) > 0 { - duplicatedGRPCEndpoints := promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "thanos_rule_grpc_endpoints_duplicated_total", - Help: "The number of times a duplicated grpc endpoint is detected from the different configs in rule", - }) - - dnsEndpointProvider := dns.NewProvider( - logger, - extprom.WrapRegistererWithPrefix("thanos_rule_grpc_endpoints_", reg), - dnsSDResolver, - ) - dialOpts, err := extgrpc.StoreClientGRPCOpts( logger, reg, @@ -430,36 +417,27 @@ func runRule( return err } - grpcEndpointSet = prepareEndpointSet( + grpcEndpointSet, err = setupEndpointset( g, - logger, + comp, reg, - []*dns.Provider{dnsEndpointProvider}, - duplicatedGRPCEndpoints, + logger, nil, + 1*time.Minute, nil, + 1*time.Minute, + grpcEndpoints, nil, nil, - dialOpts, + nil, + conf.query.dnsSDResolver, + conf.query.dnsSDInterval, 5*time.Minute, 5*time.Second, + dialOpts, ) - - // Periodically update the GRPC addresses from query config by resolving them using DNS SD if necessary. 
- { - ctx, cancel := context.WithCancel(context.Background()) - g.Add(func() error { - return runutil.Repeat(5*time.Second, ctx.Done(), func() error { - resolveCtx, resolveCancel := context.WithTimeout(ctx, 5*time.Second) - defer resolveCancel() - if err := dnsEndpointProvider.Resolve(resolveCtx, grpcEndpoints, true); err != nil { - level.Error(logger).Log("msg", "failed to resolve addresses passed using grpc query config", "err", err) - } - return nil - }) - }, func(error) { - cancel() - }) + if err != nil { + return err } } diff --git a/docs/components/query.md b/docs/components/query.md index 10975138b6..8e9a9454cd 100644 --- a/docs/components/query.md +++ b/docs/components/query.md @@ -299,27 +299,37 @@ Flags: detected maximum container or system memory. --enable-auto-gomemlimit Enable go runtime to automatically limit memory consumption. - --endpoint= ... Addresses of statically configured Thanos - API servers (repeatable). The scheme may be - prefixed with 'dns+' or 'dnssrv+' to detect - Thanos API servers through respective DNS - lookups. + --endpoint= ... (Deprecated): Addresses of statically + configured Thanos API servers (repeatable). + The scheme may be prefixed with 'dns+' or + 'dnssrv+' to detect Thanos API servers through + respective DNS lookups. --endpoint-group= ... - Experimental: DNS name of statically configured - Thanos API server groups (repeatable). Targets - resolved from the DNS name will be queried in - a round-robin, instead of a fanout manner. - This flag should be used when connecting a - Thanos Query to HA groups of Thanos components. + (Deprecated, Experimental): DNS name of + statically configured Thanos API server groups + (repeatable). Targets resolved from the DNS + name will be queried in a round-robin, instead + of a fanout manner. This flag should be used + when connecting a Thanos Query to HA groups of + Thanos components. --endpoint-group-strict= ... 
- Experimental: DNS name of statically configured - Thanos API server groups (repeatable) that are - always used, even if the health check fails. - --endpoint-strict= ... - Addresses of only statically configured Thanos - API servers that are always used, even if - the health check fails. Useful if you have a - caching layer on top. + (Deprecated, Experimental): DNS name of + statically configured Thanos API server groups + (repeatable) that are always used, even if the + health check fails. + --endpoint-strict= ... + (Deprecated): Addresses of only statically + configured Thanos API servers that are always + used, even if the health check fails. Useful if + you have a caching layer on top. + --endpoint.sd-config= + Alternative to 'endpoint.sd-config-file' flag + (mutually exclusive). Content of Config File + with endpoint definitions + --endpoint.sd-config-file= + Path to Config File with endpoint definitions + --endpoint.sd-config-reload-interval=5m + Interval between endpoint config refreshes --grpc-address="0.0.0.0:10901" Listen ip:port address for gRPC endpoints (StoreAPI). Make sure this address is routable @@ -500,19 +510,6 @@ Flags: It follows the Thanos sharding relabel-config syntax. For format details see: https://thanos.io/tip/thanos/sharding.md/#relabelling - --store= ... Deprecation Warning - This flag is deprecated - and replaced with `endpoint`. Addresses of - statically configured store API servers - (repeatable). The scheme may be prefixed with - 'dns+' or 'dnssrv+' to detect store API servers - through respective DNS lookups. - --store-strict= ... - Deprecation Warning - This flag is deprecated - and replaced with `endpoint-strict`. Addresses - of only statically configured store API servers - that are always used, even if the health check - fails. Useful if you have a caching layer on - top. 
--store.limits.request-samples=0 The maximum samples allowed for a single Series request, The Series call fails if @@ -532,11 +529,11 @@ Flags: --store.sd-dns-interval=30s Interval between DNS resolutions. --store.sd-files= ... - Path to files that contain addresses of store - API servers. The path can be a glob pattern - (repeatable). - --store.sd-interval=5m Refresh interval to re-read file SD files. - It is used as a resync fallback. + (Deprecated) Path to files that contain + addresses of store API servers. The path can be + a glob pattern (repeatable). + --store.sd-interval=5m (Deprecated) Refresh interval to re-read file + SD files. It is used as a resync fallback. --store.unhealthy-timeout=5m Timeout before an unhealthy store is cleaned from the store UI page. diff --git a/pkg/discovery/dns/grpc.go b/pkg/discovery/dns/grpc.go index 79e832b652..7971e7991c 100644 --- a/pkg/discovery/dns/grpc.go +++ b/pkg/discovery/dns/grpc.go @@ -23,7 +23,7 @@ type builder struct { logger log.Logger } -func RegisterGRPCResolver(provider *Provider, interval time.Duration, logger log.Logger) { +func RegisterGRPCResolver(logger log.Logger, provider *Provider, interval time.Duration) { grpcresolver.Register(&builder{ resolveInterval: interval, provider: provider, diff --git a/pkg/extkingpin/flags.go b/pkg/extkingpin/flags.go index 62b9142beb..033769c56e 100644 --- a/pkg/extkingpin/flags.go +++ b/pkg/extkingpin/flags.go @@ -47,10 +47,8 @@ func Addrs(flags *kingpin.FlagClause) (target *addressSlice) { return } -// validateAddrs checks an address slice for duplicates and empty or invalid elements. +// validateAddrs checks an address slice for empty or invalid elements. 
func validateAddrs(addrs addressSlice) error { - set := map[string]struct{}{} - for _, addr := range addrs { if addr == "" { return errors.New("Address is empty.") @@ -61,12 +59,6 @@ func validateAddrs(addrs addressSlice) error { if len(qtypeAndName) != 2 && len(hostAndPort) != 2 { return errors.Errorf("Address %s is not of : format or a valid DNS query.", addr) } - - if _, ok := set[addr]; ok { - return errors.Errorf("Address %s is duplicated.", addr) - } - - set[addr] = struct{}{} } return nil diff --git a/pkg/query/endpointset.go b/pkg/query/endpointset.go index 4c519bf925..071e04a846 100644 --- a/pkg/query/endpointset.go +++ b/pkg/query/endpointset.go @@ -211,8 +211,7 @@ type EndpointSet struct { // Endpoint specifications can change dynamically. If some component is missing from the list, we assume it is no longer // accessible and we close gRPC client for it, unless it is strict. - endpointSpec func() map[string]*GRPCEndpointSpec - dialOpts []grpc.DialOption + endpointSpecs func() map[string]*GRPCEndpointSpec endpointInfoTimeout time.Duration unhealthyEndpointTimeout time.Duration @@ -235,7 +234,6 @@ func NewEndpointSet( logger log.Logger, reg prometheus.Registerer, endpointSpecs func() []*GRPCEndpointSpec, - dialOpts []grpc.DialOption, unhealthyEndpointTimeout time.Duration, endpointInfoTimeout time.Duration, endpointMetricLabels ...string, @@ -254,19 +252,17 @@ func NewEndpointSet( } return &EndpointSet{ - now: now, - logger: log.With(logger, "component", "endpointset"), - endpointsMetric: endpointsMetric, - - dialOpts: dialOpts, + now: now, + logger: log.With(logger, "component", "endpointset"), + endpointsMetric: endpointsMetric, endpointInfoTimeout: endpointInfoTimeout, unhealthyEndpointTimeout: unhealthyEndpointTimeout, - endpointSpec: func() map[string]*GRPCEndpointSpec { - specs := make(map[string]*GRPCEndpointSpec) + endpointSpecs: func() map[string]*GRPCEndpointSpec { + res := make(map[string]*GRPCEndpointSpec) for _, s := range endpointSpecs() { - 
specs[s.addr] = s + res[s.addr] = s } - return specs + return res }, endpoints: make(map[string]*endpointRef), } @@ -288,7 +284,7 @@ func (e *EndpointSet) Update(ctx context.Context) { mu sync.Mutex ) - for _, spec := range e.endpointSpec() { + for _, spec := range e.endpointSpecs() { spec := spec if er, existingRef := e.endpoints[spec.Addr()]; existingRef { @@ -571,11 +567,7 @@ type endpointRef struct { // newEndpointRef creates a new endpointRef with a gRPC channel to the given the IP address. // The call to newEndpointRef will return an error if establishing the channel fails. func (e *EndpointSet) newEndpointRef(spec *GRPCEndpointSpec) (*endpointRef, error) { - var dialOpts []grpc.DialOption - - dialOpts = append(dialOpts, e.dialOpts...) - dialOpts = append(dialOpts, spec.dialOpts...) - conn, err := grpc.NewClient(spec.Addr(), dialOpts...) + conn, err := grpc.NewClient(spec.Addr(), spec.dialOpts...) if err != nil { return nil, errors.Wrap(err, "dialing connection") } diff --git a/pkg/query/endpointset_test.go b/pkg/query/endpointset_test.go index 6f061211ab..68b00aff5e 100644 --- a/pkg/query/endpointset_test.go +++ b/pkg/query/endpointset_test.go @@ -675,11 +675,11 @@ func TestEndpointSetUpdate_AvailabilityScenarios(t *testing.T) { endpointSet := NewEndpointSet(nowFunc, nil, nil, func() (specs []*GRPCEndpointSpec) { for _, addr := range discoveredEndpointAddr { - specs = append(specs, NewGRPCEndpointSpec(addr, false)) + specs = append(specs, NewGRPCEndpointSpec(addr, false, testGRPCOpts...)) } return specs }, - testGRPCOpts, time.Minute, 2*time.Second) + time.Minute, 2*time.Second) defer endpointSet.Close() // Initial update. @@ -1052,7 +1052,7 @@ func TestEndpointSet_Update_NoneAvailable(t *testing.T) { } return specs }, - testGRPCOpts, time.Minute, 2*time.Second) + time.Minute, 2*time.Second) defer endpointSet.Close() // Should not matter how many of these we run. 
@@ -1159,11 +1159,11 @@ func TestEndpoint_Update_QuerierStrict(t *testing.T) { slowStaticEndpointAddr := discoveredEndpointAddr[2] endpointSet := NewEndpointSet(time.Now, nil, nil, func() (specs []*GRPCEndpointSpec) { return []*GRPCEndpointSpec{ - NewGRPCEndpointSpec(discoveredEndpointAddr[0], true), - NewGRPCEndpointSpec(discoveredEndpointAddr[1], false), - NewGRPCEndpointSpec(discoveredEndpointAddr[2], true), + NewGRPCEndpointSpec(discoveredEndpointAddr[0], true, testGRPCOpts...), + NewGRPCEndpointSpec(discoveredEndpointAddr[1], false, testGRPCOpts...), + NewGRPCEndpointSpec(discoveredEndpointAddr[2], true, testGRPCOpts...), } - }, testGRPCOpts, time.Minute, 1*time.Second) + }, time.Minute, 1*time.Second) defer endpointSet.Close() // Initial update. @@ -1273,7 +1273,7 @@ func TestEndpointSet_APIs_Discovery(t *testing.T) { endpointSpec: func() []*GRPCEndpointSpec { endpointSpec := make([]*GRPCEndpointSpec, 0, len(endpoints.orderAddrs)) for _, addr := range endpoints.orderAddrs { - endpointSpec = append(endpointSpec, NewGRPCEndpointSpec(addr, false)) + endpointSpec = append(endpointSpec, NewGRPCEndpointSpec(addr, false, testGRPCOpts...)) } return endpointSpec }, @@ -1297,7 +1297,7 @@ func TestEndpointSet_APIs_Discovery(t *testing.T) { name: "Sidecar discovered, no Ruler discovered", endpointSpec: func() []*GRPCEndpointSpec { return []*GRPCEndpointSpec{ - NewGRPCEndpointSpec(endpoints.orderAddrs[0], false), + NewGRPCEndpointSpec(endpoints.orderAddrs[0], false, testGRPCOpts...), } }, expectedStores: 1, // sidecar @@ -1310,8 +1310,8 @@ func TestEndpointSet_APIs_Discovery(t *testing.T) { name: "Ruler discovered", endpointSpec: func() []*GRPCEndpointSpec { return []*GRPCEndpointSpec{ - NewGRPCEndpointSpec(endpoints.orderAddrs[0], false), - NewGRPCEndpointSpec(endpoints.orderAddrs[1], false), + NewGRPCEndpointSpec(endpoints.orderAddrs[0], false, testGRPCOpts...), + NewGRPCEndpointSpec(endpoints.orderAddrs[1], false, testGRPCOpts...), } }, expectedStores: 2, // sidecar + 
ruler @@ -1324,7 +1324,7 @@ func TestEndpointSet_APIs_Discovery(t *testing.T) { name: "Sidecar removed", endpointSpec: func() []*GRPCEndpointSpec { return []*GRPCEndpointSpec{ - NewGRPCEndpointSpec(endpoints.orderAddrs[1], false), + NewGRPCEndpointSpec(endpoints.orderAddrs[1], false, testGRPCOpts...), } }, expectedStores: 1, // ruler @@ -1344,7 +1344,7 @@ func TestEndpointSet_APIs_Discovery(t *testing.T) { return tc.states[currentState].endpointSpec() }, - testGRPCOpts, time.Minute, 2*time.Second) + time.Minute, 2*time.Second) defer endpointSet.Close() @@ -1532,11 +1532,11 @@ func makeEndpointSet(discoveredEndpointAddr []string, strict bool, now nowFunc, endpointSet := NewEndpointSet(now, nil, nil, func() (specs []*GRPCEndpointSpec) { for _, addr := range discoveredEndpointAddr { - specs = append(specs, NewGRPCEndpointSpec(addr, strict)) + specs = append(specs, NewGRPCEndpointSpec(addr, strict, testGRPCOpts...)) } return specs }, - testGRPCOpts, time.Minute, time.Second, metricLabels...) + time.Minute, time.Second, metricLabels...) 
return endpointSet } diff --git a/test/e2e/e2ethanos/services.go b/test/e2e/e2ethanos/services.go index c8a9e7fc62..a25f8f6ab9 100644 --- a/test/e2e/e2ethanos/services.go +++ b/test/e2e/e2ethanos/services.go @@ -17,9 +17,7 @@ import ( e2edb "github.com/efficientgo/e2e/db" e2eobs "github.com/efficientgo/e2e/observable" "github.com/pkg/errors" - "github.com/prometheus/common/model" "github.com/prometheus/prometheus/config" - "github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/model/relabel" "gopkg.in/yaml.v2" @@ -429,26 +427,25 @@ func (q *QuerierBuilder) collectArgs() ([]string, error) { "--store.sd-dns-interval": "5s", "--log.level": infoLogLevel, "--query.max-concurrent": "1", - "--store.sd-interval": "5s", }) for _, repl := range q.replicaLabels { args = append(args, "--query.replica-label="+repl) } for _, addr := range q.storeAddresses { - args = append(args, "--store="+addr) + args = append(args, "--endpoint="+addr) } for _, addr := range q.ruleAddresses { - args = append(args, "--rule="+addr) + args = append(args, "--endpoint="+addr) } for _, addr := range q.targetAddresses { - args = append(args, "--target="+addr) + args = append(args, "--endpoint="+addr) } for _, addr := range q.metadataAddresses { - args = append(args, "--metadata="+addr) + args = append(args, "--endpoint="+addr) } for _, addr := range q.exemplarAddresses { - args = append(args, "--exemplar="+addr) + args = append(args, "--endpoint="+addr) } for _, feature := range q.enableFeatures { args = append(args, "--enable-feature="+feature) @@ -470,21 +467,27 @@ func (q *QuerierBuilder) collectArgs() ([]string, error) { return nil, errors.Wrap(err, "create query dir failed") } - fileSD := []*targetgroup.Group{{}} + type EndpointSpec struct{ Address string } + + endpoints := make([]EndpointSpec, 0) for _, a := range q.fileSDStoreAddresses { - fileSD[0].Targets = append(fileSD[0].Targets, model.LabelSet{model.AddressLabel: model.LabelValue(a)}) + endpoints = 
append(endpoints, EndpointSpec{Address: a}) } - b, err := yaml.Marshal(fileSD) + endpointSDConfig := struct { + Endpoints []EndpointSpec `yaml:"endpoints"` + }{Endpoints: endpoints} + b, err := yaml.Marshal(endpointSDConfig) if err != nil { return nil, err } - if err := os.WriteFile(q.Dir()+"/filesd.yaml", b, 0600); err != nil { + if err := os.WriteFile(q.Dir()+"/endpoint-sd-config.yaml", b, 0600); err != nil { return nil, errors.Wrap(err, "creating query SD config failed") } - args = append(args, "--store.sd-files="+filepath.Join(q.InternalDir(), "filesd.yaml")) + args = append(args, "--endpoint.sd-config-file="+filepath.Join(q.InternalDir(), "endpoint-sd-config.yaml")) + args = append(args, "--endpoint.sd-config-reload-interval=5s") } if q.routePrefix != "" { args = append(args, "--web.route-prefix="+q.routePrefix) diff --git a/test/e2e/exemplars_api_test.go b/test/e2e/exemplars_api_test.go index 60c423f667..4c269a3691 100644 --- a/test/e2e/exemplars_api_test.go +++ b/test/e2e/exemplars_api_test.go @@ -82,7 +82,7 @@ config: t.Cleanup(cancel) testutil.Ok(t, q.WaitSumMetricsWithOptions(e2emon.Equals(2), []string{"thanos_store_nodes_grpc_connections"}, e2emon.WaitMissingMetrics())) - testutil.Ok(t, q.WaitSumMetricsWithOptions(e2emon.Equals(2), []string{"thanos_query_exemplar_apis_dns_provider_results"}, e2emon.WaitMissingMetrics())) + testutil.Ok(t, q.WaitSumMetricsWithOptions(e2emon.Equals(2), []string{"thanos_query_endpoints_dns_provider_results"}, e2emon.WaitMissingMetrics())) now := time.Now() start := timestamp.FromTime(now.Add(-time.Hour)) diff --git a/test/e2e/metadata_api_test.go b/test/e2e/metadata_api_test.go index f966379790..df124b9365 100644 --- a/test/e2e/metadata_api_test.go +++ b/test/e2e/metadata_api_test.go @@ -56,7 +56,7 @@ func TestMetadataAPI_Fanout(t *testing.T) { t.Cleanup(cancel) testutil.Ok(t, q.WaitSumMetricsWithOptions(e2emon.Equals(2), []string{"thanos_store_nodes_grpc_connections"}, e2emon.WaitMissingMetrics())) - testutil.Ok(t, 
q.WaitSumMetricsWithOptions(e2emon.Equals(2), []string{"thanos_query_metadata_apis_dns_provider_results"}, e2emon.WaitMissingMetrics())) + testutil.Ok(t, q.WaitSumMetricsWithOptions(e2emon.Equals(2), []string{"thanos_query_endpoints_dns_provider_results"}, e2emon.WaitMissingMetrics())) var promMeta map[string][]metadatapb.Meta // Wait metadata response to be ready as Prometheus gets metadata after scrape.