diff --git a/CHANGELOG.md b/CHANGELOG.md index 891d3c8e01e..ec9b9342d08 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,8 @@ We use *breaking* word for marking changes that are not backward compatible (rel ## Unreleased +- [#3121](https://github.com/thanos-io/thanos/pull/3121) Receive: Added `--receive.hashrings` alternative to `receive.hashrings-file` flag (lower priority). Content of JSON file that contains the hashring configuration. + ## [v0.15.0](https://github.com/thanos-io/thanos/releases) - in release process. :warning: **WARNING** :warning: Thanos Rule's `/api/v1/rules` endpoint no longer returns the old, deprecated `partial_response_strategy`. The old, deprecated value has been fixed to `WARN` for quite some time. _Please_ use `partialResponseStrategy`. @@ -24,7 +26,6 @@ sse_config: type: SSE-S3 ``` - ### Fixed - [#2937](https://github.com/thanos-io/thanos/pull/2937) Receive: Fixing auto-configuration of --receive.local-endpoint diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index dcee94f15e0..aeadae50feb 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -19,8 +19,10 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/tsdb" + "github.com/thanos-io/thanos/pkg/extkingpin" "github.com/thanos-io/thanos/pkg/component" @@ -63,8 +65,7 @@ func registerReceive(app *extkingpin.App) { retention := modelDuration(cmd.Flag("tsdb.retention", "How long to retain raw samples on local storage. 0d - disables this retention.").Default("15d")) - hashringsFile := cmd.Flag("receive.hashrings-file", "Path to file that contains the hashring configuration."). 
- PlaceHolder("").String() + hashringsFile := extflag.RegisterPathOrContent(cmd, "receive.hashrings", "JSON file that contains the hashring configuration.", false) refreshInterval := modelDuration(cmd.Flag("receive.hashrings-file-refresh-interval", "Refresh interval to re-read the hashring configuration file. (used as a fallback)"). Default("5m")) @@ -101,14 +102,6 @@ func registerReceive(app *extkingpin.App) { return errors.Wrap(err, "parse labels") } - var cw *receive.ConfigWatcher - if *hashringsFile != "" { - cw, err = receive.NewConfigWatcher(log.With(logger, "component", "config-watcher"), reg, *hashringsFile, *refreshInterval) - if err != nil { - return err - } - } - tsdbOpts := &tsdb.Options{ MinBlockDuration: int64(time.Duration(*tsdbMinBlockDuration) / time.Millisecond), MaxBlockDuration: int64(time.Duration(*tsdbMaxBlockDuration) / time.Millisecond), @@ -154,7 +147,8 @@ func registerReceive(app *extkingpin.App) { tsdbOpts, *ignoreBlockSize, lset, - cw, + hashringsFile, + refreshInterval, *localEndpoint, *tenantHeader, *defaultTenantID, @@ -193,7 +187,8 @@ func runReceive( tsdbOpts *tsdb.Options, ignoreBlockSize bool, lset labels.Labels, - cw *receive.ConfigWatcher, + hashringsFile *extflag.PathOrContent, + refreshInterval *model.Duration, endpoint string, tenantHeader string, defaultTenantID string, @@ -369,7 +364,13 @@ func runReceive( // watcher, we close the chan ourselves. updates := make(chan receive.Hashring, 1) - if cw != nil { + // A hashrings config file path is given: initialize the config watcher. A Path() error (both flags set) falls through to Content() below, which reports it. + if configPath, err := hashringsFile.Path(); err == nil && configPath != "" { + cw, err := receive.NewConfigWatcher(log.With(logger, "component", "config-watcher"), reg, configPath, *refreshInterval) + if err != nil { + return errors.Wrap(err, "failed to initialize config watcher") + } + // Validate the hashring configuration before running the watcher. 
if err := cw.ValidateConfig(); err != nil { cw.Stop() @@ -379,15 +380,33 @@ func runReceive( ctx, cancel := context.WithCancel(context.Background()) g.Add(func() error { - return receive.HashringFromConfig(ctx, updates, cw) + return receive.HashringFromConfigWatcher(ctx, updates, cw) }, func(error) { cancel() }) } else { + configContent, err := hashringsFile.Content() + if err != nil { + return errors.Wrap(err, "failed to read hashrings configuration file") + } + + var ring receive.Hashring + // The Hashrings config file content given initialize configuration from content. + if len(configContent) > 0 { + ring, err = receive.HashringFromConfig(configContent) + if err != nil { + close(updates) + return errors.Wrap(err, "failed to validate hashring configuration file") + } + } else { + // The hashring file is not specified use single node hashring. + ring = receive.SingleNodeHashring(endpoint) + } + cancel := make(chan struct{}) g.Add(func() error { defer close(updates) - updates <- receive.SingleNodeHashring(endpoint) + updates <- ring <-cancel return nil }, func(error) { diff --git a/docs/components/receive.md b/docs/components/receive.md index 2e9a878b73f..d3d887343af 100644 --- a/docs/components/receive.md +++ b/docs/components/receive.md @@ -133,9 +133,13 @@ Flags: https://thanos.io/tip/thanos/storage.md/#configuration --tsdb.retention=15d How long to retain raw samples on local storage. 0d - disables this retention. - --receive.hashrings-file= - Path to file that contains the hashring + --receive.hashrings-file= + Path to JSON file that contains the hashring configuration. + --receive.hashrings= + Alternative to 'receive.hashrings-file' flag + (lower priority). Content of JSON file that + contains the hashring configuration. --receive.hashrings-file-refresh-interval=5m Refresh interval to re-read the hashring configuration file. 
(used as a fallback) diff --git a/pkg/extflag/pathorcontent.go b/pkg/extflag/pathorcontent.go index 189c7af9def..d3401c37e6b 100644 --- a/pkg/extflag/pathorcontent.go +++ b/pkg/extflag/pathorcontent.go @@ -44,21 +44,20 @@ func RegisterPathOrContent(cmd FlagClause, flagName string, help string, require } } -// Content returns content of the file. Flag that specifies path has priority. +// Content returns the content of the file. Flag that specifies path has priority. // It returns error if the content is empty and required flag is set to true. func (p *PathOrContent) Content() ([]byte, error) { - contentFlagName := p.flagName fileFlagName := fmt.Sprintf("%s-file", p.flagName) if len(*p.path) > 0 && len(*p.content) > 0 { - return nil, errors.Errorf("both %s and %s flags set.", fileFlagName, contentFlagName) + return nil, errors.Errorf("both %s and %s flags set.", fileFlagName, p.flagName) } var content []byte if len(*p.path) > 0 { c, err := ioutil.ReadFile(*p.path) if err != nil { - return nil, errors.Wrapf(err, "loading YAML file %s for %s", *p.path, fileFlagName) + return nil, errors.Wrapf(err, "loading file %s for %s", *p.path, fileFlagName) } content = c } else { @@ -66,8 +65,24 @@ func (p *PathOrContent) Content() ([]byte, error) { } if len(content) == 0 && p.required { - return nil, errors.Errorf("flag %s or %s is required for running this command and content cannot be empty.", fileFlagName, contentFlagName) + return nil, errors.Errorf("flag %s or %s is required for running this command and content cannot be empty.", fileFlagName, p.flagName) } return content, nil } + +// Path returns the path of the file. Flag that specifies path has priority. +// It returns error if the required flag is set to true. 
+func (p *PathOrContent) Path() (string, error) { + fileFlagName := fmt.Sprintf("%s-file", p.flagName) + + if len(*p.path) > 0 && len(*p.content) > 0 { + return "", errors.Errorf("both %s and %s flags set.", fileFlagName, p.flagName) + } + + if len(*p.path) == 0 && p.required { + return "", errors.Errorf("flag %s or %s is required for running this command and content cannot be empty.", fileFlagName, p.flagName) + } + + return *p.path, nil +} diff --git a/pkg/receive/config.go b/pkg/receive/config.go index 1eb198cace6..272c3c5321a 100644 --- a/pkg/receive/config.go +++ b/pkg/receive/config.go @@ -176,35 +176,42 @@ func (cw *ConfigWatcher) C() <-chan []HashringConfig { // ValidateConfig returns an error if the configuration that's being watched is not valid. func (cw *ConfigWatcher) ValidateConfig() error { - _, _, err := cw.loadConfig() + _, _, err := loadConfig(cw.logger, cw.path) return err } -// loadConfig loads raw configuration content and returns a configuration. -func (cw *ConfigWatcher) loadConfig() ([]HashringConfig, float64, error) { - cfgContent, err := cw.readFile() - if err != nil { - return nil, 0, errors.Wrap(err, "failed to read configuration file") - } +// Stop shuts down the config watcher. +func (cw *ConfigWatcher) Stop() { + level.Debug(cw.logger).Log("msg", "stopping hashring configuration watcher...", "path", cw.path) - config, err := cw.parseConfig(cfgContent) - if err != nil { - return nil, 0, errors.Wrapf(errParseConfigurationFile, "failed to parse configuration file: %v", err) - } + done := make(chan struct{}) + defer close(done) - // If hashring is empty, return an error. - if len(config) == 0 { - return nil, 0, errors.Wrapf(errEmptyConfigurationFile, "failed to load configuration file, path: %s", cw.path) + // Closing the watcher will deadlock unless all events and errors are drained. + go func() { + for { + select { + case <-cw.watcher.Errors: + case <-cw.watcher.Events: + // Drain all events and errors. 
+ case <-done: + return + } + } + }() + if err := cw.watcher.Close(); err != nil { + level.Error(cw.logger).Log("msg", "error closing file watcher", "path", cw.path, "err", err) } - return config, hashAsMetricValue(cfgContent), nil + close(cw.ch) + level.Debug(cw.logger).Log("msg", "hashring configuration watcher stopped") } // refresh reads the configured file and sends the hashring configuration on the channel. func (cw *ConfigWatcher) refresh(ctx context.Context) { cw.refreshCounter.Inc() - config, cfgHash, err := cw.loadConfig() + config, cfgHash, err := loadConfig(cw.logger, cw.path) if err != nil { cw.errorCounter.Inc() level.Error(cw.logger).Log("msg", "failed to load configuration file", "err", err, "path", cw.path) @@ -238,42 +245,35 @@ func (cw *ConfigWatcher) refresh(ctx context.Context) { } } -// Stop shuts down the config watcher. -func (cw *ConfigWatcher) Stop() { - level.Debug(cw.logger).Log("msg", "stopping hashring configuration watcher...", "path", cw.path) +// loadConfig loads raw configuration content and returns a configuration. +func loadConfig(logger log.Logger, path string) ([]HashringConfig, float64, error) { + cfgContent, err := readFile(logger, path) + if err != nil { + return nil, 0, errors.Wrap(err, "failed to read configuration file") + } - done := make(chan struct{}) - defer close(done) + config, err := parseConfig(cfgContent) + if err != nil { + return nil, 0, errors.Wrapf(errParseConfigurationFile, "failed to parse configuration file: %v", err) + } - // Closing the watcher will deadlock unless all events and errors are drained. - go func() { - for { - select { - case <-cw.watcher.Errors: - case <-cw.watcher.Events: - // Drain all events and errors. - case <-done: - return - } - } - }() - if err := cw.watcher.Close(); err != nil { - level.Error(cw.logger).Log("msg", "error closing file watcher", "path", cw.path, "err", err) + // If hashring is empty, return an error. 
+ if len(config) == 0 { + return nil, 0, errors.Wrapf(errEmptyConfigurationFile, "failed to load configuration file, path: %s", path) } - close(cw.ch) - level.Debug(cw.logger).Log("msg", "hashring configuration watcher stopped") + return config, hashAsMetricValue(cfgContent), nil } // readFile reads the configuration file and returns content of configuration file. -func (cw *ConfigWatcher) readFile() ([]byte, error) { - fd, err := os.Open(cw.path) +func readFile(logger log.Logger, path string) ([]byte, error) { + fd, err := os.Open(path) if err != nil { return nil, err } defer func() { if err := fd.Close(); err != nil { - level.Error(cw.logger).Log("msg", "failed to close file", "err", err, "path", cw.path) + level.Error(logger).Log("msg", "failed to close file", "err", err, "path", path) } }() @@ -281,7 +281,7 @@ func (cw *ConfigWatcher) readFile() ([]byte, error) { } // parseConfig parses the raw configuration content and returns a HashringConfig. -func (cw *ConfigWatcher) parseConfig(content []byte) ([]HashringConfig, error) { +func parseConfig(content []byte) ([]HashringConfig, error) { var config []HashringConfig err := json.Unmarshal(content, &config) return config, err diff --git a/pkg/receive/hashring.go b/pkg/receive/hashring.go index ef6c94390a9..2574034c277 100644 --- a/pkg/receive/hashring.go +++ b/pkg/receive/hashring.go @@ -11,6 +11,7 @@ import ( "github.com/cespare/xxhash" "github.com/pkg/errors" + "github.com/thanos-io/thanos/pkg/store/storepb/prompb" ) @@ -158,14 +159,14 @@ func newMultiHashring(cfg []HashringConfig) Hashring { return m } -// HashringFromConfig creates multi-tenant hashrings from a +// HashringFromConfigWatcher creates multi-tenant hashrings from a // hashring configuration file watcher. // The configuration file is watched for updates. // Hashrings are returned on the updates channel. // Which hashring to use for a tenant is determined // by the tenants field of the hashring configuration. 
// The updates chan is closed before exiting. -func HashringFromConfig(ctx context.Context, updates chan<- Hashring, cw *ConfigWatcher) error { +func HashringFromConfigWatcher(ctx context.Context, updates chan<- Hashring, cw *ConfigWatcher) error { defer close(updates) go cw.Run(ctx) @@ -181,3 +182,18 @@ func HashringFromConfig(ctx context.Context, updates chan<- Hashring, cw *Config } } } + +// HashringFromConfig parses raw JSON configuration content and returns a multi-tenant Hashring; it returns an error if the content cannot be parsed or defines no hashrings. +func HashringFromConfig(content []byte) (Hashring, error) { + config, err := parseConfig(content) + if err != nil { + return nil, errors.Wrap(err, "failed to parse configuration") + } + + // An empty hashring list is an error; err is nil here, so wrapping it would yield a nil error. + if len(config) == 0 { + return nil, errors.New("failed to load configuration: no hashrings defined") + } + + return newMultiHashring(config), nil +} diff --git a/scripts/cfggen/main.go b/scripts/cfggen/main.go index 76eeed0ea9a..2574e36c7b5 100644 --- a/scripts/cfggen/main.go +++ b/scripts/cfggen/main.go @@ -15,6 +15,9 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/pkg/errors" + "gopkg.in/alecthomas/kingpin.v2" + "gopkg.in/yaml.v2" + "github.com/thanos-io/thanos/pkg/alert" "github.com/thanos-io/thanos/pkg/cacheutil" http_util "github.com/thanos-io/thanos/pkg/http" @@ -34,8 +37,6 @@ import ( "github.com/thanos-io/thanos/pkg/tracing/jaeger" "github.com/thanos-io/thanos/pkg/tracing/lightstep" "github.com/thanos-io/thanos/pkg/tracing/stackdriver" - kingpin "gopkg.in/alecthomas/kingpin.v2" - yaml "gopkg.in/yaml.v2" ) var ( @@ -54,6 +55,7 @@ var ( trclient.ELASTIC_APM: elasticapm.Config{}, trclient.LIGHTSTEP: lightstep.Config{}, } + // TODO(kakkoyun): Add Bucket cache. 
indexCacheConfigs = map[storecache.IndexCacheProvider]interface{}{ storecache.INMEMORY: storecache.InMemoryIndexCacheConfig{}, storecache.MEMCACHED: cacheutil.MemcachedClientConfig{}, diff --git a/scripts/quickstart.sh b/scripts/quickstart.sh index f83a88a0ebf..ec7532101bd 100755 --- a/scripts/quickstart.sh +++ b/scripts/quickstart.sh @@ -184,10 +184,6 @@ sleep 0.5 if [ -n "${REMOTE_WRITE_ENABLED}" ]; then - cat <<-EOF >./data/hashring.json - [{"endpoints":["127.0.0.1:10907","127.0.0.1:11907","127.0.0.1:12907"]}] - EOF - for i in $(seq 0 1 2); do ${THANOS_EXECUTABLE} receive \ --debug.name receive${i} \ @@ -203,7 +199,7 @@ if [ -n "${REMOTE_WRITE_ENABLED}" ]; then --label "receive_replica=\"${i}\"" \ --label 'receive="true"' \ --receive.local-endpoint 127.0.0.1:1${i}907 \ - --receive.hashrings-file ./data/hashring.json \ + --receive.hashrings '[{"endpoints":["127.0.0.1:10907","127.0.0.1:11907","127.0.0.1:12907"]}]' \ --remote-write.address 0.0.0.0:1${i}908 \ ${OBJSTORECFG} & diff --git a/test/e2e/e2ethanos/services.go b/test/e2e/e2ethanos/services.go index 28bfc780682..60482fcc01b 100644 --- a/test/e2e/e2ethanos/services.go +++ b/test/e2e/e2ethanos/services.go @@ -201,6 +201,51 @@ func NewReceiver(sharedDir string, networkName string, name string, replicationF return nil, errors.Wrapf(err, "generate hashring file: %v", hashring) } + receiver := NewService( + fmt.Sprintf("receive-%v", name), + DefaultImage(), + // TODO(bwplotka): BuildArgs should be interface. 
+ e2e.NewCommand("receive", e2e.BuildArgs(map[string]string{ + "--debug.name": fmt.Sprintf("receive-%v", name), + "--grpc-address": ":9091", + "--grpc-grace-period": "0s", + "--http-address": ":8080", + "--remote-write.address": ":8081", + "--label": fmt.Sprintf(`receive="%s"`, name), + "--tsdb.path": filepath.Join(container, "data"), + "--log.level": logLevel, + "--receive.replication-factor": strconv.Itoa(replicationFactor), + "--receive.local-endpoint": localEndpoint, + "--receive.hashrings": string(b), + })...), + e2e.NewHTTPReadinessProbe(8080, "/-/ready", 200, 200), + 8080, + 9091, + 8081, + ) + receiver.SetUser(strconv.Itoa(os.Getuid())) + receiver.SetBackoff(defaultBackoffConfig) + + return receiver, nil +} + +func NewReceiverWithConfigWatcher(sharedDir string, networkName string, name string, replicationFactor int, hashring ...receive.HashringConfig) (*Service, error) { + localEndpoint := NewService(fmt.Sprintf("receive-%v", name), "", e2e.NewCommand("", ""), nil, 8080, 9091, 8081).GRPCNetworkEndpointFor(networkName) + if len(hashring) == 0 { + hashring = []receive.HashringConfig{{Endpoints: []string{localEndpoint}}} + } + + dir := filepath.Join(sharedDir, "data", "receive", name) + dataDir := filepath.Join(dir, "data") + container := filepath.Join(e2e.ContainerSharedDir, "data", "receive", name) + if err := os.MkdirAll(dataDir, 0777); err != nil { + return nil, errors.Wrap(err, "create receive dir") + } + b, err := json.Marshal(hashring) + if err != nil { + return nil, errors.Wrapf(err, "generate hashring file: %v", hashring) + } + if err := ioutil.WriteFile(filepath.Join(dir, "hashrings.json"), b, 0666); err != nil { return nil, errors.Wrap(err, "creating receive config") } diff --git a/test/e2e/receive_test.go b/test/e2e/receive_test.go index 49230eabf02..6e899c00006 100644 --- a/test/e2e/receive_test.go +++ b/test/e2e/receive_test.go @@ -101,6 +101,82 @@ func TestReceive(t *testing.T) { }) }) + t.Run("hashring with config watcher", func(t *testing.T) { 
+ t.Parallel() + + s, err := e2e.NewScenario("e2e_test_receive_hashring") + testutil.Ok(t, err) + t.Cleanup(e2ethanos.CleanScenario(t, s)) + + r1, err := e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "1", 1) + testutil.Ok(t, err) + r2, err := e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "2", 1) + testutil.Ok(t, err) + r3, err := e2ethanos.NewReceiver(s.SharedDir(), s.NetworkName(), "3", 1) + testutil.Ok(t, err) + + h := receive.HashringConfig{ + Endpoints: []string{ + r1.GRPCNetworkEndpointFor(s.NetworkName()), + r2.GRPCNetworkEndpointFor(s.NetworkName()), + r3.GRPCNetworkEndpointFor(s.NetworkName()), + }, + } + + // Recreate again, but with hashring config. + // TODO(kakkoyun): Update config file and wait config watcher to reconcile hashring. + r1, err = e2ethanos.NewReceiverWithConfigWatcher(s.SharedDir(), s.NetworkName(), "1", 1, h) + testutil.Ok(t, err) + r2, err = e2ethanos.NewReceiverWithConfigWatcher(s.SharedDir(), s.NetworkName(), "2", 1, h) + testutil.Ok(t, err) + r3, err = e2ethanos.NewReceiverWithConfigWatcher(s.SharedDir(), s.NetworkName(), "3", 1, h) + testutil.Ok(t, err) + testutil.Ok(t, s.StartAndWaitReady(r1, r2, r3)) + + prom1, _, err := e2ethanos.NewPrometheus(s.SharedDir(), "1", defaultPromConfig("prom1", 0, e2ethanos.RemoteWriteEndpoint(r1.NetworkEndpoint(8081)), ""), e2ethanos.DefaultPrometheusImage()) + testutil.Ok(t, err) + prom2, _, err := e2ethanos.NewPrometheus(s.SharedDir(), "2", defaultPromConfig("prom2", 0, e2ethanos.RemoteWriteEndpoint(r2.NetworkEndpoint(8081)), ""), e2ethanos.DefaultPrometheusImage()) + testutil.Ok(t, err) + prom3, _, err := e2ethanos.NewPrometheus(s.SharedDir(), "3", defaultPromConfig("prom3", 0, e2ethanos.RemoteWriteEndpoint(r3.NetworkEndpoint(8081)), ""), e2ethanos.DefaultPrometheusImage()) + testutil.Ok(t, err) + testutil.Ok(t, s.StartAndWaitReady(prom1, prom2, prom3)) + + q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{r1.GRPCNetworkEndpoint(), r2.GRPCNetworkEndpoint(), 
r3.GRPCNetworkEndpoint()}, nil, nil, "", "") + testutil.Ok(t, err) + testutil.Ok(t, s.StartAndWaitReady(q)) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) + t.Cleanup(cancel) + + testutil.Ok(t, q.WaitSumMetricsWithOptions(e2e.Equals(3), []string{"thanos_store_nodes_grpc_connections"}, e2e.WaitMissingMetrics)) + + queryAndAssertSeries(t, ctx, q.HTTPEndpoint(), queryUpWithoutInstance, promclient.QueryOptions{ + Deduplicate: false, + }, []model.Metric{ + { + "job": "myself", + "prometheus": "prom1", + "receive": "2", + "replica": "0", + "tenant_id": "default-tenant", + }, + { + "job": "myself", + "prometheus": "prom2", + "receive": "1", + "replica": "0", + "tenant_id": "default-tenant", + }, + { + "job": "myself", + "prometheus": "prom3", + "receive": "2", + "replica": "0", + "tenant_id": "default-tenant", + }, + }) + }) + t.Run("replication", func(t *testing.T) { t.Parallel()