Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Decouple configuration store from scraping service #445

Merged
merged 2 commits into from
Mar 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/prom/ha/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/grafana/agent/pkg/agentproto"
"github.com/grafana/agent/pkg/prom/ha/client"
"github.com/grafana/agent/pkg/prom/instance"
"github.com/grafana/agent/pkg/prom/instance/configstore"
flagutil "github.com/grafana/agent/pkg/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
Expand Down Expand Up @@ -115,7 +116,7 @@ func New(reg prometheus.Registerer, cfg Config, globalConfig *config.GlobalConfi

kvClient, err := kv.NewClient(
cfg.KVStore,
GetCodec(),
configstore.GetCodec(),
kv.RegistererWithKVName(reg, "agent_configs"),
)
if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions pkg/prom/ha/sharding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/cortexproject/cortex/pkg/ring/kv/consul"
"github.com/go-kit/kit/log"
"github.com/grafana/agent/pkg/agentproto"
"github.com/grafana/agent/pkg/prom/instance/configstore"
"github.com/stretchr/testify/require"
)

Expand All @@ -18,7 +19,7 @@ func TestServer_Reshard(t *testing.T) {
// - All configs not in the store but in the existing InstanceManager should be deleted
fakeIm := newFakeInstanceManager()

mockKv := consul.NewInMemoryClient(GetCodec())
mockKv := consul.NewInMemoryClient(configstore.GetCodec())
for _, name := range []string{"keep_a", "keep_b", "new_a", "new_b"} {
err := mockKv.CAS(context.Background(), name, func(in interface{}) (out interface{}, retry bool, err error) {
return testConfig(t, name), true, nil
Expand Down Expand Up @@ -69,7 +70,7 @@ func TestServer_Ownership(t *testing.T) {
// - All configs not in the store but in the existing InstanceManager should be deleted
fakeIm := newFakeInstanceManager()

mockKv := consul.NewInMemoryClient(GetCodec())
mockKv := consul.NewInMemoryClient(configstore.GetCodec())
for _, name := range []string{"owned", "unowned"} {
err := mockKv.CAS(context.Background(), name, func(in interface{}) (out interface{}, retry bool, err error) {
return testConfig(t, name), true, nil
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package ha
package configstore

import (
"bytes"
Expand All @@ -11,6 +11,7 @@ import (
)

// GetCodec returns the codec used to encode and decode instance.Configs
// stored in the Remote store.
func GetCodec() codec.Codec {
	var c codec.Codec = &yamlCodec{}
	return c
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package ha
package configstore

import (
"testing"
Expand Down
27 changes: 27 additions & 0 deletions pkg/prom/instance/configstore/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package configstore

import "fmt"

// ErrNotConnected is the sentinel error used when a store operation is
// called while no connection to the store is active. It is a package-level
// value, so callers can compare against it (for example with errors.Is).
var ErrNotConnected = fmt.Errorf("not connected to store")

// NotExistError reports that no configuration is stored under Key.
type NotExistError struct {
	// Key is the name of the configuration that could not be found.
	Key string
}

// Error implements error. The message names the missing key.
func (e NotExistError) Error() string {
	const format = "configuration %s does not exist"

	msg := fmt.Sprintf(format, e.Key)
	return msg
}

// NotUniqueError reports that more than one scrape config uses the same
// job name.
type NotUniqueError struct {
	// ScrapeJob is the job name that was found more than once.
	ScrapeJob string
}

// Error implements error. The job name is rendered quoted (%q) so empty or
// unusual names stay visible in the message.
func (e NotUniqueError) Error() string {
	const format = "found multiple scrape configs with job name %q"

	msg := fmt.Sprintf(format, e.ScrapeJob)
	return msg
}
305 changes: 305 additions & 0 deletions pkg/prom/instance/configstore/remote.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,305 @@
package configstore

import (
"context"
"fmt"
"sort"
"strings"
"sync"

"github.com/cortexproject/cortex/pkg/ring/kv"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/grafana/agent/pkg/prom/instance"
"github.com/prometheus/client_golang/prometheus"
)

// Remote loads instance files from a remote KV store. The KV store
// can be swapped out in real time.
type Remote struct {
log log.Logger
reg prometheus.Registerer

// kvMut guards kv. ApplyConfig, Put and Close take the write lock; the
// read-only operations and run take the read lock.
kvMut sync.RWMutex
// kv is the active KV client. It is nil-checked by every store operation,
// which return ErrNotConnected when it is unset.
kv kv.Client
// reloadKV is signalled by setClient so that run restarts the KV watcher
// against the current client.
reloadKV chan struct{}

// cancelCtx is created in NewRemote and cancelled by Close via cancelFunc.
// run exits once it is done, and every watcher context derives from it.
cancelCtx context.Context
cancelFunc context.CancelFunc

// configsMut guards configs.
configsMut sync.Mutex
// configs holds the configs seen by the watcher, keyed by KV key.
configs map[string]instance.Config
// configsCh is the channel returned by Watch; watchKV sends a name-sorted
// snapshot of configs on it after each observed change.
configsCh chan []instance.Config
}

// NewRemote creates a Remote store that talks to the KV store described by
// cfg and starts the background goroutine that (re)starts the KV watcher
// whenever the client changes.
//
// It returns an error if the initial KV client cannot be created. Call Close
// on the returned store to stop the background goroutines.
func NewRemote(l log.Logger, reg prometheus.Registerer, cfg kv.Config) (*Remote, error) {
	cancelCtx, cancelFunc := context.WithCancel(context.Background())

	r := &Remote{
		log: l,
		reg: reg,

		// Buffered so the initial ApplyConfig below can queue a reload
		// signal before run has been started.
		reloadKV: make(chan struct{}, 1),

		cancelCtx:  cancelCtx,
		cancelFunc: cancelFunc,

		// configs must be non-nil: the KV watcher stores into it, and a
		// write to a nil map panics.
		configs:   make(map[string]instance.Config),
		configsCh: make(chan []instance.Config),
	}
	if err := r.ApplyConfig(cfg); err != nil {
		// Nothing will ever call Close on a store we do not return, so
		// release the context here.
		cancelFunc()
		return nil, fmt.Errorf("failed to apply config for config store: %w", err)
	}

	go r.run()
	return r, nil
}

// ApplyConfig builds a new KV client from cfg and makes it the active
// client. It fails if the store has already been closed or if the client
// cannot be created.
func (r *Remote) ApplyConfig(cfg kv.Config) error {
	r.kvMut.Lock()
	defer r.kvMut.Unlock()

	// Once Close has cancelled the context there is no run loop left to
	// pick up a new client.
	if stopped := r.cancelCtx.Err() != nil; stopped {
		return fmt.Errorf("remote store already stopped")
	}

	client, err := kv.NewClient(cfg, GetCodec(), r.reg)
	if err == nil {
		r.setClient(client)
		return nil
	}
	return fmt.Errorf("failed to create kv client: %w", err)
}

// setClient sets the active client and notifies run to restart the
// kv watcher. The caller must hold the kvMut write lock.
//
// The notification is a non-blocking send. run re-reads r.kv under the lock
// after receiving a signal, so a signal that is already pending will pick up
// this client too and a second one is redundant. Blocking here instead could
// deadlock: this goroutine would wait on the full channel while holding
// kvMut, and run would wait on kvMut before it can drain the channel.
func (r *Remote) setClient(client kv.Client) {
	r.kv = client

	select {
	case r.reloadKV <- struct{}{}:
	default:
		// A reload is already queued.
	}
}

// run is the background loop of the store. Each reload signal stops the
// current KV watcher (if any) and starts a new one against the current
// client. The loop ends, stopping the last watcher, once the store's
// context is cancelled.
func (r *Remote) run() {
	// stopWatcher cancels the most recently started watchKV goroutine.
	var stopWatcher context.CancelFunc

	for {
		select {
		case <-r.cancelCtx.Done():
			if stopWatcher != nil {
				stopWatcher()
			}
			return

		case <-r.reloadKV:
			r.kvMut.RLock()
			client := r.kv
			r.kvMut.RUnlock()

			if stopWatcher != nil {
				stopWatcher()
			}

			var watchCtx context.Context
			watchCtx, stopWatcher = context.WithCancel(r.cancelCtx)
			go r.watchKV(watchCtx, client)
		}
	}
}

func (r *Remote) watchKV(ctx context.Context, client kv.Client) {
// Edge case: client was unset, nothing to do here.
if client == nil {
level.Info(r.log).Log("msg", "not watching the KV, none set")
return
rfratto marked this conversation as resolved.
Show resolved Hide resolved
}

client.WatchPrefix(ctx, "", func(key string, v interface{}) bool {
if ctx.Err() != nil {
return false
}

r.configsMut.Lock()
defer r.configsMut.Unlock()

switch {
case v == nil:
delete(r.configs, key)
default:
cfg, err := instance.UnmarshalConfig(strings.NewReader(v.(string)))
if err != nil {
level.Error(r.log).Log("msg", "could not unmarshal config from store", "name", key, "err", err)
break
}

r.configs[key] = *cfg
}

r.configsCh <- r.computeConfigs()
return true
})
}

// computeConfigs returns a snapshot of every config currently held in
// r.configs, ordered by config name. It reads r.configs without locking, so
// the caller is expected to hold configsMut.
func (r *Remote) computeConfigs() []instance.Config {
	snapshot := make([]instance.Config, len(r.configs))

	next := 0
	for _, cfg := range r.configs {
		snapshot[next] = cfg
		next++
	}

	byName := func(a, b int) bool {
		return snapshot[a].Name < snapshot[b].Name
	}
	sort.Slice(snapshot, byName)

	return snapshot
}

// List returns the keys of all configs in the store. It returns
// ErrNotConnected when no KV client is set.
func (r *Remote) List(ctx context.Context) ([]string, error) {
	r.kvMut.RLock()
	defer r.kvMut.RUnlock()

	if r.kv != nil {
		// An empty prefix matches every key.
		return r.kv.List(ctx, "")
	}
	return nil, ErrNotConnected
}

// Get fetches and decodes the config stored under key.
//
// It returns ErrNotConnected when no KV client is set, NotExistError when
// the store holds no value for key, and a wrapped error when the lookup or
// the unmarshalling fails.
func (r *Remote) Get(ctx context.Context, key string) (instance.Config, error) {
	// Zero value returned alongside every error.
	var none instance.Config

	r.kvMut.RLock()
	defer r.kvMut.RUnlock()

	if r.kv == nil {
		return none, ErrNotConnected
	}

	raw, err := r.kv.Get(ctx, key)
	switch {
	case err != nil:
		return none, fmt.Errorf("failed to get config %s: %w", key, err)
	case raw == nil:
		return none, NotExistError{Key: key}
	}

	parsed, err := instance.UnmarshalConfig(strings.NewReader(raw.(string)))
	if err != nil {
		return none, fmt.Errorf("failed to unmarshal config %s: %w", key, err)
	}
	return *parsed, nil
}

// Put validates c against every config already in the store and then writes
// it under c.Name. The returned bool is true when the key held no previous
// value, i.e. the config was created rather than updated.
//
// It returns ErrNotConnected when no KV client is set.
func (r *Remote) Put(ctx context.Context, c instance.Config) (bool, error) {
	// The write lock is required: validation looks at the whole store, so
	// two Puts must not run concurrently.
	r.kvMut.Lock()
	defer r.kvMut.Unlock()

	if r.kv == nil {
		return false, ErrNotConnected
	}

	payload, err := instance.MarshalConfig(&c, false)
	if err != nil {
		return false, fmt.Errorf("failed to marshal config: %w", err)
	}

	// Listing the store and checking uniqueness report failure the same way.
	existing, err := r.all(ctx, nil)
	if err == nil {
		err = checkUnique(existing, &c)
	}
	if err != nil {
		return false, fmt.Errorf("failed to check validity of config: %w", err)
	}

	encoded := string(payload)
	isNew := false
	err = r.kv.CAS(ctx, c.Name, func(prev interface{}) (interface{}, bool, error) {
		// No previous value in the CAS means the config is being created.
		isNew = prev == nil
		return encoded, false, nil
	})
	if err != nil {
		return false, fmt.Errorf("failed to put config: %w", err)
	}
	return isNew, nil
}

// Delete removes the config stored under key.
//
// It returns ErrNotConnected when no KV client is set and NotExistError when
// the store is known to hold no value for key.
func (r *Remote) Delete(ctx context.Context, key string) error {
	r.kvMut.RLock()
	defer r.kvMut.RUnlock()

	if r.kv == nil {
		return ErrNotConnected
	}

	// Not every KV store reports an error when deleting something that is
	// not there, so look the key up first. The lookup is best effort: if it
	// fails we only warn and still attempt the delete. Only a definite
	// "no value" answer is turned into NotExistError.
	current, lookupErr := r.kv.Get(ctx, key)
	switch {
	case lookupErr != nil:
		level.Warn(r.log).Log("msg", "error validating key existence for deletion", "err", lookupErr)
	case current == nil:
		return NotExistError{Key: key}
	}

	if err := r.kv.Delete(ctx, key); err != nil {
		return fmt.Errorf("error deleting configuration: %w", err)
	}
	return nil
}

// All streams every config in the store over the returned channel. When
// keep is non-nil, only keys for which keep returns true are fetched.
func (r *Remote) All(ctx context.Context, keep func(key string) bool) (<-chan instance.Config, error) {
	r.kvMut.RLock()
	defer r.kvMut.RUnlock()

	configs, err := r.all(ctx, keep)
	return configs, err
}

// all lists every key in the store and fetches the matching configs
// concurrently, one goroutine per key. It can only be called if the kvMut
// lock is already held.
//
// When keep is non-nil, keys for which it returns false are skipped. Keys
// that fail to load or unmarshal are logged and skipped. The returned
// channel is closed once every key has been handled, so callers can range
// over it.
//
// It returns ErrNotConnected when no KV client is set.
func (r *Remote) all(ctx context.Context, keep func(key string) bool) (<-chan instance.Config, error) {
	// Snapshot the client while the caller still holds kvMut: the fetch
	// goroutines below keep running after the caller has released the lock
	// and must not read r.kv concurrently with ApplyConfig.
	client := r.kv
	if client == nil {
		return nil, ErrNotConnected
	}

	keys, err := client.List(ctx, "")
	if err != nil {
		return nil, fmt.Errorf("failed to list configs: %w", err)
	}

	ch := make(chan instance.Config)

	// Close the channel once every fetcher is done so that receivers
	// terminate. With no keys this closes the channel right away.
	var wg sync.WaitGroup
	wg.Add(len(keys))
	go func() {
		wg.Wait()
		close(ch)
	}()

	for _, key := range keys {
		go func(key string) {
			defer wg.Done()

			if keep != nil && !keep(key) {
				level.Debug(r.log).Log("msg", "skipping key that was filtered out", "key", key)
				return
			}

			// TODO(rfratto): retries might be useful here
			v, err := client.Get(ctx, key)
			if err != nil {
				level.Error(r.log).Log("msg", "failed to get config with key", "key", key, "err", err)
				return
			} else if v == nil {
				// Config was deleted since we called list, skip it.
				level.Debug(r.log).Log("msg", "skipping key that was deleted after list was called", "key", key)
				return
			}

			cfg, err := instance.UnmarshalConfig(strings.NewReader(v.(string)))
			if err != nil {
				level.Error(r.log).Log("msg", "failed to unmarshal config from store", "key", key, "err", err)
				return
			}

			// The channel is unbuffered; stop waiting for a receiver once
			// the request has been cancelled.
			select {
			case ch <- *cfg:
			case <-ctx.Done():
			}
		}(key)
	}

	return ch, nil
}

// Watch returns the channel on which the store publishes the full,
// name-sorted set of configs after each change seen by the KV watcher.
func (r *Remote) Watch() <-chan []instance.Config {
	var updates <-chan []instance.Config = r.configsCh
	return updates
}

// Close stops the Remote store by cancelling its context, which ends the
// run loop and any active KV watcher. It always returns nil.
func (r *Remote) Close() error {
	r.kvMut.Lock()
	defer r.kvMut.Unlock()

	// Cancelling under the write lock keeps this ordered with ApplyConfig,
	// which checks the context while holding the same lock.
	stop := r.cancelFunc
	stop()

	return nil
}
Loading