receive: Add ability to consume contents of the hashrings file directly #3121

Merged
merged 5 commits into from
Dec 28, 2020
7 changes: 4 additions & 3 deletions CHANGELOG.md
Expand Up @@ -17,6 +17,7 @@ We use _breaking :warning:_ to mark changes that are not backward compatible (re
- [#3469](https://github.com/thanos-io/thanos/pull/3469) StoreAPI: Added `hints` field to `LabelNamesRequest` and `LabelValuesRequest`. Hints in an opaque data structure that can be used to carry additional information from the store and its content is implementation specific.
- [#3421](https://github.com/thanos-io/thanos/pull/3421) Tools: Added `thanos tools bucket rewrite` command allowing to delete series from given block.
- [#3388](https://github.com/thanos-io/thanos/pull/3378) Tools: Bucket replicator now can specify block IDs to copy.
- [#3121](https://github.com/thanos-io/thanos/pull/3121) Receive: Added `--receive.hashrings` flag as a lower-priority alternative to `--receive.hashrings-file`. It takes the content of the JSON hashring configuration directly instead of a file path.

### Fixed

Expand Down Expand Up @@ -110,8 +111,8 @@ Highlights:

### Added

- [#3114](https://github.com/thanos-io/thanos/pull/3114) Query Frontend: Added support for Memacached cache.
- **breaking** Renamed flag `log_queries_longer_than` to `log-queries-longer-than`.
- [#3114](https://github.com/thanos-io/thanos/pull/3114) Query Frontend: Added support for Memcached cache.
- **breaking** Renamed flag `log_queries_longer_than` to `log-queries-longer-than`.
- [#3166](https://github.com/thanos-io/thanos/pull/3166) UIs: Added UI for passing a `storeMatch[]` parameter to queries.
- [#3181](https://github.com/thanos-io/thanos/pull/3181) Logging: Added debug level logging for responses between 300-399
- [#3133](https://github.com/thanos-io/thanos/pull/3133) Query: Allowed passing a `storeMatch[]` to Labels APIs; Time range metadata based store filtering is supported on Labels APIs.
Expand All @@ -133,7 +134,7 @@ Highlights:
- [#3022](https://github.com/thanos-io/thanos/pull/3022) \*: Thanos images are now build with Go 1.15.
- [#3205](https://github.com/thanos-io/thanos/pull/3205) \*: Updated TSDB to ~2.21

## [v0.15.0](https://github.com/thanos-io/thanos/releases) - 2020.09.07
## [v0.15.0](https://github.com/thanos-io/thanos/releases/v0.15.0) - 2020.09.07

Highlights:

Expand Down
49 changes: 34 additions & 15 deletions cmd/thanos/receive.go
Expand Up @@ -19,8 +19,10 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb"

"github.com/thanos-io/thanos/pkg/extkingpin"

"github.com/thanos-io/thanos/pkg/component"
Expand Down Expand Up @@ -63,8 +65,8 @@ func registerReceive(app *extkingpin.App) {

retention := extkingpin.ModelDuration(cmd.Flag("tsdb.retention", "How long to retain raw samples on local storage. 0d - disables this retention.").Default("15d"))

hashringsFile := cmd.Flag("receive.hashrings-file", "Path to file that contains the hashring configuration.").
PlaceHolder("<path>").String()
hashringsFilePath := cmd.Flag("receive.hashrings-file", "Path to file that contains the hashring configuration. A watcher is initialized to watch changes and update the hashring dynamically.").PlaceHolder("<path>").String()
hashringsFileContent := cmd.Flag("receive.hashrings", "Alternative to 'receive.hashrings-file' flag (lower priority). Content of file that contains the hashring configuration.").PlaceHolder("<content>").String()

refreshInterval := extkingpin.ModelDuration(cmd.Flag("receive.hashrings-file-refresh-interval", "Refresh interval to re-read the hashring configuration file. (used as a fallback)").
Default("5m"))
Expand Down Expand Up @@ -105,14 +107,6 @@ func registerReceive(app *extkingpin.App) {
return errors.New("no external labels configured for receive, uniquely identifying external labels must be configured (ideally with `receive_` prefix); see https://thanos.io/tip/thanos/storage.md#external-labels for details.")
}

var cw *receive.ConfigWatcher
if *hashringsFile != "" {
cw, err = receive.NewConfigWatcher(log.With(logger, "component", "config-watcher"), reg, *hashringsFile, *refreshInterval)
if err != nil {
return err
}
}

tsdbOpts := &tsdb.Options{
MinBlockDuration: int64(time.Duration(*tsdbMinBlockDuration) / time.Millisecond),
MaxBlockDuration: int64(time.Duration(*tsdbMaxBlockDuration) / time.Millisecond),
Expand Down Expand Up @@ -158,7 +152,9 @@ func registerReceive(app *extkingpin.App) {
tsdbOpts,
*ignoreBlockSize,
lset,
cw,
*hashringsFilePath,
*hashringsFileContent,
refreshInterval,
*localEndpoint,
*tenantHeader,
*defaultTenantID,
Expand Down Expand Up @@ -197,7 +193,9 @@ func runReceive(
tsdbOpts *tsdb.Options,
ignoreBlockSize bool,
lset labels.Labels,
cw *receive.ConfigWatcher,
hashringsFilePath string,
hashringsFileContent string,
refreshInterval *model.Duration,
endpoint string,
tenantHeader string,
defaultTenantID string,
Expand Down Expand Up @@ -373,7 +371,13 @@ func runReceive(
// watcher, we close the chan ourselves.
updates := make(chan receive.Hashring, 1)

if cw != nil {
// A hashrings config file path is given, so initialize the config watcher.
if hashringsFilePath != "" {
cw, err := receive.NewConfigWatcher(log.With(logger, "component", "config-watcher"), reg, hashringsFilePath, *refreshInterval)
if err != nil {
return errors.Wrap(err, "failed to initialize config watcher")
}

// Check the hashring configuration before running the watcher.
if err := cw.ValidateConfig(); err != nil {
cw.Stop()
Expand All @@ -383,15 +387,30 @@ func runReceive(

ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
return receive.HashringFromConfig(ctx, updates, cw)
level.Info(logger).Log("msg", "the hashring initialized with config watcher.")
return receive.HashringFromConfigWatcher(ctx, updates, cw)
}, func(error) {
cancel()
})
} else {
var ring receive.Hashring
// The hashrings config content is given directly, so build the hashring from that content.
if len(hashringsFileContent) > 0 {
ring, err = receive.HashringFromConfig(hashringsFileContent)
if err != nil {
close(updates)
return errors.Wrap(err, "failed to validate hashring configuration file")
}
level.Info(logger).Log("msg", "the hashring initialized directly with the given content through the flag.")
} else {
level.Info(logger).Log("msg", "the hashring file is not specified, using single node hashring.")
ring = receive.SingleNodeHashring(endpoint)
}

cancel := make(chan struct{})
g.Add(func() error {
defer close(updates)
updates <- receive.SingleNodeHashring(endpoint)
updates <- ring
<-cancel
return nil
}, func(error) {
Expand Down
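The branch order introduced in `runReceive` above can be summarized in a small, self-contained sketch: a non-empty `--receive.hashrings-file` path takes priority, then non-empty `--receive.hashrings` content, and otherwise a single-node hashring is used. The `hashringSource` type, its constants, and `pickHashringSource` are hypothetical names used only for illustration; they are not part of Thanos.

```go
package main

import "fmt"

// hashringSource names where the hashring configuration comes from.
// This type and its values are illustrative assumptions, not Thanos API.
type hashringSource string

const (
	sourceFileWatcher hashringSource = "file-watcher"
	sourceInline      hashringSource = "inline-content"
	sourceSingleNode  hashringSource = "single-node"
)

// pickHashringSource follows the same order as the if/else chain in the diff:
// file path first, then inline content, then the single-node fallback.
func pickHashringSource(filePath, content string) hashringSource {
	switch {
	case filePath != "":
		return sourceFileWatcher
	case len(content) > 0:
		return sourceInline
	default:
		return sourceSingleNode
	}
}

func main() {
	inline := `[{"endpoints":["127.0.0.1:10907"]}]`
	fmt.Println(pickHashringSource("./data/hashring.json", inline)) // file path wins
	fmt.Println(pickHashringSource("", inline))                     // inline content
	fmt.Println(pickHashringSource("", ""))                         // fallback
}
```

Note that when both flags are set, the inline content is silently ignored in this ordering rather than rejected, which matches the "lower priority" wording of the flag help text.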
8 changes: 7 additions & 1 deletion docs/components/receive.md
Expand Up @@ -149,7 +149,13 @@ Flags:
storage. 0d - disables this retention.
--receive.hashrings-file=<path>
Path to file that contains the hashring
configuration.
configuration. A watcher is initialized to
watch changes and update the hashring
dynamically.
--receive.hashrings=<content>
Alternative to 'receive.hashrings-file' flag
(lower priority). Content of file that contains
the hashring configuration.
--receive.hashrings-file-refresh-interval=5m
Refresh interval to re-read the hashring
configuration file. (used as a fallback)
Expand Down
10 changes: 5 additions & 5 deletions pkg/extflag/pathorcontent.go
Expand Up @@ -44,29 +44,29 @@ func RegisterPathOrContent(cmd FlagClause, flagName string, help string, require
}
}

// Content returns content of the file. Flag that specifies path has priority.
// Content returns the content of the file when a path is given, or otherwise the content that has been passed directly to the flag.
// Flag that specifies path has priority.
// It returns error if the content is empty and required flag is set to true.
func (p *PathOrContent) Content() ([]byte, error) {
contentFlagName := p.flagName
fileFlagName := fmt.Sprintf("%s-file", p.flagName)

if len(*p.path) > 0 && len(*p.content) > 0 {
return nil, errors.Errorf("both %s and %s flags set.", fileFlagName, contentFlagName)
return nil, errors.Errorf("both %s and %s flags set.", fileFlagName, p.flagName)
}

var content []byte
if len(*p.path) > 0 {
c, err := ioutil.ReadFile(*p.path)
if err != nil {
return nil, errors.Wrapf(err, "loading YAML file %s for %s", *p.path, fileFlagName)
return nil, errors.Wrapf(err, "loading file %s for %s", *p.path, fileFlagName)
}
content = c
} else {
content = []byte(*p.content)
}

if len(content) == 0 && p.required {
return nil, errors.Errorf("flag %s or %s is required for running this command and content cannot be empty.", fileFlagName, contentFlagName)
return nil, errors.Errorf("flag %s or %s is required for running this command and content cannot be empty.", fileFlagName, p.flagName)
}

return content, nil
Expand Down
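As a rough illustration of the path-or-content resolution in `Content()` above, the following standalone sketch reproduces the same three checks using only the standard library. `resolveContent` and its parameters are hypothetical stand-ins for the `PathOrContent` fields, and the error wording is simplified; it is not the Thanos implementation.

```go
package main

import (
	"fmt"
	"os"
)

// resolveContent is a hypothetical stand-in for PathOrContent.Content.
// It rejects the case where both flags are set, reads the file when a path
// is given, and otherwise uses the inline content.
func resolveContent(flagName, path, content string, required bool) ([]byte, error) {
	fileFlagName := flagName + "-file"

	if len(path) > 0 && len(content) > 0 {
		return nil, fmt.Errorf("both %s and %s flags set", fileFlagName, flagName)
	}

	var out []byte
	if len(path) > 0 {
		b, err := os.ReadFile(path)
		if err != nil {
			return nil, fmt.Errorf("loading file %s for %s: %w", path, fileFlagName, err)
		}
		out = b
	} else {
		out = []byte(content)
	}

	if len(out) == 0 && required {
		return nil, fmt.Errorf("flag %s or %s is required and content cannot be empty", fileFlagName, flagName)
	}
	return out, nil
}

func main() {
	// Inline content only: returned as-is.
	b, err := resolveContent("objstore.config", "", "type: FILESYSTEM", true)
	fmt.Println(string(b), err)

	// Both set: an error, regardless of whether the file exists.
	_, err = resolveContent("objstore.config", "cfg.yaml", "type: FILESYSTEM", true)
	fmt.Println(err)
}
```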
82 changes: 41 additions & 41 deletions pkg/receive/config.go
Expand Up @@ -176,35 +176,42 @@ func (cw *ConfigWatcher) C() <-chan []HashringConfig {

// ValidateConfig returns an error if the configuration that's being watched is not valid.
func (cw *ConfigWatcher) ValidateConfig() error {
_, _, err := cw.loadConfig()
_, _, err := loadConfig(cw.logger, cw.path)
return err
}

// loadConfig loads raw configuration content and returns a configuration.
func (cw *ConfigWatcher) loadConfig() ([]HashringConfig, float64, error) {
cfgContent, err := cw.readFile()
if err != nil {
return nil, 0, errors.Wrap(err, "failed to read configuration file")
}
// Stop shuts down the config watcher.
func (cw *ConfigWatcher) Stop() {
level.Debug(cw.logger).Log("msg", "stopping hashring configuration watcher...", "path", cw.path)

config, err := cw.parseConfig(cfgContent)
if err != nil {
return nil, 0, errors.Wrapf(errParseConfigurationFile, "failed to parse configuration file: %v", err)
}
done := make(chan struct{})
defer close(done)

// If hashring is empty, return an error.
if len(config) == 0 {
return nil, 0, errors.Wrapf(errEmptyConfigurationFile, "failed to load configuration file, path: %s", cw.path)
// Closing the watcher will deadlock unless all events and errors are drained.
go func() {
for {
select {
case <-cw.watcher.Errors:
case <-cw.watcher.Events:
// Drain all events and errors.
case <-done:
return
}
}
}()
if err := cw.watcher.Close(); err != nil {
level.Error(cw.logger).Log("msg", "error closing file watcher", "path", cw.path, "err", err)
}

return config, hashAsMetricValue(cfgContent), nil
close(cw.ch)
level.Debug(cw.logger).Log("msg", "hashring configuration watcher stopped")
}

// refresh reads the configured file and sends the hashring configuration on the channel.
func (cw *ConfigWatcher) refresh(ctx context.Context) {
cw.refreshCounter.Inc()

config, cfgHash, err := cw.loadConfig()
config, cfgHash, err := loadConfig(cw.logger, cw.path)
if err != nil {
cw.errorCounter.Inc()
level.Error(cw.logger).Log("msg", "failed to load configuration file", "err", err, "path", cw.path)
Expand Down Expand Up @@ -238,50 +245,43 @@ func (cw *ConfigWatcher) refresh(ctx context.Context) {
}
}

// Stop shuts down the config watcher.
func (cw *ConfigWatcher) Stop() {
level.Debug(cw.logger).Log("msg", "stopping hashring configuration watcher...", "path", cw.path)
// loadConfig loads raw configuration content and returns a configuration.
func loadConfig(logger log.Logger, path string) ([]HashringConfig, float64, error) {
cfgContent, err := readFile(logger, path)
if err != nil {
return nil, 0, errors.Wrap(err, "failed to read configuration file")
}

done := make(chan struct{})
defer close(done)
config, err := parseConfig(cfgContent)
if err != nil {
return nil, 0, errors.Wrapf(errParseConfigurationFile, "failed to parse configuration file: %v", err)
}

// Closing the watcher will deadlock unless all events and errors are drained.
go func() {
for {
select {
case <-cw.watcher.Errors:
case <-cw.watcher.Events:
// Drain all events and errors.
case <-done:
return
}
}
}()
if err := cw.watcher.Close(); err != nil {
level.Error(cw.logger).Log("msg", "error closing file watcher", "path", cw.path, "err", err)
// If hashring is empty, return an error.
if len(config) == 0 {
return nil, 0, errors.Wrapf(errEmptyConfigurationFile, "failed to load configuration file, path: %s", path)
}

close(cw.ch)
level.Debug(cw.logger).Log("msg", "hashring configuration watcher stopped")
return config, hashAsMetricValue(cfgContent), nil
}

// readFile reads the configuration file and returns content of configuration file.
func (cw *ConfigWatcher) readFile() ([]byte, error) {
fd, err := os.Open(cw.path)
func readFile(logger log.Logger, path string) ([]byte, error) {
fd, err := os.Open(path)
if err != nil {
return nil, err
}
defer func() {
if err := fd.Close(); err != nil {
level.Error(cw.logger).Log("msg", "failed to close file", "err", err, "path", cw.path)
level.Error(logger).Log("msg", "failed to close file", "err", err, "path", path)
}
}()

return ioutil.ReadAll(fd)
}

// parseConfig parses the raw configuration content and returns a HashringConfig.
func (cw *ConfigWatcher) parseConfig(content []byte) ([]HashringConfig, error) {
func parseConfig(content []byte) ([]HashringConfig, error) {
var config []HashringConfig
err := json.Unmarshal(content, &config)
return config, err
Expand Down
20 changes: 18 additions & 2 deletions pkg/receive/hashring.go
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/cespare/xxhash"
"github.com/pkg/errors"

"github.com/thanos-io/thanos/pkg/store/storepb/prompb"
)

Expand Down Expand Up @@ -158,14 +159,14 @@ func newMultiHashring(cfg []HashringConfig) Hashring {
return m
}

// HashringFromConfig creates multi-tenant hashrings from a
// HashringFromConfigWatcher creates multi-tenant hashrings from a
// hashring configuration file watcher.
// The configuration file is watched for updates.
// Hashrings are returned on the updates channel.
// Which hashring to use for a tenant is determined
// by the tenants field of the hashring configuration.
// The updates chan is closed before exiting.
func HashringFromConfig(ctx context.Context, updates chan<- Hashring, cw *ConfigWatcher) error {
func HashringFromConfigWatcher(ctx context.Context, updates chan<- Hashring, cw *ConfigWatcher) error {
defer close(updates)
go cw.Run(ctx)

Expand All @@ -181,3 +182,18 @@ func HashringFromConfig(ctx context.Context, updates chan<- Hashring, cw *Config
}
}
}

// HashringFromConfig parses raw configuration content and returns a Hashring, or an error if the given configuration is not valid.
func HashringFromConfig(content string) (Hashring, error) {
config, err := parseConfig([]byte(content))
if err != nil {
return nil, errors.Wrapf(err, "failed to parse configuration")
}

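// If hashring is empty, return an error. The sentinel error is wrapped here
// because err is nil at this point and errors.Wrapf returns nil for a nil error.
if len(config) == 0 {
return nil, errors.Wrapf(errEmptyConfigurationFile, "failed to load configuration")
}

return newMultiHashring(config), nil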
}
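The parse-then-validate flow of `HashringFromConfig` can be tried in isolation with the sketch below, which uses only the standard library. The `hashringConfig` struct is a local stand-in: the `endpoints` key comes from the quickstart JSON in this PR, while the `tenants` JSON key is an assumption based on the "tenants field" mentioned in the comments. `errEmptyConfig` and `parseHashrings` are hypothetical names. The point it illustrates is that the empty-configuration branch needs a non-nil error to wrap, otherwise the caller sees no error at all.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// hashringConfig is a local stand-in for the real HashringConfig type.
// The "tenants" JSON key is an assumption; "endpoints" appears in the PR.
type hashringConfig struct {
	Tenants   []string `json:"tenants,omitempty"`
	Endpoints []string `json:"endpoints"`
}

// errEmptyConfig is a hypothetical sentinel for an empty configuration.
var errEmptyConfig = errors.New("empty hashring configuration")

// parseHashrings parses raw JSON content and rejects empty configurations.
func parseHashrings(content string) ([]hashringConfig, error) {
	var cfg []hashringConfig
	if err := json.Unmarshal([]byte(content), &cfg); err != nil {
		return nil, fmt.Errorf("failed to parse configuration: %w", err)
	}
	// Wrap a non-nil sentinel: at this point the parse error is nil,
	// so there is nothing else to wrap.
	if len(cfg) == 0 {
		return nil, fmt.Errorf("failed to load configuration: %w", errEmptyConfig)
	}
	return cfg, nil
}

func main() {
	cfg, err := parseHashrings(`[{"endpoints":["127.0.0.1:10907","127.0.0.1:11907","127.0.0.1:12907"]}]`)
	fmt.Println(len(cfg), err)

	_, err = parseHashrings(`[]`)
	fmt.Println(errors.Is(err, errEmptyConfig))
}
```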
5 changes: 3 additions & 2 deletions scripts/cfggen/main.go
Expand Up @@ -15,6 +15,9 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"gopkg.in/alecthomas/kingpin.v2"
"gopkg.in/yaml.v2"

"github.com/thanos-io/thanos/pkg/alert"
"github.com/thanos-io/thanos/pkg/cacheutil"
http_util "github.com/thanos-io/thanos/pkg/http"
Expand All @@ -34,8 +37,6 @@ import (
"github.com/thanos-io/thanos/pkg/tracing/jaeger"
"github.com/thanos-io/thanos/pkg/tracing/lightstep"
"github.com/thanos-io/thanos/pkg/tracing/stackdriver"
kingpin "gopkg.in/alecthomas/kingpin.v2"
yaml "gopkg.in/yaml.v2"
)

var (
Expand Down
6 changes: 1 addition & 5 deletions scripts/quickstart.sh
Expand Up @@ -184,10 +184,6 @@ sleep 0.5

if [ -n "${REMOTE_WRITE_ENABLED}" ]; then

cat <<-EOF >./data/hashring.json
[{"endpoints":["127.0.0.1:10907","127.0.0.1:11907","127.0.0.1:12907"]}]
EOF

for i in $(seq 0 1 2); do
${THANOS_EXECUTABLE} receive \
--debug.name receive${i} \
Expand All @@ -203,7 +199,7 @@ if [ -n "${REMOTE_WRITE_ENABLED}" ]; then
--label "receive_replica=\"${i}\"" \
--label 'receive="true"' \
--receive.local-endpoint 127.0.0.1:1${i}907 \
--receive.hashrings-file ./data/hashring.json \
--receive.hashrings '[{"endpoints":["127.0.0.1:10907","127.0.0.1:11907","127.0.0.1:12907"]}]' \
--remote-write.address 0.0.0.0:1${i}908 \
${OBJSTORECFG} &

Expand Down