Skip to content

Commit

Permalink
receive: Add ability to consume contents of the hashrings file direct…
Browse files Browse the repository at this point in the history
…ly (#3121)

* Add ability to consume content of the hashring directly

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* Remove file format

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* Address review issues

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* Address review issues

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* Address review issues

Simplify hasring configuration

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>
  • Loading branch information
kakkoyun authored Dec 28, 2020
1 parent 0eb4800 commit dd670a3
Show file tree
Hide file tree
Showing 10 changed files with 234 additions and 74 deletions.
7 changes: 4 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ We use _breaking :warning:_ to mark changes that are not backward compatible (re
- [#3421](https://github.com/thanos-io/thanos/pull/3421) Tools: Added `thanos tools bucket rewrite` command allowing to delete series from given block.
- [#3509](https://github.com/thanos-io/thanos/pull/3509) Store: Added touch series limit
- [#3388](https://github.com/thanos-io/thanos/pull/3378) Tools: Bucket replicator now can specify block IDs to copy.
- [#3121](https://github.com/thanos-io/thanos/pull/3121) Receive: Added `--receive.hashrings` alternative to `receive.hashrings-file` flag (lower priority). Content of JSON file that contains the hashring configuration.

### Fixed

Expand Down Expand Up @@ -111,8 +112,8 @@ Highlights:

### Added

- [#3114](https://github.com/thanos-io/thanos/pull/3114) Query Frontend: Added support for Memacached cache.
- **breaking** Renamed flag `log_queries_longer_than` to `log-queries-longer-than`.
- [#3114](https://github.com/thanos-io/thanos/pull/3114) Query Frontend: Added support for Memcached cache.
- **breaking** Renamed flag `log_queries_longer_than` to `log-queries-longer-than`.
- [#3166](https://github.com/thanos-io/thanos/pull/3166) UIs: Added UI for passing a `storeMatch[]` parameter to queries.
- [#3181](https://github.com/thanos-io/thanos/pull/3181) Logging: Added debug level logging for responses between 300-399
- [#3133](https://github.com/thanos-io/thanos/pull/3133) Query: Allowed passing a `storeMatch[]` to Labels APIs; Time range metadata based store filtering is supported on Labels APIs.
Expand All @@ -134,7 +135,7 @@ Highlights:
- [#3022](https://github.com/thanos-io/thanos/pull/3022) \*: Thanos images are now build with Go 1.15.
- [#3205](https://github.com/thanos-io/thanos/pull/3205) \*: Updated TSDB to ~2.21

## [v0.15.0](https://github.com/thanos-io/thanos/releases) - 2020.09.07
## [v0.15.0](https://github.com/thanos-io/thanos/releases/v0.15.0) - 2020.09.07

Highlights:

Expand Down
49 changes: 34 additions & 15 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb"

"github.com/thanos-io/thanos/pkg/extkingpin"

"github.com/thanos-io/thanos/pkg/component"
Expand Down Expand Up @@ -63,8 +65,8 @@ func registerReceive(app *extkingpin.App) {

retention := extkingpin.ModelDuration(cmd.Flag("tsdb.retention", "How long to retain raw samples on local storage. 0d - disables this retention.").Default("15d"))

hashringsFile := cmd.Flag("receive.hashrings-file", "Path to file that contains the hashring configuration.").
PlaceHolder("<path>").String()
hashringsFilePath := cmd.Flag("receive.hashrings-file", "Path to file that contains the hashring configuration. A watcher is initialized to watch changes and update the hashring dynamically.").PlaceHolder("<path>").String()
hashringsFileContent := cmd.Flag("receive.hashrings", "Alternative to 'receive.hashrings-file' flag (lower priority). Content of file that contains the hashring configuration.").PlaceHolder("<content>").String()

refreshInterval := extkingpin.ModelDuration(cmd.Flag("receive.hashrings-file-refresh-interval", "Refresh interval to re-read the hashring configuration file. (used as a fallback)").
Default("5m"))
Expand Down Expand Up @@ -105,14 +107,6 @@ func registerReceive(app *extkingpin.App) {
return errors.New("no external labels configured for receive, uniquely identifying external labels must be configured (ideally with `receive_` prefix); see https://thanos.io/tip/thanos/storage.md#external-labels for details.")
}

var cw *receive.ConfigWatcher
if *hashringsFile != "" {
cw, err = receive.NewConfigWatcher(log.With(logger, "component", "config-watcher"), reg, *hashringsFile, *refreshInterval)
if err != nil {
return err
}
}

tsdbOpts := &tsdb.Options{
MinBlockDuration: int64(time.Duration(*tsdbMinBlockDuration) / time.Millisecond),
MaxBlockDuration: int64(time.Duration(*tsdbMaxBlockDuration) / time.Millisecond),
Expand Down Expand Up @@ -158,7 +152,9 @@ func registerReceive(app *extkingpin.App) {
tsdbOpts,
*ignoreBlockSize,
lset,
cw,
*hashringsFilePath,
*hashringsFileContent,
refreshInterval,
*localEndpoint,
*tenantHeader,
*defaultTenantID,
Expand Down Expand Up @@ -197,7 +193,9 @@ func runReceive(
tsdbOpts *tsdb.Options,
ignoreBlockSize bool,
lset labels.Labels,
cw *receive.ConfigWatcher,
hashringsFilePath string,
hashringsFileContent string,
refreshInterval *model.Duration,
endpoint string,
tenantHeader string,
defaultTenantID string,
Expand Down Expand Up @@ -373,7 +371,13 @@ func runReceive(
// watcher, we close the chan ourselves.
updates := make(chan receive.Hashring, 1)

if cw != nil {
// The Hashrings config file path is given initializing config watcher.
if hashringsFilePath != "" {
cw, err := receive.NewConfigWatcher(log.With(logger, "component", "config-watcher"), reg, hashringsFilePath, *refreshInterval)
if err != nil {
return errors.Wrap(err, "failed to initialize config watcher")
}

// Check the hashring configuration on before running the watcher.
if err := cw.ValidateConfig(); err != nil {
cw.Stop()
Expand All @@ -383,15 +387,30 @@ func runReceive(

ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
return receive.HashringFromConfig(ctx, updates, cw)
level.Info(logger).Log("msg", "the hashring initialized with config watcher.")
return receive.HashringFromConfigWatcher(ctx, updates, cw)
}, func(error) {
cancel()
})
} else {
var ring receive.Hashring
// The Hashrings config file content given initialize configuration from content.
if len(hashringsFileContent) > 0 {
ring, err = receive.HashringFromConfig(hashringsFileContent)
if err != nil {
close(updates)
return errors.Wrap(err, "failed to validate hashring configuration file")
}
level.Info(logger).Log("msg", "the hashring initialized directly with the given content through the flag.")
} else {
level.Info(logger).Log("msg", "the hashring file is not specified use single node hashring.")
ring = receive.SingleNodeHashring(endpoint)
}

cancel := make(chan struct{})
g.Add(func() error {
defer close(updates)
updates <- receive.SingleNodeHashring(endpoint)
updates <- ring
<-cancel
return nil
}, func(error) {
Expand Down
8 changes: 7 additions & 1 deletion docs/components/receive.md
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,13 @@ Flags:
storage. 0d - disables this retention.
--receive.hashrings-file=<path>
Path to file that contains the hashring
configuration.
configuration. A watcher is initialized to
watch changes and update the hashring
dynamically.
--receive.hashrings=<content>
Alternative to 'receive.hashrings-file' flag
(lower priority). Content of file that contains
the hashring configuration.
--receive.hashrings-file-refresh-interval=5m
Refresh interval to re-read the hashring
configuration file. (used as a fallback)
Expand Down
10 changes: 5 additions & 5 deletions pkg/extflag/pathorcontent.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,29 +44,29 @@ func RegisterPathOrContent(cmd FlagClause, flagName string, help string, require
}
}

// Content returns content of the file. Flag that specifies path has priority.
// Content returns the content of the file when given or directly the content that has passed to the flag.
// Flag that specifies path has priority.
// It returns error if the content is empty and required flag is set to true.
func (p *PathOrContent) Content() ([]byte, error) {
contentFlagName := p.flagName
fileFlagName := fmt.Sprintf("%s-file", p.flagName)

if len(*p.path) > 0 && len(*p.content) > 0 {
return nil, errors.Errorf("both %s and %s flags set.", fileFlagName, contentFlagName)
return nil, errors.Errorf("both %s and %s flags set.", fileFlagName, p.flagName)
}

var content []byte
if len(*p.path) > 0 {
c, err := ioutil.ReadFile(*p.path)
if err != nil {
return nil, errors.Wrapf(err, "loading YAML file %s for %s", *p.path, fileFlagName)
return nil, errors.Wrapf(err, "loading file %s for %s", *p.path, fileFlagName)
}
content = c
} else {
content = []byte(*p.content)
}

if len(content) == 0 && p.required {
return nil, errors.Errorf("flag %s or %s is required for running this command and content cannot be empty.", fileFlagName, contentFlagName)
return nil, errors.Errorf("flag %s or %s is required for running this command and content cannot be empty.", fileFlagName, p.flagName)
}

return content, nil
Expand Down
82 changes: 41 additions & 41 deletions pkg/receive/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,35 +176,42 @@ func (cw *ConfigWatcher) C() <-chan []HashringConfig {

// ValidateConfig returns an error if the configuration that's being watched is not valid.
func (cw *ConfigWatcher) ValidateConfig() error {
_, _, err := cw.loadConfig()
_, _, err := loadConfig(cw.logger, cw.path)
return err
}

// loadConfig loads raw configuration content and returns a configuration.
func (cw *ConfigWatcher) loadConfig() ([]HashringConfig, float64, error) {
cfgContent, err := cw.readFile()
if err != nil {
return nil, 0, errors.Wrap(err, "failed to read configuration file")
}
// Stop shuts down the config watcher.
func (cw *ConfigWatcher) Stop() {
level.Debug(cw.logger).Log("msg", "stopping hashring configuration watcher...", "path", cw.path)

config, err := cw.parseConfig(cfgContent)
if err != nil {
return nil, 0, errors.Wrapf(errParseConfigurationFile, "failed to parse configuration file: %v", err)
}
done := make(chan struct{})
defer close(done)

// If hashring is empty, return an error.
if len(config) == 0 {
return nil, 0, errors.Wrapf(errEmptyConfigurationFile, "failed to load configuration file, path: %s", cw.path)
// Closing the watcher will deadlock unless all events and errors are drained.
go func() {
for {
select {
case <-cw.watcher.Errors:
case <-cw.watcher.Events:
// Drain all events and errors.
case <-done:
return
}
}
}()
if err := cw.watcher.Close(); err != nil {
level.Error(cw.logger).Log("msg", "error closing file watcher", "path", cw.path, "err", err)
}

return config, hashAsMetricValue(cfgContent), nil
close(cw.ch)
level.Debug(cw.logger).Log("msg", "hashring configuration watcher stopped")
}

// refresh reads the configured file and sends the hashring configuration on the channel.
func (cw *ConfigWatcher) refresh(ctx context.Context) {
cw.refreshCounter.Inc()

config, cfgHash, err := cw.loadConfig()
config, cfgHash, err := loadConfig(cw.logger, cw.path)
if err != nil {
cw.errorCounter.Inc()
level.Error(cw.logger).Log("msg", "failed to load configuration file", "err", err, "path", cw.path)
Expand Down Expand Up @@ -238,50 +245,43 @@ func (cw *ConfigWatcher) refresh(ctx context.Context) {
}
}

// Stop shuts down the config watcher.
func (cw *ConfigWatcher) Stop() {
level.Debug(cw.logger).Log("msg", "stopping hashring configuration watcher...", "path", cw.path)
// loadConfig loads raw configuration content and returns a configuration.
func loadConfig(logger log.Logger, path string) ([]HashringConfig, float64, error) {
cfgContent, err := readFile(logger, path)
if err != nil {
return nil, 0, errors.Wrap(err, "failed to read configuration file")
}

done := make(chan struct{})
defer close(done)
config, err := parseConfig(cfgContent)
if err != nil {
return nil, 0, errors.Wrapf(errParseConfigurationFile, "failed to parse configuration file: %v", err)
}

// Closing the watcher will deadlock unless all events and errors are drained.
go func() {
for {
select {
case <-cw.watcher.Errors:
case <-cw.watcher.Events:
// Drain all events and errors.
case <-done:
return
}
}
}()
if err := cw.watcher.Close(); err != nil {
level.Error(cw.logger).Log("msg", "error closing file watcher", "path", cw.path, "err", err)
// If hashring is empty, return an error.
if len(config) == 0 {
return nil, 0, errors.Wrapf(errEmptyConfigurationFile, "failed to load configuration file, path: %s", path)
}

close(cw.ch)
level.Debug(cw.logger).Log("msg", "hashring configuration watcher stopped")
return config, hashAsMetricValue(cfgContent), nil
}

// readFile reads the configuration file and returns content of configuration file.
func (cw *ConfigWatcher) readFile() ([]byte, error) {
fd, err := os.Open(cw.path)
func readFile(logger log.Logger, path string) ([]byte, error) {
fd, err := os.Open(path)
if err != nil {
return nil, err
}
defer func() {
if err := fd.Close(); err != nil {
level.Error(cw.logger).Log("msg", "failed to close file", "err", err, "path", cw.path)
level.Error(logger).Log("msg", "failed to close file", "err", err, "path", path)
}
}()

return ioutil.ReadAll(fd)
}

// parseConfig parses the raw configuration content and returns a HashringConfig.
func (cw *ConfigWatcher) parseConfig(content []byte) ([]HashringConfig, error) {
func parseConfig(content []byte) ([]HashringConfig, error) {
var config []HashringConfig
err := json.Unmarshal(content, &config)
return config, err
Expand Down
20 changes: 18 additions & 2 deletions pkg/receive/hashring.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/cespare/xxhash"
"github.com/pkg/errors"

"github.com/thanos-io/thanos/pkg/store/storepb/prompb"
)

Expand Down Expand Up @@ -158,14 +159,14 @@ func newMultiHashring(cfg []HashringConfig) Hashring {
return m
}

// HashringFromConfig creates multi-tenant hashrings from a
// HashringFromConfigWatcher creates multi-tenant hashrings from a
// hashring configuration file watcher.
// The configuration file is watched for updates.
// Hashrings are returned on the updates channel.
// Which hashring to use for a tenant is determined
// by the tenants field of the hashring configuration.
// The updates chan is closed before exiting.
func HashringFromConfig(ctx context.Context, updates chan<- Hashring, cw *ConfigWatcher) error {
func HashringFromConfigWatcher(ctx context.Context, updates chan<- Hashring, cw *ConfigWatcher) error {
defer close(updates)
go cw.Run(ctx)

Expand All @@ -181,3 +182,18 @@ func HashringFromConfig(ctx context.Context, updates chan<- Hashring, cw *Config
}
}
}

// HashringFromConfig loads raw configuration content and returns a Hashring if the given configuration is not valid.
func HashringFromConfig(content string) (Hashring, error) {
config, err := parseConfig([]byte(content))
if err != nil {
return nil, errors.Wrapf(err, "failed to parse configuration")
}

// If hashring is empty, return an error.
if len(config) == 0 {
return nil, errors.Wrapf(err, "failed to load configuration")
}

return newMultiHashring(config), err
}
5 changes: 3 additions & 2 deletions scripts/cfggen/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"gopkg.in/alecthomas/kingpin.v2"
"gopkg.in/yaml.v2"

"github.com/thanos-io/thanos/pkg/alert"
"github.com/thanos-io/thanos/pkg/cacheutil"
http_util "github.com/thanos-io/thanos/pkg/http"
Expand All @@ -34,8 +37,6 @@ import (
"github.com/thanos-io/thanos/pkg/tracing/jaeger"
"github.com/thanos-io/thanos/pkg/tracing/lightstep"
"github.com/thanos-io/thanos/pkg/tracing/stackdriver"
kingpin "gopkg.in/alecthomas/kingpin.v2"
yaml "gopkg.in/yaml.v2"
)

var (
Expand Down
6 changes: 1 addition & 5 deletions scripts/quickstart.sh
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,6 @@ sleep 0.5

if [ -n "${REMOTE_WRITE_ENABLED}" ]; then

cat <<-EOF >./data/hashring.json
[{"endpoints":["127.0.0.1:10907","127.0.0.1:11907","127.0.0.1:12907"]}]
EOF

for i in $(seq 0 1 2); do
${THANOS_EXECUTABLE} receive \
--debug.name receive${i} \
Expand All @@ -203,7 +199,7 @@ if [ -n "${REMOTE_WRITE_ENABLED}" ]; then
--label "receive_replica=\"${i}\"" \
--label 'receive="true"' \
--receive.local-endpoint 127.0.0.1:1${i}907 \
--receive.hashrings-file ./data/hashring.json \
--receive.hashrings '[{"endpoints":["127.0.0.1:10907","127.0.0.1:11907","127.0.0.1:12907"]}]' \
--remote-write.address 0.0.0.0:1${i}908 \
${OBJSTORECFG} &

Expand Down
Loading

0 comments on commit dd670a3

Please sign in to comment.