From c13d4c93399e26c092b42dffa6a6554ae152230e Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Mon, 7 Feb 2022 17:07:25 +0100 Subject: [PATCH 1/4] Allow in-memory kv-client to support multiple codec --- kv/client.go | 9 ++++++--- kv/client_test.go | 42 +++++++++++++++++++++++++++++++++++++++--- kv/consul/client.go | 13 ++++++++++++- 3 files changed, 57 insertions(+), 7 deletions(-) diff --git a/kv/client.go b/kv/client.go index 7131975ee..a5ed80550 100644 --- a/kv/client.go +++ b/kv/client.go @@ -33,8 +33,10 @@ func (r *role) Labels() prometheus.Labels { // The NewInMemoryKVClient returned by NewClient() is a singleton, so // that distributors and ingesters started in the same process can // find themselves. -var inmemoryStoreInit sync.Once -var inmemoryStore Client +var ( + inmemoryStoreInit sync.Once + inmemoryStore *consul.Client +) // StoreConfig is a configuration used for building single store client, either // Consul, Etcd, Memberlist or MultiClient. It was extracted from Config to keep @@ -146,7 +148,8 @@ func createClient(backend string, prefix string, cfg StoreConfig, codec codec.Co inmemoryStoreInit.Do(func() { inmemoryStore, _ = consul.NewInMemoryClient(codec, logger, reg) }) - client = inmemoryStore + // however we swap the codec so that we can encode different type of values. + client = inmemoryStore.Clone().WithCodec(codec) case "memberlist": kv, err := cfg.MemberlistKV() diff --git a/kv/client_test.go b/kv/client_test.go index e6bc21762..bf13f0cd4 100644 --- a/kv/client_test.go +++ b/kv/client_test.go @@ -3,9 +3,11 @@ package kv import ( "context" "flag" + "os" "testing" "time" + "github.com/go-kit/log" "github.com/gogo/protobuf/proto" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" @@ -96,7 +98,6 @@ func Test_createClient_multiBackend_mustContainRoleAndTypeLabels(t *testing.T) { require.Equal(t, "primary", actual["multi"]) require.Equal(t, "primary", actual["inmemory"]) require.Equal(t, "secondary", actual["mock"]) - } func typeToRoleMapHistogramLabels(t *testing.T, reg prometheus.Gatherer, histogramWithRoleLabels string) map[string]string { @@ -124,6 +125,7 @@ func typeToRoleMapHistogramLabels(t *testing.T, reg prometheus.Gatherer, histogr } return result } + func newConfigsForTest() (cfg StoreConfig, c codec.Codec) { cfg = StoreConfig{ Multi: MultiConfig{ @@ -153,8 +155,7 @@ func (m *mockMessage) ProtoMessage() { panic("do not use") } -type testLogger struct { -} +type testLogger struct{} func (l testLogger) Log(keyvals ...interface{}) error { return nil @@ -170,3 +171,38 @@ func TestDefaultStoreValue(t *testing.T) { cfg2.RegisterFlagsWithPrefix("", "", flag.NewFlagSet("test", flag.PanicOnError)) assert.Equal(t, "memberlist", cfg2.Store) } + +type stringCodec struct { + value string +} + +func (c stringCodec) Decode([]byte) (interface{}, error) { + return c.value, nil +} + +func (c stringCodec) Encode(interface{}) ([]byte, error) { + return []byte(c.value), nil +} +func (c stringCodec) CodecID() string { return c.value } + +func TestMultipleInMemoryClient(t *testing.T) { + logger := log.NewJSONLogger(os.Stdout) + foo, err := NewClient(Config{ + Store: "inmemory", + }, stringCodec{value: "foo"}, prometheus.NewRegistry(), logger) + require.NoError(t, err) + bar, err := NewClient(Config{ + Store: "inmemory", + }, stringCodec{value: "bar"}, prometheus.NewRegistry(), logger) + require.NoError(t, err) + + require.NoError(t, foo.CAS(context.TODO(), "foo", func(in interface{}) (out interface{}, retry bool, err error) { return "foo", false, nil })) + fooKey, err := foo.Get(ctx, "foo") + require.NoError(t, err) + require.Equal(t, "foo", fooKey.(string)) + + require.NoError(t, bar.CAS(context.TODO(), "bar", func(in interface{}) (out interface{}, retry bool, err error) { return "bar", false, nil })) + barKey, err := bar.Get(ctx, "bar") + require.NoError(t, err) + require.Equal(t, "bar", barKey.(string)) +} diff --git a/kv/consul/client.go b/kv/consul/client.go index d278d8c9e..4c2288449 100644 --- a/kv/consul/client.go +++ b/kv/consul/client.go @@ -234,7 +234,6 @@ func (c *Client) WatchKey(ctx context.Context, key string, f func(interface{}) b } kvp, meta, err := c.kv.Get(key, queryOptions.WithContext(ctx)) - // Don't backoff if value is not found (kvp == nil). In that case, Consul still returns index value, // and next call to Get will block as expected. We handle missing value below. if err != nil { @@ -397,3 +396,15 @@ func (c *Client) createRateLimiter() *rate.Limiter { } return rate.NewLimiter(rate.Limit(c.cfg.WatchKeyRateLimit), burst) } + +// Clone clone the current consul client. +func (c *Client) Clone() *Client { + new := *c + return &new +} + +// WithCodec changes the codec of the consul client. +func (c *Client) WithCodec(codec codec.Codec) *Client { + c.codec = codec + return c +} From 2051097ceb0df9d4f50bf0e9f21a75ce0defc190 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Mon, 7 Feb 2022 17:09:45 +0100 Subject: [PATCH 2/4] update changelog. --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7701b20d7..d8bab82e0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -39,3 +39,4 @@ * [BUGFIX] spanlogger: Support multiple tenant IDs. #59 * [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85 * [BUGFIX] Ring: `ring_member_ownership_percent` and `ring_tokens_owned` metrics are not updated on scale down. #109 +* [BUGFIX] Allow in-memory kv-client to support multiple codec #132 From 03fc64bbbbb79dc89c020d45b1f9100855c2eb94 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Tue, 8 Feb 2022 16:17:31 +0100 Subject: [PATCH 3/4] Remove clone and logger in test. --- kv/client.go | 2 +- kv/client_test.go | 3 +-- kv/consul/client.go | 11 +++-------- 3 files changed, 5 insertions(+), 11 deletions(-) diff --git a/kv/client.go b/kv/client.go index a5ed80550..42bf55954 100644 --- a/kv/client.go +++ b/kv/client.go @@ -149,7 +149,7 @@ func createClient(backend string, prefix string, cfg StoreConfig, codec codec.Co inmemoryStore, _ = consul.NewInMemoryClient(codec, logger, reg) }) // however we swap the codec so that we can encode different type of values. - client = inmemoryStore.Clone().WithCodec(codec) + client = inmemoryStore.WithCodec(codec) case "memberlist": kv, err := cfg.MemberlistKV() diff --git a/kv/client_test.go b/kv/client_test.go index bf13f0cd4..606090c72 100644 --- a/kv/client_test.go +++ b/kv/client_test.go @@ -3,7 +3,6 @@ package kv import ( "context" "flag" - "os" "testing" "time" @@ -186,7 +185,7 @@ func (c stringCodec) Encode(interface{}) ([]byte, error) { func (c stringCodec) CodecID() string { return c.value } func TestMultipleInMemoryClient(t *testing.T) { - logger := log.NewJSONLogger(os.Stdout) + logger := log.NewNopLogger() foo, err := NewClient(Config{ Store: "inmemory", }, stringCodec{value: "foo"}, prometheus.NewRegistry(), logger) diff --git a/kv/consul/client.go b/kv/consul/client.go index 4c2288449..90230d08f 100644 --- a/kv/consul/client.go +++ b/kv/consul/client.go @@ -397,14 +397,9 @@ func (c *Client) createRateLimiter() *rate.Limiter { return rate.NewLimiter(rate.Limit(c.cfg.WatchKeyRateLimit), burst) } -// Clone clone the current consul client. -func (c *Client) Clone() *Client { - new := *c - return &new -} - // WithCodec changes the codec of the consul client. func (c *Client) WithCodec(codec codec.Codec) *Client { - c.codec = codec - return c + new := *c + new.codec = codec + return &new } From 56e5f6797217c8aae047eadf23f99f90f2183266 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Tue, 8 Feb 2022 16:19:07 +0100 Subject: [PATCH 4/4] Fixup comment according to the change --- kv/consul/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kv/consul/client.go b/kv/consul/client.go index 90230d08f..63114c547 100644 --- a/kv/consul/client.go +++ b/kv/consul/client.go @@ -397,7 +397,7 @@ func (c *Client) createRateLimiter() *rate.Limiter { return rate.NewLimiter(rate.Limit(c.cfg.WatchKeyRateLimit), burst) } -// WithCodec changes the codec of the consul client. +// WithCodec Clones and changes the codec of the consul client. func (c *Client) WithCodec(codec codec.Codec) *Client { new := *c new.codec = codec