
Added max_item_size to Memcached client #2304

Merged · 8 commits · Mar 25, 2020
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -23,6 +23,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
- [#2265](https://github.com/thanos-io/thanos/pull/2265) Compactor: Add `--wait-interval` to specify compaction wait interval between consecutive compact runs when `--wait` enabled.
- [#2250](https://github.com/thanos-io/thanos/pull/2250) Compactor: Enable vertical compaction for offline deduplication (Experimental). Uses `--deduplication.replica-label` flag to specify the replica label to deduplicate on (Hidden). Please note that this uses a NAIVE algorithm for merging (no smart replica deduplication, just chaining samples together). This works well for deduplication of blocks with **precisely the same samples** like produced by Receiver replication. We plan to add a smarter algorithm in the following weeks.
- [#1714](https://github.com/thanos-io/thanos/pull/1714) Run the bucket web UI in the compact component when it is run as a long-lived process.
- [#2304](https://github.com/thanos-io/thanos/pull/2304) Store: Added `max_item_size` config option to memcached-based index cache. This should be set to the max item size configured in memcached (`-I` flag) in order to not waste network round-trips to cache items larger than the limit configured in memcached.

### Changed

2 changes: 1 addition & 1 deletion cmd/thanos/store.go
@@ -226,7 +226,7 @@ func runStore(
indexCache, err = storecache.NewIndexCache(logger, indexCacheContentYaml, reg)
} else {
indexCache, err = storecache.NewInMemoryIndexCacheWithConfig(logger, reg, storecache.InMemoryIndexCacheConfig{
-		MaxSize:     storecache.Bytes(indexCacheSizeBytes),
+		MaxSize:     model.Bytes(indexCacheSizeBytes),
MaxItemSize: storecache.DefaultInMemoryIndexCacheConfig.MaxItemSize,
})
}
2 changes: 2 additions & 0 deletions docs/components/store.md
@@ -212,6 +212,7 @@ config:
max_idle_connections: 0
max_async_concurrency: 0
max_async_buffer_size: 0
max_item_size: 1MiB
max_get_multi_concurrency: 0
max_get_multi_batch_size: 0
dns_provider_update_interval: 0s
@@ -229,6 +230,7 @@ While the remaining settings are **optional**:
- `max_async_buffer_size`: maximum number of enqueued asynchronous operations allowed.
- `max_get_multi_concurrency`: maximum number of concurrent connections when fetching keys. If set to `0`, the concurrency is unlimited.
- `max_get_multi_batch_size`: maximum number of keys a single underlying operation should fetch. If more keys are specified, keys are internally split into multiple batches and fetched concurrently, honoring `max_get_multi_concurrency`. If set to `0`, the batch size is unlimited.
- `max_item_size`: maximum size of an item to be stored in memcached. This option should be set to the same value as memcached's `-I` flag (defaults to 1MB) to avoid wasting network round trips storing items larger than the max item size allowed in memcached. If set to `0`, the item size is unlimited.
- `dns_provider_update_interval`: the DNS discovery update interval.

## Index Header
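The docs say `max_item_size` should match the limit memcached was started with (`-I`). One way to confirm what a running memcached instance actually allows is to read `item_size_max` from its `stats settings` output. The sketch below covers only the parsing step; the helper name and the sample output are illustrative, not Thanos code:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseItemSizeMax extracts memcached's configured max item size (the -I
// flag) from the text returned by its "stats settings" command, whose
// lines look like "STAT item_size_max 1048576".
func parseItemSizeMax(stats string) (uint64, bool) {
	for _, line := range strings.Split(stats, "\r\n") {
		fields := strings.Fields(line)
		if len(fields) == 3 && fields[0] == "STAT" && fields[1] == "item_size_max" {
			if n, err := strconv.ParseUint(fields[2], 10, 64); err == nil {
				return n, true
			}
		}
	}
	return 0, false
}

func main() {
	sample := "STAT maxconns 1024\r\nSTAT item_size_max 1048576\r\nEND\r\n"
	if n, ok := parseItemSizeMax(sample); ok {
		fmt.Println(n) // 1048576
	}
}
```

The value recovered this way (1 MiB by default) is exactly what the `max_item_size` option above should mirror.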
25 changes: 23 additions & 2 deletions pkg/cacheutil/memcached_client.go
@@ -17,13 +17,15 @@ import (
"github.com/thanos-io/thanos/pkg/discovery/dns"
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/gate"
"github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/tracing"
yaml "gopkg.in/yaml.v2"
)

const (
-	opSet      = "set"
-	opGetMulti = "getmulti"
+	opSet             = "set"
+	opGetMulti        = "getmulti"
+	reasonMaxItemSize = "max-item-size"
)

var (
@@ -35,6 +37,7 @@ var (
MaxIdleConnections: 100,
MaxAsyncConcurrency: 20,
MaxAsyncBufferSize: 10000,
MaxItemSize: model.Bytes(1024 * 1024),
MaxGetMultiConcurrency: 100,
MaxGetMultiBatchSize: 0,
DNSProviderUpdateInterval: 10 * time.Second,
@@ -88,6 +91,11 @@ type MemcachedClientConfig struct {
// running GetMulti() operations. If set to 0, concurrency is unlimited.
MaxGetMultiConcurrency int `yaml:"max_get_multi_concurrency"`

	// MaxItemSize specifies the maximum size of an item stored in memcached.
	// Bigger items are skipped by the client and not stored. If set to 0, no
	// maximum size is enforced.
MaxItemSize model.Bytes `yaml:"max_item_size"`

	// MaxGetMultiBatchSize specifies the maximum number of keys a single underlying
	// GetMulti() should run. If more keys are specified, keys are internally split
	// into multiple batches and fetched concurrently, honoring MaxGetMultiConcurrency
@@ -140,6 +148,7 @@ type memcachedClient struct {
// Tracked metrics.
operations *prometheus.CounterVec
failures *prometheus.CounterVec
skipped *prometheus.CounterVec
duration *prometheus.HistogramVec
}

@@ -215,6 +224,12 @@ func newMemcachedClient(
ConstLabels: prometheus.Labels{"name": name},
}, []string{"operation"})

c.skipped = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "thanos_memcached_operation_skipped_total",
Help: "Total number of operations against memcached that have been skipped.",
ConstLabels: prometheus.Labels{"name": name},
}, []string{"operation", "reason"})

c.duration = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Name: "thanos_memcached_operation_duration_seconds",
Help: "Duration of operations against memcached.",
@@ -250,6 +265,12 @@ func (c *memcachedClient) Stop() {
}

func (c *memcachedClient) SetAsync(ctx context.Context, key string, value []byte, ttl time.Duration) (err error) {
// Skip hitting memcached at all if the item is bigger than the max allowed size.
if c.config.MaxItemSize > 0 && uint64(len(value)) > uint64(c.config.MaxItemSize) {
c.skipped.WithLabelValues(opSet, reasonMaxItemSize).Inc()
return nil
}

return c.enqueueAsync(func() {
start := time.Now()
c.operations.WithLabelValues(opSet).Inc()
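To make the new guard in `SetAsync` concrete, here is a standalone sketch of the same check. The function name is illustrative; the real client returns `nil` and increments `thanos_memcached_operation_skipped_total` rather than reporting a boolean:

```go
package main

import "fmt"

// shouldStore mirrors the client-side guard: an item larger than the
// configured max item size is skipped before any network round-trip,
// and a limit of 0 disables the check entirely.
func shouldStore(valueLen int, maxItemSize uint64) bool {
	if maxItemSize > 0 && uint64(valueLen) > maxItemSize {
		return false // the real client counts this as a skipped "set"
	}
	return true
}

func main() {
	fmt.Println(shouldStore(len("value-1"), 10))                       // true
	fmt.Println(shouldStore(len("value-2-too-long-to-be-stored"), 10)) // false
	fmt.Println(shouldStore(1<<30, 0))                                 // true: unlimited
}
```

Skipping client-side rather than letting memcached reject the item saves a round-trip and a serialization of a payload that would fail anyway, which is the whole point of the option.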
40 changes: 39 additions & 1 deletion pkg/cacheutil/memcached_client_test.go
@@ -14,6 +14,7 @@ import (
"github.com/go-kit/kit/log"
"github.com/pkg/errors"
prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/testutil"
)

@@ -76,6 +77,7 @@ addresses:
testutil.Equals(t, defaultMemcachedClientConfig.DNSProviderUpdateInterval, cache.config.DNSProviderUpdateInterval)
testutil.Equals(t, defaultMemcachedClientConfig.MaxGetMultiConcurrency, cache.config.MaxGetMultiConcurrency)
testutil.Equals(t, defaultMemcachedClientConfig.MaxGetMultiBatchSize, cache.config.MaxGetMultiBatchSize)
testutil.Equals(t, defaultMemcachedClientConfig.MaxItemSize, cache.config.MaxItemSize)

// Should instantiate a memcached client with the configured YAML config.
conf = []byte(`
@@ -87,6 +89,7 @@ max_idle_connections: 1
max_async_concurrency: 1
max_async_buffer_size: 1
max_get_multi_concurrency: 1
max_item_size: 1MiB
max_get_multi_batch_size: 1
dns_provider_update_interval: 1s
`)
@@ -102,6 +105,7 @@ dns_provider_update_interval: 1s
testutil.Equals(t, 1*time.Second, cache.config.DNSProviderUpdateInterval)
testutil.Equals(t, 1, cache.config.MaxGetMultiConcurrency)
testutil.Equals(t, 1, cache.config.MaxGetMultiBatchSize)
testutil.Equals(t, model.Bytes(1024*1024), cache.config.MaxItemSize)
}

func TestMemcachedClient_SetAsync(t *testing.T) {
@@ -120,9 +124,43 @@ func TestMemcachedClient_SetAsync(t *testing.T) {
testutil.Ok(t, client.SetAsync(ctx, "key-2", []byte("value-2"), time.Second))
testutil.Ok(t, backendMock.waitItems(2))

actual, err := client.getMultiSingle(ctx, []string{"key-1", "key-2"})
testutil.Ok(t, err)
testutil.Equals(t, []byte("value-1"), actual["key-1"].Value)
testutil.Equals(t, []byte("value-2"), actual["key-2"].Value)

testutil.Equals(t, 2.0, prom_testutil.ToFloat64(client.operations.WithLabelValues(opSet)))
-	testutil.Equals(t, 0.0, prom_testutil.ToFloat64(client.operations.WithLabelValues(opGetMulti)))
+	testutil.Equals(t, 1.0, prom_testutil.ToFloat64(client.operations.WithLabelValues(opGetMulti)))
testutil.Equals(t, 0.0, prom_testutil.ToFloat64(client.failures.WithLabelValues(opSet)))
testutil.Equals(t, 0.0, prom_testutil.ToFloat64(client.skipped.WithLabelValues(opSet, reasonMaxItemSize)))
}

func TestMemcachedClient_SetAsyncWithCustomMaxItemSize(t *testing.T) {
defer leaktest.CheckTimeout(t, 10*time.Second)()

ctx := context.Background()
config := defaultMemcachedClientConfig
config.Addresses = []string{"127.0.0.1:11211"}
config.MaxItemSize = model.Bytes(10)
backendMock := newMemcachedClientBackendMock()

client, err := prepare(config, backendMock)
testutil.Ok(t, err)
defer client.Stop()

testutil.Ok(t, client.SetAsync(ctx, "key-1", []byte("value-1"), time.Second))
testutil.Ok(t, client.SetAsync(ctx, "key-2", []byte("value-2-too-long-to-be-stored"), time.Second))
testutil.Ok(t, backendMock.waitItems(1))

actual, err := client.getMultiSingle(ctx, []string{"key-1", "key-2"})
testutil.Ok(t, err)
testutil.Equals(t, []byte("value-1"), actual["key-1"].Value)
testutil.Equals(t, (*memcache.Item)(nil), actual["key-2"])

testutil.Equals(t, 1.0, prom_testutil.ToFloat64(client.operations.WithLabelValues(opSet)))
testutil.Equals(t, 1.0, prom_testutil.ToFloat64(client.operations.WithLabelValues(opGetMulti)))
testutil.Equals(t, 0.0, prom_testutil.ToFloat64(client.failures.WithLabelValues(opSet)))
testutil.Equals(t, 1.0, prom_testutil.ToFloat64(client.skipped.WithLabelValues(opSet, reasonMaxItemSize)))
}

func TestMemcachedClient_GetMulti(t *testing.T) {
2 changes: 1 addition & 1 deletion pkg/store/cache/units.go → pkg/model/units.go
@@ -1,7 +1,7 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

-package storecache
+package model

import (
"github.com/alecthomas/units"
2 changes: 1 addition & 1 deletion pkg/store/cache/units_test.go → pkg/model/units_test.go
@@ -1,7 +1,7 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

-package storecache
+package model

import (
"testing"
5 changes: 3 additions & 2 deletions pkg/store/cache/inmemory.go
@@ -17,6 +17,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/thanos-io/thanos/pkg/model"
"gopkg.in/yaml.v2"
)

@@ -52,9 +53,9 @@ type InMemoryIndexCache struct {
// InMemoryIndexCacheConfig holds the in-memory index cache config.
type InMemoryIndexCacheConfig struct {
// MaxSize represents overall maximum number of bytes cache can contain.
-	MaxSize Bytes `yaml:"max_size"`
+	MaxSize model.Bytes `yaml:"max_size"`
> **Review comment (Member):** Sorry for being late to the party, but why are we not using the same thing as Prometheus uses, i.e. `units.Base2Bytes`?

> **Reply (Contributor Author):** `model.Bytes` is based on `units.Base2Bytes` but adds YAML marshalling support. Am I missing anything?

	// MaxItemSize represents the maximum size of a single item.
-	MaxItemSize Bytes `yaml:"max_item_size"`
+	MaxItemSize model.Bytes `yaml:"max_item_size"`
}

// parseInMemoryIndexCacheConfig unmarshals a buffer into a InMemoryIndexCacheConfig with default values.
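On the review question about reusing `units.Base2Bytes` directly: the value `model.Bytes` adds is YAML (un)marshalling of human-readable sizes such as `1MiB`. As a self-contained illustration of the parsing half only (this is not the Thanos implementation, which delegates to `units.Base2Bytes`):

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseBase2Bytes converts a human-readable base-2 size such as "1MiB"
// into a raw byte count, roughly what a YAML unmarshaller for a Bytes
// type needs to do. Suffixes are checked longest-first so "KiB" is not
// mistaken for a bare "B".
func parseBase2Bytes(s string) (uint64, error) {
	suffixes := []struct {
		unit string
		mult uint64
	}{
		{"GiB", 1 << 30},
		{"MiB", 1 << 20},
		{"KiB", 1 << 10},
		{"B", 1},
	}
	for _, suf := range suffixes {
		if strings.HasSuffix(s, suf.unit) {
			n, err := strconv.ParseUint(strings.TrimSuffix(s, suf.unit), 10, 64)
			if err != nil {
				return 0, err
			}
			return n * suf.mult, nil
		}
	}
	return 0, fmt.Errorf("unrecognized size %q", s)
}

func main() {
	n, err := parseBase2Bytes("1MiB")
	fmt.Println(n, err) // 1048576 <nil>
}
```

This is why the documented default of `max_item_size: 1MiB` above corresponds to memcached's 1 MB (1024 * 1024 bytes) default item size.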