Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow in-memory kv-client to support multiple codec #132

Merged
merged 4 commits into from
Feb 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,4 @@
* [BUGFIX] spanlogger: Support multiple tenant IDs. #59
* [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85
* [BUGFIX] Ring: `ring_member_ownership_percent` and `ring_tokens_owned` metrics are not updated on scale down. #109
* [BUGFIX] Allow in-memory kv-client to support multiple codec #132
9 changes: 6 additions & 3 deletions kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@ func (r *role) Labels() prometheus.Labels {
// The NewInMemoryKVClient returned by NewClient() is a singleton, so
// that distributors and ingesters started in the same process can
// find themselves.
var inmemoryStoreInit sync.Once
var inmemoryStore Client
var (
inmemoryStoreInit sync.Once
inmemoryStore *consul.Client
)

// StoreConfig is a configuration used for building single store client, either
// Consul, Etcd, Memberlist or MultiClient. It was extracted from Config to keep
Expand Down Expand Up @@ -146,7 +148,8 @@ func createClient(backend string, prefix string, cfg StoreConfig, codec codec.Co
inmemoryStoreInit.Do(func() {
inmemoryStore, _ = consul.NewInMemoryClient(codec, logger, reg)
})
client = inmemoryStore
// however we swap the codec so that we can encode different type of values.
client = inmemoryStore.WithCodec(codec)

case "memberlist":
kv, err := cfg.MemberlistKV()
Expand Down
41 changes: 38 additions & 3 deletions kv/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"testing"
"time"

"github.com/go-kit/log"
"github.com/gogo/protobuf/proto"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -96,7 +97,6 @@ func Test_createClient_multiBackend_mustContainRoleAndTypeLabels(t *testing.T) {
require.Equal(t, "primary", actual["multi"])
require.Equal(t, "primary", actual["inmemory"])
require.Equal(t, "secondary", actual["mock"])

}

func typeToRoleMapHistogramLabels(t *testing.T, reg prometheus.Gatherer, histogramWithRoleLabels string) map[string]string {
Expand Down Expand Up @@ -124,6 +124,7 @@ func typeToRoleMapHistogramLabels(t *testing.T, reg prometheus.Gatherer, histogr
}
return result
}

func newConfigsForTest() (cfg StoreConfig, c codec.Codec) {
cfg = StoreConfig{
Multi: MultiConfig{
Expand Down Expand Up @@ -153,8 +154,7 @@ func (m *mockMessage) ProtoMessage() {
panic("do not use")
}

type testLogger struct {
}
type testLogger struct{}

func (l testLogger) Log(keyvals ...interface{}) error {
return nil
Expand All @@ -170,3 +170,38 @@ func TestDefaultStoreValue(t *testing.T) {
cfg2.RegisterFlagsWithPrefix("", "", flag.NewFlagSet("test", flag.PanicOnError))
assert.Equal(t, "memberlist", cfg2.Store)
}

type stringCodec struct {
value string
}

func (c stringCodec) Decode([]byte) (interface{}, error) {
return c.value, nil
}

func (c stringCodec) Encode(interface{}) ([]byte, error) {
return []byte(c.value), nil
}
func (c stringCodec) CodecID() string { return c.value }

func TestMultipleInMemoryClient(t *testing.T) {
logger := log.NewNopLogger()
foo, err := NewClient(Config{
Store: "inmemory",
}, stringCodec{value: "foo"}, prometheus.NewRegistry(), logger)
require.NoError(t, err)
bar, err := NewClient(Config{
Store: "inmemory",
}, stringCodec{value: "bar"}, prometheus.NewRegistry(), logger)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't reuse the same registry see #133

require.NoError(t, err)

require.NoError(t, foo.CAS(context.TODO(), "foo", func(in interface{}) (out interface{}, retry bool, err error) { return "foo", false, nil }))
fooKey, err := foo.Get(ctx, "foo")
require.NoError(t, err)
require.Equal(t, "foo", fooKey.(string))

require.NoError(t, bar.CAS(context.TODO(), "bar", func(in interface{}) (out interface{}, retry bool, err error) { return "bar", false, nil }))
barKey, err := bar.Get(ctx, "bar")
require.NoError(t, err)
require.Equal(t, "bar", barKey.(string))
}
8 changes: 7 additions & 1 deletion kv/consul/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,6 @@ func (c *Client) WatchKey(ctx context.Context, key string, f func(interface{}) b
}

kvp, meta, err := c.kv.Get(key, queryOptions.WithContext(ctx))

// Don't backoff if value is not found (kvp == nil). In that case, Consul still returns index value,
// and next call to Get will block as expected. We handle missing value below.
if err != nil {
Expand Down Expand Up @@ -397,3 +396,10 @@ func (c *Client) createRateLimiter() *rate.Limiter {
}
return rate.NewLimiter(rate.Limit(c.cfg.WatchKeyRateLimit), burst)
}

// WithCodec Clones and changes the codec of the consul client.
func (c *Client) WithCodec(codec codec.Codec) *Client {
new := *c
new.codec = codec
return &new
}