Switched MemcachedClient GetMulti() concurrency limit to a gate
Signed-off-by: Marco Pracucci <marco@pracucci.com>
pracucci committed Dec 18, 2019
1 parent 0142675 commit ee88dd5
Showing 6 changed files with 121 additions and 91 deletions.
6 changes: 3 additions & 3 deletions docs/components/store.md
@@ -203,7 +203,7 @@ config:
max_idle_connections: 0
max_async_concurrency: 0
max_async_buffer_size: 0
max_get_multi_batch_concurrency: 0
max_get_multi_concurrency: 0
max_get_multi_batch_size: 0
dns_provider_update_interval: 0s
```
@@ -218,6 +218,6 @@ While the remaining settings are **optional**:
- `max_idle_connections`: maximum number of idle connections that will be maintained per address.
- `max_async_concurrency`: maximum number of concurrent asynchronous operations that can occur.
- `max_async_buffer_size`: maximum number of enqueued asynchronous operations allowed.
- `max_get_multi_batch_concurrency`: maximum number of concurrent batch executions when fetching keys.
- `max_get_multi_batch_size`: maximum number of keys a single underlying operation should fetch. If more keys are specified, internally keys are split into multiple batches and fetched concurrently up to `max_get_multi_batch_concurrency` parallelism. If set to `0`, the max batch size is unlimited.
- `max_get_multi_concurrency`: maximum number of concurrent connections when fetching keys. If set to `0`, the concurrency is unlimited.
- `max_get_multi_batch_size`: maximum number of keys a single underlying operation should fetch. If more keys are specified, internally keys are split into multiple batches and fetched concurrently, honoring `max_get_multi_concurrency`. If set to `0`, the batch size is unlimited.
- `dns_provider_update_interval`: the DNS discovery update interval.
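
To make the interaction of `max_get_multi_concurrency` and `max_get_multi_batch_size` concrete, here is a minimal Go sketch of the batching pattern. It is an illustration only, not the Thanos client: `fetchBatched` and its `fetch` callback are hypothetical names, and a plain buffered channel stands in for the gate.

```go
package main

import (
	"fmt"
	"sync"
)

// fetchBatched splits keys into batches of at most batchSize and fetches each
// batch in its own goroutine, while a buffered channel caps how many batches
// are fetched concurrently. A value of 0 means "unlimited" for both knobs,
// mirroring the documented semantics.
func fetchBatched(keys []string, batchSize, maxConcurrency int, fetch func([]string)) {
	if batchSize <= 0 {
		batchSize = len(keys) // Unlimited: a single batch holding every key.
	}
	if maxConcurrency <= 0 {
		maxConcurrency = len(keys) // Unlimited: the gate never blocks.
	}

	gate := make(chan struct{}, maxConcurrency)
	var wg sync.WaitGroup

	for start := 0; start < len(keys); start += batchSize {
		end := start + batchSize
		if end > len(keys) {
			end = len(keys)
		}

		wg.Add(1)
		go func(batch []string) {
			defer wg.Done()
			gate <- struct{}{}        // Acquire a concurrency slot.
			defer func() { <-gate }() // Release it when the fetch is done.
			fetch(batch)
		}(keys[start:end])
	}

	wg.Wait()
}

func main() {
	keys := []string{"k1", "k2", "k3", "k4", "k5"}
	fetchBatched(keys, 2, 2, func(batch []string) {
		fmt.Println("fetching batch", batch)
	})
}
```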
2 changes: 1 addition & 1 deletion pkg/cacheutil/jump_hash.go
@@ -3,7 +3,7 @@ package cacheutil
// jumpHash consistently chooses a hash bucket number in the range
// [0, numBuckets) for the given key. numBuckets must be >= 1.
//
// Copied from github.com/dgryski/go-jump/blob/master/jump.go (MIT license)
// Copied from github.com/dgryski/go-jump/blob/master/jump.go (MIT license).
func jumpHash(key uint64, numBuckets int) int32 {
var b int64 = -1
var j int64
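
The diff shows only the start of jumpHash's body. For context, here is a sketch of the complete jump consistent hash loop, following the published algorithm (Lamping & Veach) that the upstream go-jump package credited in the comment implements; treat it as a reference sketch rather than a verbatim copy of the file:

```go
// jumpHash consistently maps key to a bucket in [0, numBuckets). When
// numBuckets grows, only about 1/numBuckets of the keys move to a new bucket,
// which makes it a good fit for picking a cache server per key.
func jumpHash(key uint64, numBuckets int) int32 {
	var b int64 = -1
	var j int64
	for j < int64(numBuckets) {
		b = j
		key = key*2862933555777941757 + 1 // 64-bit linear congruential step.
		j = int64(float64(b+1) * (float64(int64(1)<<31) / float64((key>>33)+1)))
	}
	return int32(b)
}
```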
125 changes: 59 additions & 66 deletions pkg/cacheutil/memcached_client.go
@@ -2,18 +2,19 @@ package cacheutil

import (
"context"
"errors"
"sync"
"time"

"github.com/bradfitz/gomemcache/memcache"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/thanos/pkg/discovery/dns"
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/gate"
"github.com/thanos-io/thanos/pkg/tracing"
"gopkg.in/yaml.v2"
yaml "gopkg.in/yaml.v2"
)

const (
@@ -26,13 +27,13 @@ var (
errMemcachedConfigNoAddrs = errors.New("no memcached addrs provided")

defaultMemcachedClientConfig = MemcachedClientConfig{
Timeout: 500 * time.Millisecond,
MaxIdleConnections: 100,
MaxAsyncConcurrency: 20,
MaxAsyncBufferSize: 10000,
MaxGetMultiBatchConcurrency: 20,
MaxGetMultiBatchSize: 0,
DNSProviderUpdateInterval: 10 * time.Second,
Timeout: 500 * time.Millisecond,
MaxIdleConnections: 100,
MaxAsyncConcurrency: 20,
MaxAsyncBufferSize: 10000,
MaxGetMultiConcurrency: 100,
MaxGetMultiBatchSize: 0,
DNSProviderUpdateInterval: 10 * time.Second,
}
)

@@ -77,13 +78,13 @@ type MemcachedClientConfig struct {
// operations allowed.
MaxAsyncBufferSize int `yaml:"max_async_buffer_size"`

// MaxGetMultiBatchConcurrency specifies the maximum number of concurrent batch
// executions by GetMulti().
MaxGetMultiBatchConcurrency int `yaml:"max_get_multi_batch_concurrency"`
// MaxGetMultiConcurrency specifies the maximum number of concurrent connections
// running GetMulti() operations. If set to 0, concurrency is unlimited.
MaxGetMultiConcurrency int `yaml:"max_get_multi_concurrency"`

// MaxGetMultiBatchSize specifies the maximum number of keys a single underlying
// GetMulti() should run. If more keys are specified, internally keys are split
// into multiple batches and fetched concurrently up to MaxGetMultiBatchConcurrency
// into multiple batches and fetched concurrently, honoring MaxGetMultiConcurrency
// parallelism. If set to 0, the max batch size is unlimited.
MaxGetMultiBatchSize int `yaml:"max_get_multi_batch_size"`

@@ -124,8 +125,8 @@ type memcachedClient struct {
// Channel used to enqueue async operations.
asyncQueue chan func()

// Channel used to enqueue get multi operations.
getMultiQueue chan *memcachedGetMultiBatch
// Gate used to enforce the max number of concurrent GetMulti() operations.
getMultiGate *gate.Gate

// Wait group used to wait for all workers on stopping.
workers sync.WaitGroup
@@ -136,12 +137,6 @@ type memcachedClient struct {
duration *prometheus.HistogramVec
}

type memcachedGetMultiBatch struct {
ctx context.Context
keys []string
results chan<- *memcachedGetMultiResult
}

type memcachedGetMultiResult struct {
items map[string]*memcache.Item
err error
@@ -189,14 +184,17 @@ func newMemcachedClient(
)

c := &memcachedClient{
logger: logger,
config: config,
client: client,
selector: selector,
dnsProvider: dnsProvider,
asyncQueue: make(chan func(), config.MaxAsyncBufferSize),
getMultiQueue: make(chan *memcachedGetMultiBatch),
stop: make(chan struct{}, 1),
logger: logger,
config: config,
client: client,
selector: selector,
dnsProvider: dnsProvider,
asyncQueue: make(chan func(), config.MaxAsyncBufferSize),
stop: make(chan struct{}, 1),
getMultiGate: gate.NewGate(
config.MaxGetMultiConcurrency,
extprom.WrapRegistererWithPrefix("thanos_memcached_getmulti_", reg),
),
}

c.operations = prometheus.NewCounterVec(prometheus.CounterOpts{
@@ -239,13 +237,6 @@ func newMemcachedClient(
go c.asyncQueueProcessLoop()
}

// Start a number of goroutines - processing get multi batch operations - equal
// to the max concurrency we have.
c.workers.Add(c.config.MaxGetMultiBatchConcurrency)
for i := 0; i < c.config.MaxGetMultiBatchConcurrency; i++ {
go c.getMultiQueueProcessLoop()
}

return c, nil
}
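
The `gate.Gate` created in the constructor above is what replaces the fixed pool of `getMultiQueueProcessLoop()` workers: instead of a queue drained by N goroutines, every batch gets its own goroutine and the gate caps how many of them run at once. As an illustration of the pattern only (the real `thanos/pkg/gate` also registers Prometheus metrics through the wrapped registerer), a gate is essentially a context-aware counting semaphore:

```go
package main

import "context"

// simpleGate is a sketch of the gate pattern: a counting semaphore whose
// acquire (IsMyTurn) can be abandoned when the caller's context is cancelled.
type simpleGate struct {
	slots chan struct{}
}

func newSimpleGate(limit int) *simpleGate {
	return &simpleGate{slots: make(chan struct{}, limit)}
}

// IsMyTurn blocks until a slot is free or the context is done.
func (g *simpleGate) IsMyTurn(ctx context.Context) error {
	select {
	case g.slots <- struct{}{}:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// Done releases a previously acquired slot.
func (g *simpleGate) Done() {
	<-g.slots
}

// fetchWithGate mirrors how getMultiSingle() guards a fetch: acquire a slot,
// release it when done, and bail out early if the context is cancelled.
func fetchWithGate(ctx context.Context, g *simpleGate, fetch func() error) error {
	if err := g.IsMyTurn(ctx); err != nil {
		return err
	}
	defer g.Done()
	return fetch()
}

func main() {
	g := newSimpleGate(2)
	_ = fetchWithGate(context.Background(), g, func() error { return nil })
}
```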

@@ -324,20 +315,26 @@ func (c *memcachedClient) getMultiBatched(ctx context.Context, keys []string) ([
results := make(chan *memcachedGetMultiResult, numResults)
defer close(results)

go func() {
for batchStart := 0; batchStart < len(keys); batchStart += batchSize {
batchEnd := batchStart + batchSize
if batchEnd > len(keys) {
batchEnd = len(keys)
}

c.getMultiQueue <- &memcachedGetMultiBatch{
ctx: ctx,
keys: keys[batchStart:batchEnd],
results: results,
}
// Spawn a goroutine for each batch request. The max concurrency will be
// enforced by getMultiSingle().
for batchStart := 0; batchStart < len(keys); batchStart += batchSize {
batchEnd := batchStart + batchSize
if batchEnd > len(keys) {
batchEnd = len(keys)
}
}()

batchKeys := keys[batchStart:batchEnd]

c.workers.Add(1)
go func() {
defer c.workers.Done()

res := &memcachedGetMultiResult{}
res.items, res.err = c.getMultiSingle(ctx, batchKeys)

results <- res
}()
}

// Wait for all batch results. In case of error, we keep
// track of the last error that occurred.
@@ -358,10 +355,22 @@
}
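
The loop that drains the `results` channel sits in the collapsed part of the diff; only the comment about keeping the last error survives above. A hedged sketch of that fan-in step follows: the helper name and the exact merge/return shape are assumptions, not the committed code, and it reuses the `memcachedGetMultiResult` and `memcache.Item` types from this file.

```go
// collectBatchResults drains exactly numResults values from the results
// channel, merges the fetched items, and remembers the last error seen, as
// the comment in the diff describes. Hypothetical helper for illustration.
func collectBatchResults(results <-chan *memcachedGetMultiResult, numResults int) (map[string]*memcache.Item, error) {
	items := map[string]*memcache.Item{}
	var lastErr error

	for i := 0; i < numResults; i++ {
		res := <-results
		if res.err != nil {
			lastErr = res.err
			continue
		}
		for key, item := range res.items {
			items[key] = item
		}
	}

	return items, lastErr
}
```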

func (c *memcachedClient) getMultiSingle(ctx context.Context, keys []string) (map[string]*memcache.Item, error) {
// Wait until we get a free slot from the gate, if the max
// concurrency should be enforced.
if c.config.MaxGetMultiConcurrency > 0 {
span, _ := tracing.StartSpan(ctx, "memcached_getmulti_gate_ismyturn")
err := c.getMultiGate.IsMyTurn(ctx)
span.Finish()
if err != nil {
return nil, errors.Wrapf(err, "failed to wait for turn")
}
defer c.getMultiGate.Done()
}

start := time.Now()
c.operations.WithLabelValues(opGetMulti).Inc()

span, _ := tracing.StartSpan(ctx, "memcached_get_multi")
span, _ := tracing.StartSpan(ctx, "memcached_getmulti")
items, err := c.client.GetMulti(keys)
span.Finish()
if err != nil {
@@ -395,22 +404,6 @@ func (c *memcachedClient) asyncQueueProcessLoop() {
}
}

func (c *memcachedClient) getMultiQueueProcessLoop() {
defer c.workers.Done()

for {
select {
case batch := <-c.getMultiQueue:
res := &memcachedGetMultiResult{}
res.items, res.err = c.getMultiSingle(batch.ctx, batch.keys)

batch.results <- res
case <-c.stop:
return
}
}
}

func (c *memcachedClient) resolveAddrsLoop() {
defer c.workers.Done()

