Added max_item_size to Memcached client (#2304)
* Added max_item_size to Memcached client

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Changed imports order and splitted tests

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Fixed type casting

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Changed imports grouping

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Changed memcached max_item_size default from 0 to 1MB

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Increased e2e tests timeout

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Fixed typo in CHANGELOG

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Reverted Makefile changes

Signed-off-by: Marco Pracucci <marco@pracucci.com>
pracucci authored Mar 25, 2020
1 parent c040657 commit c1d7f43
Showing 8 changed files with 71 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -23,6 +23,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
- [#2265](https://github.com/thanos-io/thanos/pull/2265) Compactor: Add `--wait-interval` to specify compaction wait interval between consecutive compact runs when `--wait` enabled.
- [#2250](https://github.com/thanos-io/thanos/pull/2250) Compactor: Enable vertical compaction for offline deduplication (Experimental). Uses `--deduplication.replica-label` flag to specify the replica label to deduplicate on (Hidden). Please note that this uses a NAIVE algorithm for merging (no smart replica deduplication, just chaining samples together). This works well for deduplication of blocks with **precisely the same samples** like produced by Receiver replication. We plan to add a smarter algorithm in the following weeks.
- [#1714](https://github.com/thanos-io/thanos/pull/1714) Run the bucket web UI in the compact component when it is run as a long-lived process.
- [#2304](https://github.com/thanos-io/thanos/pull/2304) Store: Added `max_item_size` config option to memcached-based index cache. This should be set to the max item size configured in memcached (`-I` flag) in order to not waste network round-trips to cache items larger than the limit configured in memcached.

### Changed

2 changes: 1 addition & 1 deletion cmd/thanos/store.go
@@ -226,7 +226,7 @@ func runStore(
indexCache, err = storecache.NewIndexCache(logger, indexCacheContentYaml, reg)
} else {
indexCache, err = storecache.NewInMemoryIndexCacheWithConfig(logger, reg, storecache.InMemoryIndexCacheConfig{
MaxSize: storecache.Bytes(indexCacheSizeBytes),
MaxSize: model.Bytes(indexCacheSizeBytes),
MaxItemSize: storecache.DefaultInMemoryIndexCacheConfig.MaxItemSize,
})
}
2 changes: 2 additions & 0 deletions docs/components/store.md
@@ -212,6 +212,7 @@ config:
max_idle_connections: 0
max_async_concurrency: 0
max_async_buffer_size: 0
max_item_size: 1MiB
max_get_multi_concurrency: 0
max_get_multi_batch_size: 0
dns_provider_update_interval: 0s
@@ -229,6 +230,7 @@ While the remaining settings are **optional**:
- `max_async_buffer_size`: maximum number of enqueued asynchronous operations allowed.
- `max_get_multi_concurrency`: maximum number of concurrent connections when fetching keys. If set to `0`, the concurrency is unlimited.
- `max_get_multi_batch_size`: maximum number of keys a single underlying operation should fetch. If more keys are specified, internally keys are split into multiple batches and fetched concurrently, honoring `max_get_multi_concurrency`. If set to `0`, the batch size is unlimited.
- `max_item_size`: maximum size of an item to be stored in memcached. This option should be set to the same value as the memcached `-I` flag (which defaults to 1MB) in order to avoid wasting network round trips storing items larger than the maximum item size allowed by memcached. If set to `0`, the item size is unlimited.
- `dns_provider_update_interval`: the DNS discovery update interval.
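
For reference, a minimal memcached index cache configuration for a server started with `-I 2m` might look like the fragment below (the address and size values here are illustrative, not defaults):

```yaml
type: MEMCACHED
config:
  addresses: ["localhost:11211"]
  max_item_size: 2MiB   # keep in sync with memcached's -I flag
```

Setting `max_item_size` above the server's `-I` limit only moves the failure back to the server; keeping the two equal lets the client skip doomed writes without a network round trip.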

## Index Header
25 changes: 23 additions & 2 deletions pkg/cacheutil/memcached_client.go
@@ -17,13 +17,15 @@ import (
"github.com/thanos-io/thanos/pkg/discovery/dns"
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/gate"
"github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/tracing"
yaml "gopkg.in/yaml.v2"
)

const (
opSet = "set"
opGetMulti = "getmulti"
opSet = "set"
opGetMulti = "getmulti"
reasonMaxItemSize = "max-item-size"
)

var (
@@ -35,6 +37,7 @@ var (
MaxIdleConnections: 100,
MaxAsyncConcurrency: 20,
MaxAsyncBufferSize: 10000,
MaxItemSize: model.Bytes(1024 * 1024),
MaxGetMultiConcurrency: 100,
MaxGetMultiBatchSize: 0,
DNSProviderUpdateInterval: 10 * time.Second,
@@ -88,6 +91,11 @@ type MemcachedClientConfig struct {
// running GetMulti() operations. If set to 0, concurrency is unlimited.
MaxGetMultiConcurrency int `yaml:"max_get_multi_concurrency"`

// MaxItemSize specifies the maximum size of an item stored in memcached. Bigger
// items are skipped by the client rather than stored. If set to 0, no maximum
// size is enforced.
MaxItemSize model.Bytes `yaml:"max_item_size"`

// MaxGetMultiBatchSize specifies the maximum number of keys a single underlying
// GetMulti() should fetch. If more keys are specified, internally keys are split
// into multiple batches and fetched concurrently, honoring MaxGetMultiConcurrency
@@ -140,6 +148,7 @@ type memcachedClient struct {
// Tracked metrics.
operations *prometheus.CounterVec
failures *prometheus.CounterVec
skipped *prometheus.CounterVec
duration *prometheus.HistogramVec
}

@@ -215,6 +224,12 @@ func newMemcachedClient(
ConstLabels: prometheus.Labels{"name": name},
}, []string{"operation"})

c.skipped = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "thanos_memcached_operation_skipped_total",
Help: "Total number of operations against memcached that have been skipped.",
ConstLabels: prometheus.Labels{"name": name},
}, []string{"operation", "reason"})

c.duration = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Name: "thanos_memcached_operation_duration_seconds",
Help: "Duration of operations against memcached.",
@@ -250,6 +265,12 @@ func (c *memcachedClient) Stop() {
}

func (c *memcachedClient) SetAsync(ctx context.Context, key string, value []byte, ttl time.Duration) (err error) {
// Skip hitting memcached at all if the item is bigger than the max allowed size.
if c.config.MaxItemSize > 0 && uint64(len(value)) > uint64(c.config.MaxItemSize) {
c.skipped.WithLabelValues(opSet, reasonMaxItemSize).Inc()
return nil
}

return c.enqueueAsync(func() {
start := time.Now()
c.operations.WithLabelValues(opSet).Inc()
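
The guard added to `SetAsync` above can be sketched in isolation: a client-side size check that short-circuits before any network call, counting the skip instead of attempting a write the server would reject. The names below are illustrative stand-ins, not Thanos APIs; the `skipped` field stands in for the `thanos_memcached_operation_skipped_total` counter.

```go
package main

import "fmt"

// cache is a toy client. maxItemSize mirrors memcached's -I flag
// (1 MiB by default on the server); 0 disables the client-side check.
type cache struct {
	maxItemSize uint64
	skipped     int // stand-in for a skipped-operations counter
	stored      map[string][]byte
}

// set skips items larger than maxItemSize instead of wasting a
// round trip on a write that memcached would reject anyway.
func (c *cache) set(key string, value []byte) {
	if c.maxItemSize > 0 && uint64(len(value)) > c.maxItemSize {
		c.skipped++
		return
	}
	c.stored[key] = value
}

func main() {
	c := &cache{maxItemSize: 10, stored: map[string][]byte{}}
	c.set("key-1", []byte("value-1"))
	c.set("key-2", []byte("value-2-too-long-to-be-stored"))
	fmt.Println(len(c.stored), c.skipped) // prints: 1 1
}
```

This is the same shape the real client uses: the check runs before the operation is even enqueued, so an oversized item costs neither a goroutine slot in the async buffer nor a network round trip.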
40 changes: 39 additions & 1 deletion pkg/cacheutil/memcached_client_test.go
@@ -14,6 +14,7 @@ import (
"github.com/go-kit/kit/log"
"github.com/pkg/errors"
prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/testutil"
)

@@ -76,6 +77,7 @@ addresses:
testutil.Equals(t, defaultMemcachedClientConfig.DNSProviderUpdateInterval, cache.config.DNSProviderUpdateInterval)
testutil.Equals(t, defaultMemcachedClientConfig.MaxGetMultiConcurrency, cache.config.MaxGetMultiConcurrency)
testutil.Equals(t, defaultMemcachedClientConfig.MaxGetMultiBatchSize, cache.config.MaxGetMultiBatchSize)
testutil.Equals(t, defaultMemcachedClientConfig.MaxItemSize, cache.config.MaxItemSize)

// Should instantiate a memcached client with configured YAML config.
conf = []byte(`
@@ -87,6 +89,7 @@ max_idle_connections: 1
max_async_concurrency: 1
max_async_buffer_size: 1
max_get_multi_concurrency: 1
max_item_size: 1MiB
max_get_multi_batch_size: 1
dns_provider_update_interval: 1s
`)
@@ -102,6 +105,7 @@ dns_provider_update_interval: 1s
testutil.Equals(t, 1*time.Second, cache.config.DNSProviderUpdateInterval)
testutil.Equals(t, 1, cache.config.MaxGetMultiConcurrency)
testutil.Equals(t, 1, cache.config.MaxGetMultiBatchSize)
testutil.Equals(t, model.Bytes(1024*1024), cache.config.MaxItemSize)
}

func TestMemcachedClient_SetAsync(t *testing.T) {
@@ -120,9 +124,43 @@ func TestMemcachedClient_SetAsync(t *testing.T) {
testutil.Ok(t, client.SetAsync(ctx, "key-2", []byte("value-2"), time.Second))
testutil.Ok(t, backendMock.waitItems(2))

actual, err := client.getMultiSingle(ctx, []string{"key-1", "key-2"})
testutil.Ok(t, err)
testutil.Equals(t, []byte("value-1"), actual["key-1"].Value)
testutil.Equals(t, []byte("value-2"), actual["key-2"].Value)

testutil.Equals(t, 2.0, prom_testutil.ToFloat64(client.operations.WithLabelValues(opSet)))
testutil.Equals(t, 0.0, prom_testutil.ToFloat64(client.operations.WithLabelValues(opGetMulti)))
testutil.Equals(t, 1.0, prom_testutil.ToFloat64(client.operations.WithLabelValues(opGetMulti)))
testutil.Equals(t, 0.0, prom_testutil.ToFloat64(client.failures.WithLabelValues(opSet)))
testutil.Equals(t, 0.0, prom_testutil.ToFloat64(client.skipped.WithLabelValues(opSet, reasonMaxItemSize)))
}

func TestMemcachedClient_SetAsyncWithCustomMaxItemSize(t *testing.T) {
defer leaktest.CheckTimeout(t, 10*time.Second)()

ctx := context.Background()
config := defaultMemcachedClientConfig
config.Addresses = []string{"127.0.0.1:11211"}
config.MaxItemSize = model.Bytes(10)
backendMock := newMemcachedClientBackendMock()

client, err := prepare(config, backendMock)
testutil.Ok(t, err)
defer client.Stop()

testutil.Ok(t, client.SetAsync(ctx, "key-1", []byte("value-1"), time.Second))
testutil.Ok(t, client.SetAsync(ctx, "key-2", []byte("value-2-too-long-to-be-stored"), time.Second))
testutil.Ok(t, backendMock.waitItems(1))

actual, err := client.getMultiSingle(ctx, []string{"key-1", "key-2"})
testutil.Ok(t, err)
testutil.Equals(t, []byte("value-1"), actual["key-1"].Value)
testutil.Equals(t, (*memcache.Item)(nil), actual["key-2"])

testutil.Equals(t, 1.0, prom_testutil.ToFloat64(client.operations.WithLabelValues(opSet)))
testutil.Equals(t, 1.0, prom_testutil.ToFloat64(client.operations.WithLabelValues(opGetMulti)))
testutil.Equals(t, 0.0, prom_testutil.ToFloat64(client.failures.WithLabelValues(opSet)))
testutil.Equals(t, 1.0, prom_testutil.ToFloat64(client.skipped.WithLabelValues(opSet, reasonMaxItemSize)))
}

func TestMemcachedClient_GetMulti(t *testing.T) {
2 changes: 1 addition & 1 deletion pkg/store/cache/units.go → pkg/model/units.go
@@ -1,7 +1,7 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package storecache
package model

import (
"github.com/alecthomas/units"
2 changes: 1 addition & 1 deletion pkg/store/cache/units_test.go → pkg/model/units_test.go
@@ -1,7 +1,7 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package storecache
package model

import (
"testing"
5 changes: 3 additions & 2 deletions pkg/store/cache/inmemory.go
@@ -17,6 +17,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/thanos-io/thanos/pkg/model"
"gopkg.in/yaml.v2"
)

@@ -52,9 +53,9 @@ type InMemoryIndexCache struct {
// InMemoryIndexCacheConfig holds the in-memory index cache config.
type InMemoryIndexCacheConfig struct {
// MaxSize represents overall maximum number of bytes cache can contain.
MaxSize Bytes `yaml:"max_size"`
MaxSize model.Bytes `yaml:"max_size"`
// MaxItemSize represents maximum size of single item.
MaxItemSize Bytes `yaml:"max_item_size"`
MaxItemSize model.Bytes `yaml:"max_item_size"`
}

// parseInMemoryIndexCacheConfig unmarshals a buffer into a InMemoryIndexCacheConfig with default values.
