Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kubernetes KV store implementation #94

Closed
wants to merge 25 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
ac3712a
Implement first interface methods for the kubernetes kv
simonswine Dec 3, 2021
68962f1
Add k8s kv CAS implementation
dimitarvdimitrov Dec 3, 2021
dc77595
Strengthen assertions in kubernetes_integration_test.go
dimitarvdimitrov Dec 3, 2021
fabeae0
Add some more assertions in kubernetes_integration_test.go
dimitarvdimitrov Dec 3, 2021
5c38440
Implement k8s kv Delete
dimitarvdimitrov Dec 3, 2021
279fde7
Merge pull request #1 from dimitarvdimitrov/20211203_r2g
simonswine Dec 3, 2021
fe2ae24
Implement in memory client
simonswine Dec 3, 2021
24d6802
Ensure restclient is populated
simonswine Dec 4, 2021
1c8311e
Merge pull request #92 from simonswine/20211203_r2g
dimitarvdimitrov Dec 6, 2021
edd830f
Register config flags
dimitarvdimitrov Dec 6, 2021
8d3b45e
Implement k8s CAS watch
dimitarvdimitrov Dec 6, 2021
5aad478
Remove usage of go-kit/kit
dimitarvdimitrov Dec 6, 2021
86fd0f9
Fix data race in watcher_test.go
dimitarvdimitrov Dec 6, 2021
2e255e9
Add a TODO
dimitarvdimitrov Dec 6, 2021
2eed822
Fix encoding during watch
dimitarvdimitrov Dec 8, 2021
900b0a8
Add retries and metrics to k8s KV
dimitarvdimitrov Dec 8, 2021
73cdf22
Add tests for k8s CAS
dimitarvdimitrov Dec 9, 2021
230c1df
Remove deadcode from kubernetes_integration_test.go
dimitarvdimitrov Dec 9, 2021
ef54cbf
Add docs to kv/kubernetes
dimitarvdimitrov Dec 10, 2021
a08e42a
Update CHANGELOG.md
dimitarvdimitrov Dec 10, 2021
ab789cf
Update config flags
dimitarvdimitrov Dec 10, 2021
337a5ff
Refactor memberlist to use shared watcher implementation
dimitarvdimitrov Dec 10, 2021
0e9f3be
Further simplify signatures in memberlist
dimitarvdimitrov Dec 10, 2021
bf386c8
Restore comment in go.mod
dimitarvdimitrov Dec 10, 2021
1a392e7
Propagate stops to watcher in memberlist
dimitarvdimitrov Dec 10, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,6 @@
* [ENHANCEMENT] Optimise memberlist receive path when used as a backing store for rings with a large number of members. #76 #77 #84 #91
* [ENHANCEMENT] Memberlist: prepare the data to send on the write before starting counting the elapsed time for `-memberlist.packet-write-timeout`, in order to reduce chances we hit the timeout when sending a packet to other node. #89
* [ENHANCEMENT] flagext: for cases such as `DeprecatedFlag()` that need a logger, add RegisterFlagsWithLogger. #80
* [ENHANCEMENT] Implement kubernetes KV store client. #94
* [BUGFIX] spanlogger: Support multiple tenant IDs. #59
* [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,12 @@ require (
golang.org/x/time v0.0.0-20210611083556-38a9dc6acbc6
google.golang.org/grpc v1.38.0
gopkg.in/yaml.v2 v2.4.0
k8s.io/api v0.21.7
k8s.io/apimachinery v0.21.7
k8s.io/client-go v0.21.7
)

replace k8s.io/client-go v12.0.0+incompatible => k8s.io/client-go v0.21.4
replace k8s.io/client-go v12.0.0+incompatible => k8s.io/client-go v0.21.7

// Replace memberlist with our fork which includes some fixes that haven't been
// merged upstream yet.
Expand Down
268 changes: 263 additions & 5 deletions go.sum

Large diffs are not rendered by default.

16 changes: 12 additions & 4 deletions kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/grafana/dskit/kv/codec"
"github.com/grafana/dskit/kv/consul"
"github.com/grafana/dskit/kv/etcd"
"github.com/grafana/dskit/kv/kubernetes"
"github.com/grafana/dskit/kv/memberlist"
)

Expand Down Expand Up @@ -40,9 +41,10 @@ var inmemoryStore Client
// Consul, Etcd, Memberlist or MultiClient. It was extracted from Config to keep
// single-client config separate from final client-config (with all the wrappers)
type StoreConfig struct {
Consul consul.Config `yaml:"consul"`
Etcd etcd.Config `yaml:"etcd"`
Multi MultiConfig `yaml:"multi"`
Consul consul.Config `yaml:"consul"`
Etcd etcd.Config `yaml:"etcd"`
Multi MultiConfig `yaml:"multi"`
Kubernetes kubernetes.Config `yaml:"kubernetes"`

// Function that returns memberlist.KV store to use. By using a function, we can delay
// initialization of memberlist.KV until it is actually required.
Expand Down Expand Up @@ -76,8 +78,11 @@ func (cfg *Config) RegisterFlagsWithPrefix(flagsPrefix, defaultPrefix string, f
if flagsPrefix == "" {
flagsPrefix = "ring."
}

cfg.Kubernetes.RegisterFlags(f, flagsPrefix)

f.StringVar(&cfg.Prefix, flagsPrefix+"prefix", defaultPrefix, "The prefix for the keys in the store. Should end with a /.")
f.StringVar(&cfg.Store, flagsPrefix+"store", "consul", "Backend storage to use for the ring. Supported values are: consul, etcd, inmemory, memberlist, multi.")
f.StringVar(&cfg.Store, flagsPrefix+"store", "consul", "Backend storage to use for the ring. Supported values are: consul, etcd, inmemory, memberlist, kubernetes, multi.")
}

// Client is a high-level client for key-value stores (such as Etcd and
Expand Down Expand Up @@ -142,6 +147,9 @@ func createClient(backend string, prefix string, cfg StoreConfig, codec codec.Co
})
client = inmemoryStore

case "kubernetes":
client, err = kubernetes.NewClient(cfg.Kubernetes, codec, logger, reg)

case "memberlist":
kv, err := cfg.MemberlistKV()
if err != nil {
Expand Down
196 changes: 196 additions & 0 deletions kv/internal/watch/watch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
package watch

import (
"context"
"strings"
"sync"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

type Watcher struct {
logger log.Logger

// closed on shutdown
shutdown chan struct{}

// Key watchers
watchersMu sync.Mutex
watchers map[string][]chan update
prefixWatchers map[string][]chan update

watchPrefixDroppedNotifications *prometheus.CounterVec
}

type update struct {
key string
value interface{}
}

func NewWatcher(l log.Logger, r prometheus.Registerer) *Watcher {
return &Watcher{
logger: l,
shutdown: make(chan struct{}),
watchers: make(map[string][]chan update),
prefixWatchers: make(map[string][]chan update),

watchPrefixDroppedNotifications: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Name: "watch_prefix_dropped_notifications",
Help: "Number of dropped notifications in WatchPrefix function",
}, []string{"prefix"}),
}
}

// WatchKey watches for value changes for given key. When value changes, 'f' function is called with the
// latest value. Notifications that arrive while 'f' is running are coalesced into one subsequent 'f' call.
//
// Watching ends when 'f' returns false, context is done, or this client is shut down.
func (w *Watcher) WatchKey(ctx context.Context, key string, f func(interface{}) bool) {
// keep one extra notification, to avoid missing notification if we're busy running the function
watchChan := make(chan update, 1)

// register watcher
w.watchersMu.Lock()
w.watchers[key] = append(w.watchers[key], watchChan)
w.watchersMu.Unlock()

defer func() {
// unregister watcher on exit
w.watchersMu.Lock()
defer w.watchersMu.Unlock()

removeWatcherChannel(key, watchChan, w.watchers)
}()

for {
select {
case newValue := <-watchChan:
// value changed
if !f(newValue.value) {
return
}

case <-w.shutdown:
// stop watching on shutdown
return

case <-ctx.Done():
return
}
}
}

// WatchPrefix watches for any change of values stored under keys with given prefix. When change occurs,
// function 'f' is called with key and current value.
// Each change of the key results in one notification. If there are too many pending notifications ('f' is slow),
// some notifications may be lost.
//
// Watching ends when 'f' returns false, context is done, or this client is shut down.
func (w *Watcher) WatchPrefix(ctx context.Context, prefix string, f func(string, interface{}) bool) {
// we use bigger buffer here, since keys are interesting and we don't want to lose them.
watchChan := make(chan update, 16)

// register watcher
w.watchersMu.Lock()
w.prefixWatchers[prefix] = append(w.prefixWatchers[prefix], watchChan)
w.watchersMu.Unlock()

defer func() {
// unregister watcher on exit
w.watchersMu.Lock()
defer w.watchersMu.Unlock()

removeWatcherChannel(prefix, watchChan, w.prefixWatchers)
}()

for {
select {
case newValue := <-watchChan:
if !f(newValue.key, newValue.value) {
return
}

case <-w.shutdown:
// stop watching on shutdown
return

case <-ctx.Done():
return
}
}
}

func removeWatcherChannel(k string, w chan update, watchers map[string][]chan update) {
ws := watchers[k]
for ix, kw := range ws {
if kw == w {
ws = append(ws[:ix], ws[ix+1:]...)
break
}
}

if len(ws) > 0 {
watchers[k] = ws
} else {
delete(watchers, k)
}
}

func (w *Watcher) Notify(key string, value interface{}) {
w.watchersMu.Lock()
defer w.watchersMu.Unlock()

watchKey := update{
key: key,
value: value,
}

for _, kw := range w.watchers[key] {
select {
case kw <- watchKey:
// notification sent.
default:
// cannot send notification to this watcher at the moment
// but since this is a buffered channel, it means that
// there is already a pending notification anyway
}
}

for p, ws := range w.prefixWatchers {
if strings.HasPrefix(key, p) {
for _, pw := range ws {
select {
case pw <- watchKey:
// notification sent.
default:
c, _ := w.watchPrefixDroppedNotifications.GetMetricWithLabelValues(p)
if c != nil {
c.Inc()
}

level.Warn(w.logger).Log("msg", "failed to send notification to prefix watcher", "prefix", p)
}
}
}
}
}

func (w *Watcher) prefixWatchersCount() int {
w.watchersMu.Lock()
defer w.watchersMu.Unlock()
return len(w.prefixWatchers)
}

func (w *Watcher) keyWatchersCount() int {
w.watchersMu.Lock()
defer w.watchersMu.Unlock()
return len(w.watchers)
}

func (w *Watcher) Stop(_ error) error {
close(w.shutdown)
return nil
}
83 changes: 83 additions & 0 deletions kv/internal/watch/watch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package watch

import (
"context"
"math/rand"
"os"
"testing"
"time"

"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/util/wait"

"github.com/go-kit/log"
"github.com/stretchr/testify/assert"
)

func TestWatcher(t *testing.T) {
t.Run("Notify notifies watchers that are watching a key", func(t *testing.T) {
w := newTestWatcher()
const key = "/key"
notifyVal := rand.Int()
notified := make(chan struct{})

go w.WatchKey(context.Background(), key, func(val interface{}) bool {
assert.Equal(t, notifyVal, val)
close(notified)
return false
})

// Wait until the watcher has been registered
require.NoError(t, wait.Poll(time.Millisecond*10, time.Second, func() (bool, error) {
return w.keyWatchersCount() > 0, nil
}))

w.Notify(key, notifyVal)

select {
case <-time.NewTimer(time.Second).C:
assert.FailNow(t, "notifier didn't get called within a second")
case <-notified:
}
})

t.Run("Notify notifies watchers that are watching a prefix", func(t *testing.T) {
w := newTestWatcher()

const prefix = "/prefix"
const key = prefix + "/key"
notifyVal := rand.Int()
notified := make(chan struct{})

go w.WatchPrefix(context.Background(), prefix, func(k string, val interface{}) bool {
assert.Equal(t, notifyVal, val)
assert.Equal(t, key, k)
close(notified)
return false
})

// Wait until the watcher has been registered
require.NoError(t, wait.Poll(time.Millisecond*10, time.Second, func() (bool, error) {
return w.prefixWatchersCount() > 0, nil
}))

w.Notify(key, notifyVal)

select {
case <-time.NewTimer(time.Second).C:
assert.FailNow(t, "notifier didn't get called within a second")
case <-notified:
}
})
}

func newTestWatcher() *Watcher {
var logger log.Logger
if testing.Verbose() {
logger = log.NewLogfmtLogger(os.Stdout)
} else {
logger = log.NewNopLogger()
}

return NewWatcher(logger, nil)
}
Loading