From 01b245effaeab0d399711df7b464fd2d095e039a Mon Sep 17 00:00:00 2001
From: Robert Fratto
Date: Tue, 2 Mar 2021 17:10:52 -0500
Subject: [PATCH 1/2] initial commit of a configstore

---
Review notes (text between "---" and the first diff is ignored by git-am):
- remote.go: allocate Remote.configs, make setClient non-blocking, close the
  channel returned by all(), stop blocked sends on context cancellation and
  document the exported methods.
- Hunk sizes and the diffstat were updated to match; the blob ids on the
  "index" lines still refer to the original series.

 pkg/prom/ha/server.go                         |   3 +-
 pkg/prom/ha/sharding_test.go                  |   5 +-
 .../{ha => instance/configstore}/codec.go     |   3 +-
 .../configstore}/codec_test.go                |   2 +-
 pkg/prom/instance/configstore/errors.go       |  27 ++
 pkg/prom/instance/configstore/remote.go       | 359 ++++++++++++++++++
 pkg/prom/instance/configstore/store.go        |  40 +++
 pkg/prom/instance/configstore/unique.go       |  35 ++
 8 files changed, 469 insertions(+), 5 deletions(-)
 rename pkg/prom/{ha => instance/configstore}/codec.go (96%)
 rename pkg/prom/{ha => instance/configstore}/codec_test.go (97%)
 create mode 100644 pkg/prom/instance/configstore/errors.go
 create mode 100644 pkg/prom/instance/configstore/remote.go
 create mode 100644 pkg/prom/instance/configstore/store.go
 create mode 100644 pkg/prom/instance/configstore/unique.go

diff --git a/pkg/prom/ha/server.go b/pkg/prom/ha/server.go
index ed54f5678353..9584afdd2e65 100644
--- a/pkg/prom/ha/server.go
+++ b/pkg/prom/ha/server.go
@@ -22,6 +22,7 @@ import (
 	"github.com/grafana/agent/pkg/agentproto"
 	"github.com/grafana/agent/pkg/prom/ha/client"
 	"github.com/grafana/agent/pkg/prom/instance"
+	"github.com/grafana/agent/pkg/prom/instance/configstore"
 	flagutil "github.com/grafana/agent/pkg/util"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
@@ -115,7 +116,7 @@ func New(reg prometheus.Registerer, cfg Config, globalConfig *config.GlobalConfi
 
 	kvClient, err := kv.NewClient(
 		cfg.KVStore,
-		GetCodec(),
+		configstore.GetCodec(),
 		kv.RegistererWithKVName(reg, "agent_configs"),
 	)
 	if err != nil {
diff --git a/pkg/prom/ha/sharding_test.go b/pkg/prom/ha/sharding_test.go
index 1e97f166365a..9faa5e31411b 100644
--- a/pkg/prom/ha/sharding_test.go
+++ b/pkg/prom/ha/sharding_test.go
@@ -9,6 +9,7 @@ import (
 	"github.com/cortexproject/cortex/pkg/ring/kv/consul"
 	"github.com/go-kit/kit/log"
 	"github.com/grafana/agent/pkg/agentproto"
+	"github.com/grafana/agent/pkg/prom/instance/configstore"
 	"github.com/stretchr/testify/require"
 )
 
@@ -18,7 +19,7 @@ func TestServer_Reshard(t *testing.T) {
 	// - All configs not in the store but in the existing InstanceManager should be deleted
 	fakeIm := newFakeInstanceManager()
 
-	mockKv := consul.NewInMemoryClient(GetCodec())
+	mockKv := consul.NewInMemoryClient(configstore.GetCodec())
 	for _, name := range []string{"keep_a", "keep_b", "new_a", "new_b"} {
 		err := mockKv.CAS(context.Background(), name, func(in interface{}) (out interface{}, retry bool, err error) {
 			return testConfig(t, name), true, nil
@@ -69,7 +70,7 @@ func TestServer_Ownership(t *testing.T) {
 	// - All configs not in the store but in the existing InstanceManager should be deleted
 	fakeIm := newFakeInstanceManager()
 
-	mockKv := consul.NewInMemoryClient(GetCodec())
+	mockKv := consul.NewInMemoryClient(configstore.GetCodec())
 	for _, name := range []string{"owned", "unowned"} {
 		err := mockKv.CAS(context.Background(), name, func(in interface{}) (out interface{}, retry bool, err error) {
 			return testConfig(t, name), true, nil
diff --git a/pkg/prom/ha/codec.go b/pkg/prom/instance/configstore/codec.go
similarity index 96%
rename from pkg/prom/ha/codec.go
rename to pkg/prom/instance/configstore/codec.go
index 90bd7f7c9c40..cd7606bd9b51 100644
--- a/pkg/prom/ha/codec.go
+++ b/pkg/prom/instance/configstore/codec.go
@@ -1,4 +1,4 @@
-package ha
+package configstore
 
 import (
 	"bytes"
@@ -11,6 +11,7 @@ import (
 )
 
 // GetCodec returns the codec for encoding and decoding instance.Configs
+// in the Remote store.
 func GetCodec() codec.Codec {
 	return &yamlCodec{}
 }
diff --git a/pkg/prom/ha/codec_test.go b/pkg/prom/instance/configstore/codec_test.go
similarity index 97%
rename from pkg/prom/ha/codec_test.go
rename to pkg/prom/instance/configstore/codec_test.go
index 3a99c0b0df84..8e28655bb8eb 100644
--- a/pkg/prom/ha/codec_test.go
+++ b/pkg/prom/instance/configstore/codec_test.go
@@ -1,4 +1,4 @@
-package ha
+package configstore
 
 import (
 	"testing"
diff --git a/pkg/prom/instance/configstore/errors.go b/pkg/prom/instance/configstore/errors.go
new file mode 100644
index 000000000000..5c622877ca76
--- /dev/null
+++ b/pkg/prom/instance/configstore/errors.go
@@ -0,0 +1,27 @@
+package configstore
+
+import "fmt"
+
+// ErrNotConnected is used when a store operation was called but no connection
+// to the store was active.
+var ErrNotConnected = fmt.Errorf("not connected to store")
+
+// NotExistError is used when a config doesn't exist.
+type NotExistError struct {
+	Key string
+}
+
+// Error implements error.
+func (e NotExistError) Error() string {
+	return fmt.Sprintf("configuration %s does not exist", e.Key)
+}
+
+// NotUniqueError is used when two scrape jobs have the same name.
+type NotUniqueError struct {
+	ScrapeJob string
+}
+
+// Error implements error.
+func (e NotUniqueError) Error() string {
+	return fmt.Sprintf("found multiple scrape configs with job name %q", e.ScrapeJob)
+}
diff --git a/pkg/prom/instance/configstore/remote.go b/pkg/prom/instance/configstore/remote.go
new file mode 100644
index 000000000000..da1f559d5962
--- /dev/null
+++ b/pkg/prom/instance/configstore/remote.go
@@ -0,0 +1,359 @@
+package configstore
+
+import (
+	"context"
+	"fmt"
+	"sort"
+	"strings"
+	"sync"
+
+	"github.com/cortexproject/cortex/pkg/ring/kv"
+	"github.com/go-kit/kit/log"
+	"github.com/go-kit/kit/log/level"
+	"github.com/grafana/agent/pkg/prom/instance"
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+// Remote loads instance files from a remote KV store. The KV store
+// can be swapped out in real time.
+type Remote struct {
+	log log.Logger
+	reg prometheus.Registerer
+
+	kvMut    sync.RWMutex
+	kv       kv.Client
+	reloadKV chan struct{}
+
+	cancelCtx  context.Context
+	cancelFunc context.CancelFunc
+
+	configsMut sync.Mutex
+	configs    map[string]instance.Config
+	configsCh  chan []instance.Config
+}
+
+// NewRemote creates a Remote store that uses the KV store described by cfg
+// and starts the goroutine that restarts the KV watcher on client changes.
+func NewRemote(l log.Logger, reg prometheus.Registerer, cfg kv.Config) (*Remote, error) {
+	cancelCtx, cancelFunc := context.WithCancel(context.Background())
+
+	r := &Remote{
+		log: l,
+		reg: reg,
+
+		// configs must be allocated up front: watchKV writes into it.
+		configs:   make(map[string]instance.Config),
+		configsCh: make(chan []instance.Config),
+
+		reloadKV: make(chan struct{}, 1),
+
+		cancelCtx:  cancelCtx,
+		cancelFunc: cancelFunc,
+	}
+	if err := r.ApplyConfig(cfg); err != nil {
+		cancelFunc() // release the context; run was never started
+		return nil, fmt.Errorf("failed to apply config for config store: %w", err)
+	}
+
+	go r.run()
+	return r, nil
+}
+
+// ApplyConfig applies the config for a kv client.
+func (r *Remote) ApplyConfig(cfg kv.Config) error {
+	r.kvMut.Lock()
+	defer r.kvMut.Unlock()
+
+	if r.cancelCtx.Err() != nil {
+		return fmt.Errorf("remote store already stopped")
+	}
+
+	cli, err := kv.NewClient(cfg, GetCodec(), r.reg)
+	if err != nil {
+		return fmt.Errorf("failed to create kv client: %w", err)
+	}
+
+	r.setClient(cli)
+	return nil
+}
+
+// setClient sets the active client and notifies run to restart the
+// kv watcher. The caller must hold kvMut for writing.
+func (r *Remote) setClient(client kv.Client) {
+	r.kv = client
+
+	// Never block here: the caller holds kvMut and run needs kvMut after
+	// receiving a signal, so a blocking send on a full channel could deadlock.
+	// A signal that is already pending makes run pick up the newest client.
+	select {
+	case r.reloadKV <- struct{}{}:
+	default:
+	}
+}
+
+// run restarts the KV watcher whenever a reload is signalled and stops the
+// active watcher once the store is closed.
+func (r *Remote) run() {
+	var (
+		kvContext context.Context
+		kvCancel  context.CancelFunc
+	)
+
+Outer:
+	for {
+		select {
+		case <-r.cancelCtx.Done():
+			break Outer
+		case <-r.reloadKV:
+			r.kvMut.RLock()
+			kv := r.kv
+			r.kvMut.RUnlock()
+
+			if kvCancel != nil {
+				kvCancel()
+			}
+			kvContext, kvCancel = context.WithCancel(r.cancelCtx)
+			go r.watchKV(kvContext, kv)
+		}
+	}
+
+	if kvCancel != nil {
+		kvCancel()
+	}
+}
+
+// watchKV watches client with an empty key prefix, mirrors every change into
+// r.configs and sends the full config set on configsCh after each change.
+func (r *Remote) watchKV(ctx context.Context, client kv.Client) {
+	// Edge case: client was unset, nothing to do here.
+	if client == nil {
+		return
+	}
+
+	client.WatchPrefix(ctx, "", func(key string, v interface{}) bool {
+		if ctx.Err() != nil {
+			return false
+		}
+
+		r.configsMut.Lock()
+		defer r.configsMut.Unlock()
+
+		switch {
+		case v == nil:
+			delete(r.configs, key)
+		default:
+			cfg, err := instance.UnmarshalConfig(strings.NewReader(v.(string)))
+			if err != nil {
+				level.Error(r.log).Log("msg", "could not unmarshal config from store", "name", key, "err", err)
+				break
+			}
+
+			r.configs[key] = *cfg
+		}
+
+		// Stop instead of blocking forever when nobody reads configsCh and
+		// the watcher has been cancelled.
+		select {
+		case r.configsCh <- r.computeConfigs():
+			return true
+		case <-ctx.Done():
+			return false
+		}
+	})
+}
+
+// computeConfigs returns the known configs sorted by name. The caller must
+// hold configsMut.
+func (r *Remote) computeConfigs() []instance.Config {
+	cfgs := make([]instance.Config, 0, len(r.configs))
+	for _, cfg := range r.configs {
+		cfgs = append(cfgs, cfg)
+	}
+	sort.Slice(cfgs, func(i, j int) bool {
+		return cfgs[i].Name < cfgs[j].Name
+	})
+	return cfgs
+}
+
+// List gets the list of config names.
+func (r *Remote) List(ctx context.Context) ([]string, error) {
+	r.kvMut.RLock()
+	defer r.kvMut.RUnlock()
+	if r.kv == nil {
+		return nil, ErrNotConnected
+	}
+
+	return r.kv.List(ctx, "")
+}
+
+// Get retrieves a single config by key. It returns NotExistError if the
+// store has no value for key.
+func (r *Remote) Get(ctx context.Context, key string) (instance.Config, error) {
+	r.kvMut.RLock()
+	defer r.kvMut.RUnlock()
+	if r.kv == nil {
+		return instance.Config{}, ErrNotConnected
+	}
+
+	v, err := r.kv.Get(ctx, key)
+	if err != nil {
+		return instance.Config{}, fmt.Errorf("failed to get config %s: %w", key, err)
+	} else if v == nil {
+		return instance.Config{}, NotExistError{Key: key}
+	}
+
+	cfg, err := instance.UnmarshalConfig(strings.NewReader(v.(string)))
+	if err != nil {
+		return instance.Config{}, fmt.Errorf("failed to unmarshal config %s: %w", key, err)
+	}
+	return *cfg, nil
+}
+
+// Put stores c under its name after checking that none of its scrape job
+// names is used by another config. It reports whether c was newly created.
+func (r *Remote) Put(ctx context.Context, c instance.Config) (bool, error) {
+	// We need to use a write lock here since two Puts can't run concurrently
+	// (given the current need to perform a store-wide validation.)
+	r.kvMut.Lock()
+	defer r.kvMut.Unlock()
+	if r.kv == nil {
+		return false, ErrNotConnected
+	}
+
+	bb, err := instance.MarshalConfig(&c, false)
+	if err != nil {
+		return false, fmt.Errorf("failed to marshal config: %w", err)
+	}
+
+	cfgCh, err := r.all(ctx, nil)
+	if err != nil {
+		return false, fmt.Errorf("failed to check validity of config: %w", err)
+	}
+	if err := checkUnique(cfgCh, &c); err != nil {
+		return false, fmt.Errorf("failed to check validity of config: %w", err)
+	}
+
+	var created bool
+	err = r.kv.CAS(ctx, c.Name, func(in interface{}) (out interface{}, retry bool, err error) {
+		// The configuration is new if there's no previous value from the CAS
+		created = (in == nil)
+		return string(bb), false, nil
+	})
+	if err != nil {
+		return false, fmt.Errorf("failed to put config: %w", err)
+	}
+	return created, nil
+}
+
+// Delete removes the config stored under key. It returns NotExistError when
+// the store reports that no such config exists.
+func (r *Remote) Delete(ctx context.Context, key string) error {
+	r.kvMut.RLock()
+	defer r.kvMut.RUnlock()
+	if r.kv == nil {
+		return ErrNotConnected
+	}
+
+	// Some KV stores don't return an error if something failed to be
+	// deleted, so we'll try to get it first. This isn't perfect, and
+	// it may fail, so we'll silently ignore any errors here unless
+	// we know for sure the config doesn't exist.
+	v, err := r.kv.Get(ctx, key)
+	if err != nil {
+		level.Warn(r.log).Log("msg", "error validating key existence for deletion", "err", err)
+	} else if v == nil {
+		return NotExistError{Key: key}
+	}
+
+	err = r.kv.Delete(ctx, key)
+	if err != nil {
+		return fmt.Errorf("error deleting configuration: %w", err)
+	}
+
+	return nil
+}
+
+// All retrieves the set of all configs in the store. The returned channel is
+// closed once every config has been sent.
+func (r *Remote) All(ctx context.Context, keep func(key string) bool) (<-chan instance.Config, error) {
+	r.kvMut.RLock()
+	defer r.kvMut.RUnlock()
+	return r.all(ctx, keep)
+}
+
+// all fetches every config whose key passes keep (nil keeps everything) and
+// sends it on the returned channel, which is closed when all keys have been
+// processed. all can only be called if the kvMut lock is already held.
+func (r *Remote) all(ctx context.Context, keep func(key string) bool) (<-chan instance.Config, error) {
+	// Capture the client: the goroutines below can outlive the caller's hold
+	// on kvMut and therefore must not read r.kv.
+	client := r.kv
+	if client == nil {
+		return nil, ErrNotConnected
+	}
+
+	keys, err := client.List(ctx, "")
+	if err != nil {
+		return nil, fmt.Errorf("failed to list configs: %w", err)
+	}
+
+	ch := make(chan instance.Config)
+	var wg sync.WaitGroup
+	wg.Add(len(keys))
+	for _, key := range keys {
+		go func(key string) {
+			defer wg.Done()
+
+			if keep != nil && !keep(key) {
+				level.Debug(r.log).Log("msg", "skipping key that was filtered out", "key", key)
+				return
+			}
+
+			// TODO(rfratto): retries might be useful here
+			v, err := client.Get(ctx, key)
+			if err != nil {
+				level.Error(r.log).Log("msg", "failed to get config with key", "key", key, "err", err)
+				return
+			} else if v == nil {
+				// Config was deleted since we called list, skip it.
+				level.Debug(r.log).Log("msg", "skipping key that was deleted after list was called", "key", key)
+				return
+			}
+
+			cfg, err := instance.UnmarshalConfig(strings.NewReader(v.(string)))
+			if err != nil {
+				level.Error(r.log).Log("msg", "failed to unmarshal config from store", "key", key, "err", err)
+				return
+			}
+
+			// Give up on cancellation so an abandoned reader cannot leak
+			// this goroutine.
+			select {
+			case ch <- *cfg:
+			case <-ctx.Done():
+			}
+		}(key)
+	}
+
+	// Close ch once every lookup is done so that consumers ranging over it
+	// (such as checkUnique) terminate.
+	go func() {
+		wg.Wait()
+		close(ch)
+	}()
+
+	return ch, nil
+}
+
+// Watch watches the Store for changes.
+func (r *Remote) Watch() <-chan []instance.Config {
+	return r.configsCh
+}
+
+// Close closes the Remote store.
+func (r *Remote) Close() error {
+	r.kvMut.Lock()
+	defer r.kvMut.Unlock()
+	r.cancelFunc()
+	return nil
+}
diff --git a/pkg/prom/instance/configstore/store.go b/pkg/prom/instance/configstore/store.go
new file mode 100644
index 000000000000..75758706b30e
--- /dev/null
+++ b/pkg/prom/instance/configstore/store.go
@@ -0,0 +1,40 @@
+// Package configstore abstracts the concepts of where instance files get
+// retrieved.
+package configstore
+
+import (
+	"context"
+
+	"github.com/grafana/agent/pkg/prom/instance"
+)
+
+// Store is an interface for retrieving instance configurations.
+type Store interface {
+	// List gets the list of config names.
+	List(ctx context.Context) ([]string, error)
+
+	// Get gets an individual config by name.
+	Get(ctx context.Context, key string) (instance.Config, error)
+
+	// Put applies a new instance Config to the store.
+	// If the config already exists, created will be false to indicate an
+	// update.
+	Put(ctx context.Context, c instance.Config) (created bool, err error)
+
+	// Delete deletes a config from the store.
+	Delete(ctx context.Context, key string) error
+
+	// All retrieves the entire list of instance configs currently
+	// in the store. A filtering "keep" function can be provided to ignore some
+	// configs, which can significantly speed up the operation in some cases.
+	All(ctx context.Context, keep func(key string) bool) (<-chan instance.Config, error)
+
+	// Watch watches for new instance Configs. The entire set of known
+	// instance configs is returned each time.
+	//
+	// All callers of Watch receive the same Channel.
+	Watch() <-chan []instance.Config
+
+	// Close closes the store.
+	Close() error
+}
diff --git a/pkg/prom/instance/configstore/unique.go b/pkg/prom/instance/configstore/unique.go
new file mode 100644
index 000000000000..44d757de53ac
--- /dev/null
+++ b/pkg/prom/instance/configstore/unique.go
@@ -0,0 +1,35 @@
+package configstore
+
+import (
+	"github.com/grafana/agent/pkg/prom/instance"
+)
+
+// checkUnique validates that cfg is unique from all, ensuring that no two
+// configs share a job_name.
+func checkUnique(all <-chan instance.Config, cfg *instance.Config) error {
+	defer func() {
+		// Drain the channel, which is necessary if we're returning an error.
+		for range all {
+		}
+	}()
+
+	newJobNames := make(map[string]struct{}, len(cfg.ScrapeConfigs))
+	for _, sc := range cfg.ScrapeConfigs {
+		newJobNames[sc.JobName] = struct{}{}
+	}
+
+	for otherConfig := range all {
+		// If the other config is the one we're validating, skip it.
+		if otherConfig.Name == cfg.Name {
+			continue
+		}
+
+		for _, otherScrape := range otherConfig.ScrapeConfigs {
+			if _, exist := newJobNames[otherScrape.JobName]; exist {
+				return NotUniqueError{ScrapeJob: otherScrape.JobName}
+			}
+		}
+	}
+
+	return nil
+}

From 824578abb1784f1b9851fcc2961ec00969356e1f Mon Sep 17 00:00:00 2001
From: Robert Fratto
Date: Wed, 3 Mar 2021 09:45:56 -0500
Subject: [PATCH 2/2] review feedback

---
 pkg/prom/instance/configstore/remote.go | 1 +
 1 file changed, 1 insertion(+)

diff --git a/pkg/prom/instance/configstore/remote.go b/pkg/prom/instance/configstore/remote.go
index da1f559d5962..19cd933c4e77 100644
--- a/pkg/prom/instance/configstore/remote.go
+++ b/pkg/prom/instance/configstore/remote.go
@@ -127,6 +127,7 @@ Outer:
 func (r *Remote) watchKV(ctx context.Context, client kv.Client) {
 	// Edge case: client was unset, nothing to do here.
 	if client == nil {
+		level.Info(r.log).Log("msg", "not watching the KV, none set")
 		return
 	}
 