Skip to content

Commit

Permalink
Implement BACKEND_TYPE=memcache as an alternative k/v store to redis
Browse files Browse the repository at this point in the history
MEMCACHE_HOST_PORT=host:port must be set with BACKEND_TYPE=memcache

To minimize roundtrips when getting multiple keys, the memcache implementation
does a GetMulti to fetch the existing rate limit usage and does increments
asynchronously in background goroutines, since the memcache API doesn't offer
multi-increment.

Resolves envoyproxy#140

Signed-off-by: David Weitzman <dweitzman@pinterest.com>
  • Loading branch information
dweitzman committed Sep 8, 2020
1 parent 8ede3bf commit 0478e41
Show file tree
Hide file tree
Showing 13 changed files with 983 additions and 21 deletions.
2 changes: 1 addition & 1 deletion Dockerfile.integration
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Running this docker image runs the integration tests.
FROM golang:1.14

RUN apt-get update -y && apt-get install sudo stunnel4 redis -y && rm -rf /var/lib/apt/lists/*
RUN apt-get update -y && apt-get install sudo stunnel4 redis memcached -y && rm -rf /var/lib/apt/lists/*

WORKDIR /workdir

Expand Down
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ tests_with_redis: bootstrap_redis_tls tests_unit
redis-server --port 6382 --requirepass password123 &
redis-server --port 6384 --requirepass password123 &
redis-server --port 6385 --requirepass password123 &
memcached -u root --port 6386 -m 64 &
go test -race -tags=integration $(MODULE)/...

.PHONY: docker_tests
Expand Down
10 changes: 10 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,15 @@ services:
networks:
- ratelimit-network

memcached:
image: memcached:alpine
expose:
- 11211
ports:
- 11211:11211
networks:
- ratelimit-network

# minimal container that builds the ratelimit service binary and exits.
ratelimit-build:
image: golang:1.14-alpine
Expand Down Expand Up @@ -50,6 +59,7 @@ services:
- REDIS_URL=redis:6379
- RUNTIME_ROOT=/data
- RUNTIME_SUBDIRECTORY=ratelimit
- MEMCACHE_HOST_PORT=memcached:11211

networks:
ratelimit-network:
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.14

require (
github.com/alicebob/miniredis/v2 v2.11.4
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b
github.com/cespare/xxhash v1.1.0 // indirect
github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354 // indirect
github.com/coocood/freecache v1.1.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ github.com/alicebob/gopher-json v0.0.0-20180125190556-5a6b3ba71ee6 h1:45bxf7AZMw
github.com/alicebob/gopher-json v0.0.0-20180125190556-5a6b3ba71ee6/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=
github.com/alicebob/miniredis/v2 v2.11.4 h1:GsuyeunTx7EllZBU3/6Ji3dhMQZDpC9rLf1luJ+6M5M=
github.com/alicebob/miniredis/v2 v2.11.4/go.mod h1:VL3UDEfAH59bSa7MuHMuFToxkqyHh69s/WUbYlOAuyg=
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b h1:L/QXpzIa3pOvUGt1D1lA5KjYhPBAN/3iWdP7xeFS9F0=
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b/go.mod h1:H0wQNHz2YrLsuXOZozoeDmnHXkNCRmMW0gwFWDfEZDA=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
Expand Down
300 changes: 300 additions & 0 deletions src/memcached/cache_impl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,300 @@
// The memcached limiter uses GetMulti() to check keys in parallel and then does
// increments asynchronously in the backend, since the memcache interface doesn't
// support multi-increment and it seems worthwhile to minimize the number of
// concurrent or sequential RPCs in the critical path.
//
// Another difference from redis is that memcache doesn't create a key implicitly by
// incrementing a missing entry. Instead, when increment fails an explicit "add" needs
// to be called. The process of increment becomes a bit of a dance since we try to
// limit the number of RPCs. First we call increment, then add if the increment
// failed, then increment again if the add failed (which could happen if there was
// a race to call "add").

package memcached

import (
"context"
"math"
"math/rand"
"strconv"
"sync"

"github.com/coocood/freecache"
stats "github.com/lyft/gostats"

"github.com/bradfitz/gomemcache/memcache"

logger "github.com/sirupsen/logrus"

pb "github.com/envoyproxy/go-control-plane/envoy/service/ratelimit/v3"

"github.com/envoyproxy/ratelimit/src/assert"
"github.com/envoyproxy/ratelimit/src/config"
"github.com/envoyproxy/ratelimit/src/limiter"
"github.com/envoyproxy/ratelimit/src/settings"
)

// rateLimitCache mirrors limiter.RateLimitCache but adds Flush so unit tests
// can wait for the asynchronous memcache increments to complete before
// asserting on counter values.
type rateLimitCache interface {
	// DoLimit has the same contract as limiter.RateLimitCache.DoLimit: one
	// response status per request descriptor, in order.
	DoLimit(
		ctx context.Context,
		request *pb.RateLimitRequest,
		limits []*config.RateLimit) []*pb.RateLimitResponse_DescriptorStatus

	// Flush waits for any lingering goroutines that are incrementing memcache
	// values. This is used for unit tests, since the memcache increments
	// happen asynchronously in the background.
	Flush()
}

// rateLimitMemcacheImpl is the memcache-backed rate limit cache. Reads are
// batched with GetMulti; counter increments run asynchronously in background
// goroutines tracked by wg (see DoLimit, increaseAsync and Flush).
type rateLimitMemcacheImpl struct {
	client                     Client                    // memcache client; an interface so tests can mock it
	timeSource                 limiter.TimeSource        // clock used when generating time-slotted cache keys
	jitterRand                 *rand.Rand                // randomness for expiration jitter; read only when expirationJitterMaxSeconds > 0
	expirationJitterMaxSeconds int64                     // max extra TTL seconds added to spread key expirations
	cacheKeyGenerator          limiter.CacheKeyGenerator // builds per-descriptor cache keys
	localCache                 *freecache.Cache          // optional in-process cache of known over-limit keys; may be nil
	wg                         sync.WaitGroup            // tracks in-flight async increment goroutines (waited on by Flush)
}

var _ limiter.RateLimitCache = (*rateLimitMemcacheImpl)(nil)

// max returns the larger of the two uint32 arguments.
func max(a uint32, b uint32) uint32 {
	if b > a {
		return b
	}
	return a
}

// DoLimit evaluates every descriptor in the request against its configured
// limit. Existing usage is fetched in one GetMulti round trip; the counter
// increments themselves are deferred to a background goroutine
// (increaseAsync), so the returned statuses are computed from the fetched
// values plus hitsAddend rather than from a post-increment read.
func (this *rateLimitMemcacheImpl) DoLimit(
	ctx context.Context,
	request *pb.RateLimitRequest,
	limits []*config.RateLimit) []*pb.RateLimitResponse_DescriptorStatus {

	logger.Debugf("starting cache lookup")

	// request.HitsAddend could be 0 (default value) if not specified by the caller in the Ratelimit request.
	hitsAddend := max(1, request.HitsAddend)

	// First build a list of all cache keys that we are actually going to hit. generateCacheKey()
	// returns an empty string in the key if there is no limit so that we can keep the arrays
	// all the same size.
	assert.Assert(len(request.Descriptors) == len(limits))
	cacheKeys := make([]limiter.CacheKey, len(request.Descriptors))
	now := this.timeSource.UnixNow()
	for i := 0; i < len(request.Descriptors); i++ {
		cacheKeys[i] = this.cacheKeyGenerator.GenerateCacheKey(request.Domain, request.Descriptors[i], limits[i], now)

		// Increase statistics for limits hit by their respective requests.
		if limits[i] != nil {
			limits[i].Stats.TotalHits.Add(uint64(hitsAddend))
		}
	}

	isOverLimitWithLocalCache := make([]bool, len(request.Descriptors))

	// Keys known to be over limit via the local cache are skipped entirely:
	// no memcache read and (in increaseAsync) no memcache increment.
	keysToGet := make([]string, 0, len(request.Descriptors))

	for i, cacheKey := range cacheKeys {
		if cacheKey.Key == "" {
			continue
		}

		if this.localCache != nil {
			// Get returns the value or not found error. Presence of the key
			// (regardless of value) marks it as over limit.
			_, err := this.localCache.Get([]byte(cacheKey.Key))
			if err == nil {
				isOverLimitWithLocalCache[i] = true
				logger.Debugf("cache key is over the limit: %s", cacheKey.Key)
				continue
			}
		}

		logger.Debugf("looking up cache key: %s", cacheKey.Key)
		keysToGet = append(keysToGet, cacheKey.Key)
	}

	// Now fetch the pipeline.
	responseDescriptorStatuses := make([]*pb.RateLimitResponse_DescriptorStatus,
		len(request.Descriptors))

	var memcacheValues map[string]*memcache.Item
	var err error

	if len(keysToGet) > 0 {
		memcacheValues, err = this.client.GetMulti(keysToGet)
		if err != nil {
			// NOTE: fail-open — on a GetMulti error memcacheValues stays nil,
			// so every key below is treated as having zero prior usage.
			logger.Errorf("Error multi-getting memcache keys (%s): %s", keysToGet, err)
		}
	}

	for i, cacheKey := range cacheKeys {
		// Empty key means no limit is configured for this descriptor: OK.
		if cacheKey.Key == "" {
			responseDescriptorStatuses[i] =
				&pb.RateLimitResponse_DescriptorStatus{
					Code:           pb.RateLimitResponse_OK,
					CurrentLimit:   nil,
					LimitRemaining: 0,
				}
			continue
		}

		if isOverLimitWithLocalCache[i] {
			responseDescriptorStatuses[i] =
				&pb.RateLimitResponse_DescriptorStatus{
					Code:           pb.RateLimitResponse_OVER_LIMIT,
					CurrentLimit:   limits[i].Limit,
					LimitRemaining: 0,
				}
			limits[i].Stats.OverLimit.Add(uint64(hitsAddend))
			limits[i].Stats.OverLimitWithLocalCache.Add(uint64(hitsAddend))
			continue
		}

		// Missing or unparseable values are treated as zero prior usage.
		rawMemcacheValue, ok := memcacheValues[cacheKey.Key]
		var limitBeforeIncrease uint32
		if ok {
			decoded, err := strconv.ParseInt(string(rawMemcacheValue.Value), 10, 32)
			if err != nil {
				logger.Errorf("Unexpected non-numeric value in memcached: %v", rawMemcacheValue)
			} else {
				limitBeforeIncrease = uint32(decoded)
			}

		}

		limitAfterIncrease := limitBeforeIncrease + hitsAddend
		overLimitThreshold := limits[i].Limit.RequestsPerUnit
		// The nearLimitThreshold is the number of requests that can be made before hitting the NearLimitRatio.
		// We need to know it in both the OK and OVER_LIMIT scenarios.
		nearLimitThreshold := uint32(math.Floor(float64(float32(overLimitThreshold) * config.NearLimitRatio)))

		logger.Debugf("cache key: %s current: %d", cacheKey.Key, limitAfterIncrease)
		if limitAfterIncrease > overLimitThreshold {
			responseDescriptorStatuses[i] =
				&pb.RateLimitResponse_DescriptorStatus{
					Code:           pb.RateLimitResponse_OVER_LIMIT,
					CurrentLimit:   limits[i].Limit,
					LimitRemaining: 0,
				}

			// Increase over limit statistics. Because we support += behavior for increasing the limit, we need to
			// assess if the entire hitsAddend were over the limit. That is, if the limit's value before adding the
			// N hits was over the limit, then all the N hits were over limit.
			// Otherwise, only the difference between the current limit value and the over limit threshold
			// were over limit hits.
			if limitBeforeIncrease >= overLimitThreshold {
				limits[i].Stats.OverLimit.Add(uint64(hitsAddend))
			} else {
				limits[i].Stats.OverLimit.Add(uint64(limitAfterIncrease - overLimitThreshold))

				// If the limit before increase was below the over limit value, then some of the hits were
				// in the near limit range.
				limits[i].Stats.NearLimit.Add(uint64(overLimitThreshold - max(nearLimitThreshold, limitBeforeIncrease)))
			}
			if this.localCache != nil {
				// Set the TTL of the local_cache to be the entire duration.
				// Since the cache_key gets changed once the time crosses over current time slot, the over-the-limit
				// cache keys in local_cache lose effectiveness.
				// For example, if we have an hour limit on all mongo connections, the cache key would be
				// similar to mongo_1h, mongo_2h, etc. In the hour 1 (0h0m - 0h59m), the cache key is mongo_1h, we start
				// to get ratelimited in the 50th minute, the ttl of local_cache will be set as 1 hour(0h50m-1h49m).
				// In the time of 1h1m, since the cache key becomes different (mongo_2h), it won't get ratelimited.
				err := this.localCache.Set([]byte(cacheKey.Key), []byte{}, int(limiter.UnitToDivider(limits[i].Limit.Unit)))
				if err != nil {
					logger.Errorf("Failing to set local cache key: %s", cacheKey.Key)
				}
			}
		} else {
			responseDescriptorStatuses[i] =
				&pb.RateLimitResponse_DescriptorStatus{
					Code:           pb.RateLimitResponse_OK,
					CurrentLimit:   limits[i].Limit,
					LimitRemaining: overLimitThreshold - limitAfterIncrease,
				}

			// The limit is OK but we additionally want to know if we are near the limit.
			if limitAfterIncrease > nearLimitThreshold {
				// Here we also need to assess which portion of the hitsAddend were in the near limit range.
				// If all the hits were over the nearLimitThreshold, then all hits are near limit. Otherwise,
				// only the difference between the current limit value and the near limit threshold were near
				// limit hits.
				if limitBeforeIncrease >= nearLimitThreshold {
					limits[i].Stats.NearLimit.Add(uint64(hitsAddend))
				} else {
					limits[i].Stats.NearLimit.Add(uint64(limitAfterIncrease - nearLimitThreshold))
				}
			}
		}
	}

	// Perform the actual increments in the background; Flush() waits on wg.
	this.wg.Add(1)
	go this.increaseAsync(cacheKeys, isOverLimitWithLocalCache, limits, uint64(hitsAddend))

	return responseDescriptorStatuses
}

// increaseAsync applies hitsAddend to every key in cacheKeys on the memcache
// backend. Memcache has no multi-increment and will not create a key on
// Increment, so each key goes through an increment -> add -> increment dance:
// try Increment first; on a cache miss, Add the key with a TTL; if the Add
// loses a creation race (ErrNotStored), fall back to Increment again.
// Runs in a background goroutine; wg is released on return (see Flush).
func (this *rateLimitMemcacheImpl) increaseAsync(cacheKeys []limiter.CacheKey, isOverLimitWithLocalCache []bool, limits []*config.RateLimit, hitsAddend uint64) {
	defer this.wg.Done()

	for idx, ck := range cacheKeys {
		// Skip descriptors with no limit and keys already known to be over
		// limit via the local cache.
		if ck.Key == "" || isOverLimitWithLocalCache[idx] {
			continue
		}

		_, incrErr := this.client.Increment(ck.Key, hitsAddend)
		switch {
		case incrErr == nil:
			// Counter existed and was bumped; nothing more to do.
		case incrErr == memcache.ErrCacheMiss:
			// Key does not exist yet: create it with Add, applying optional
			// jitter so keys for the same window don't all expire at once.
			expirationSeconds := limiter.UnitToDivider(limits[idx].Limit.Unit)
			if this.expirationJitterMaxSeconds > 0 {
				expirationSeconds += this.jitterRand.Int63n(this.expirationJitterMaxSeconds)
			}

			// Need to add instead of increment.
			addErr := this.client.Add(&memcache.Item{
				Key:        ck.Key,
				Value:      []byte(strconv.FormatUint(hitsAddend, 10)),
				Expiration: int32(expirationSeconds),
			})
			switch {
			case addErr == memcache.ErrNotStored:
				// There was a race condition to do this add. We should be
				// able to increment now instead.
				if _, err := this.client.Increment(ck.Key, hitsAddend); err != nil {
					logger.Errorf("Failed to increment key %s after failing to add: %s", ck.Key, err)
					continue
				}
			case addErr != nil:
				logger.Errorf("Failed to add key %s: %s", ck.Key, addErr)
				continue
			}
		default:
			logger.Errorf("Failed to increment key %s: %s", ck.Key, incrErr)
			continue
		}
	}
}

// Flush blocks until every background increment goroutine spawned by DoLimit
// has finished. Used by unit tests to make the asynchronous memcache writes
// deterministic before asserting on counter values.
func (this *rateLimitMemcacheImpl) Flush() {
	this.wg.Wait()
}

// NewRateLimitCacheImpl constructs a memcache-backed rate limit cache.
//
// client may be the real gomemcache client or a mock implementing Client.
// jitterRand is only consulted when expirationJitterMaxSeconds > 0.
// localCache is an optional in-process cache of over-limit keys; may be nil.
//
// NOTE(review): scope is accepted (matching the redis constructor's shape)
// but is not stored or used by this implementation — confirm whether stats
// should be wired up here.
func NewRateLimitCacheImpl(client Client, timeSource limiter.TimeSource, jitterRand *rand.Rand, expirationJitterMaxSeconds int64, localCache *freecache.Cache, scope stats.Scope) rateLimitCache {
	return &rateLimitMemcacheImpl{
		client:                     client,
		timeSource:                 timeSource,
		cacheKeyGenerator:          limiter.NewCacheKeyGenerator(),
		jitterRand:                 jitterRand,
		expirationJitterMaxSeconds: expirationJitterMaxSeconds,
		localCache:                 localCache,
	}
}

// NewRateLimitCacheImplFromSettings builds a memcache-backed rate limit cache
// from service settings, connecting a real gomemcache client to the
// MEMCACHE_HOST_PORT address configured in s.
func NewRateLimitCacheImplFromSettings(s settings.Settings, timeSource limiter.TimeSource, jitterRand *rand.Rand, localCache *freecache.Cache, scope stats.Scope) rateLimitCache {
	return NewRateLimitCacheImpl(
		memcache.New(s.MemcacheHostPort),
		timeSource,
		jitterRand,
		s.ExpirationJitterMaxSeconds,
		localCache,
		scope,
	)
}
14 changes: 14 additions & 0 deletions src/memcached/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package memcached

import (
"github.com/bradfitz/gomemcache/memcache"
)

// Compile-time check that the real gomemcache client satisfies Client.
var _ Client = (*memcache.Client)(nil)

// Interface for memcached, used for mocking. It covers only the subset of
// the gomemcache client that the rate limiter calls.
type Client interface {
	// GetMulti fetches several keys in a single round trip; callers check
	// map membership to detect missing keys.
	GetMulti(keys []string) (map[string]*memcache.Item, error)
	// Increment atomically adds delta to an existing numeric key.
	Increment(key string, delta uint64) (newValue uint64, err error)
	// Add stores the item only if its key does not already exist.
	Add(item *memcache.Item) error
}
Loading

0 comments on commit 0478e41

Please sign in to comment.