From 532a4aba7fbbfecb6c38436eb5dd349597992b59 Mon Sep 17 00:00:00 2001 From: Julius Volz Date: Mon, 30 Jan 2017 18:34:15 +0100 Subject: [PATCH 1/2] Vendor golang.org/x/time/rate --- vendor/golang.org/x/time/rate/LICENSE | 27 ++ vendor/golang.org/x/time/rate/rate.go | 371 ++++++++++++++++++++++++++ vendor/manifest | 9 + 3 files changed, 407 insertions(+) create mode 100644 vendor/golang.org/x/time/rate/LICENSE create mode 100644 vendor/golang.org/x/time/rate/rate.go diff --git a/vendor/golang.org/x/time/rate/LICENSE b/vendor/golang.org/x/time/rate/LICENSE new file mode 100644 index 0000000000..6a66aea5ea --- /dev/null +++ b/vendor/golang.org/x/time/rate/LICENSE @@ -0,0 +1,27 @@ +Copyright (c) 2009 The Go Authors. All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Google Inc. nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/vendor/golang.org/x/time/rate/rate.go b/vendor/golang.org/x/time/rate/rate.go new file mode 100644 index 0000000000..938feaffe9 --- /dev/null +++ b/vendor/golang.org/x/time/rate/rate.go @@ -0,0 +1,371 @@ +// Copyright 2015 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package rate provides a rate limiter. +package rate + +import ( + "fmt" + "math" + "sync" + "time" + + "golang.org/x/net/context" +) + +// Limit defines the maximum frequency of some events. +// Limit is represented as number of events per second. +// A zero Limit allows no events. +type Limit float64 + +// Inf is the infinite rate limit; it allows all events (even if burst is zero). +const Inf = Limit(math.MaxFloat64) + +// Every converts a minimum time interval between events to a Limit. +func Every(interval time.Duration) Limit { + if interval <= 0 { + return Inf + } + return 1 / Limit(interval.Seconds()) +} + +// A Limiter controls how frequently events are allowed to happen. +// It implements a "token bucket" of size b, initially full and refilled +// at rate r tokens per second. +// Informally, in any large enough time interval, the Limiter limits the +// rate to r tokens per second, with a maximum burst size of b events. +// As a special case, if r == Inf (the infinite rate), b is ignored. +// See https://en.wikipedia.org/wiki/Token_bucket for more about token buckets. +// +// The zero value is a valid Limiter, but it will reject all events. +// Use NewLimiter to create non-zero Limiters. +// +// Limiter has three main methods, Allow, Reserve, and Wait. +// Most callers should use Wait. +// +// Each of the three methods consumes a single token. +// They differ in their behavior when no token is available. +// If no token is available, Allow returns false. +// If no token is available, Reserve returns a reservation for a future token +// and the amount of time the caller must wait before using it. +// If no token is available, Wait blocks until one can be obtained +// or its associated context.Context is canceled. +// +// The methods AllowN, ReserveN, and WaitN consume n tokens. +type Limiter struct { + limit Limit + burst int + + mu sync.Mutex + tokens float64 + // last is the last time the limiter's tokens field was updated + last time.Time + // lastEvent is the latest time of a rate-limited event (past or future) + lastEvent time.Time +} + +// Limit returns the maximum overall event rate. +func (lim *Limiter) Limit() Limit { + lim.mu.Lock() + defer lim.mu.Unlock() + return lim.limit +} + +// Burst returns the maximum burst size. Burst is the maximum number of tokens +// that can be consumed in a single call to Allow, Reserve, or Wait, so higher +// Burst values allow more events to happen at once. +// A zero Burst allows no events, unless limit == Inf. +func (lim *Limiter) Burst() int { + return lim.burst +} + +// NewLimiter returns a new Limiter that allows events up to rate r and permits +// bursts of at most b tokens. +func NewLimiter(r Limit, b int) *Limiter { + return &Limiter{ + limit: r, + burst: b, + } +} + +// Allow is shorthand for AllowN(time.Now(), 1). +func (lim *Limiter) Allow() bool { + return lim.AllowN(time.Now(), 1) +} + +// AllowN reports whether n events may happen at time now. +// Use this method if you intend to drop / skip events that exceed the rate limit. +// Otherwise use Reserve or Wait. +func (lim *Limiter) AllowN(now time.Time, n int) bool { + return lim.reserveN(now, n, 0).ok +} + +// A Reservation holds information about events that are permitted by a Limiter to happen after a delay. +// A Reservation may be canceled, which may enable the Limiter to permit additional events. +type Reservation struct { + ok bool + lim *Limiter + tokens int + timeToAct time.Time + // This is the Limit at reservation time, it can change later. + limit Limit +} + +// OK returns whether the limiter can provide the requested number of tokens +// within the maximum wait time. If OK is false, Delay returns InfDuration, and +// Cancel does nothing. +func (r *Reservation) OK() bool { + return r.ok +} + +// Delay is shorthand for DelayFrom(time.Now()). +func (r *Reservation) Delay() time.Duration { + return r.DelayFrom(time.Now()) +} + +// InfDuration is the duration returned by Delay when a Reservation is not OK. +const InfDuration = time.Duration(1<<63 - 1) + +// DelayFrom returns the duration for which the reservation holder must wait +// before taking the reserved action. Zero duration means act immediately. +// InfDuration means the limiter cannot grant the tokens requested in this +// Reservation within the maximum wait time. +func (r *Reservation) DelayFrom(now time.Time) time.Duration { + if !r.ok { + return InfDuration + } + delay := r.timeToAct.Sub(now) + if delay < 0 { + return 0 + } + return delay +} + +// Cancel is shorthand for CancelAt(time.Now()). +func (r *Reservation) Cancel() { + r.CancelAt(time.Now()) + return +} + +// CancelAt indicates that the reservation holder will not perform the reserved action +// and reverses the effects of this Reservation on the rate limit as much as possible, +// considering that other reservations may have already been made. +func (r *Reservation) CancelAt(now time.Time) { + if !r.ok { + return + } + + r.lim.mu.Lock() + defer r.lim.mu.Unlock() + + if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(now) { + return + } + + // calculate tokens to restore + // The duration between lim.lastEvent and r.timeToAct tells us how many tokens were reserved + // after r was obtained. These tokens should not be restored. + restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct)) + if restoreTokens <= 0 { + return + } + // advance time to now + now, _, tokens := r.lim.advance(now) + // calculate new number of tokens + tokens += restoreTokens + if burst := float64(r.lim.burst); tokens > burst { + tokens = burst + } + // update state + r.lim.last = now + r.lim.tokens = tokens + if r.timeToAct == r.lim.lastEvent { + prevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens))) + if !prevEvent.Before(now) { + r.lim.lastEvent = prevEvent + } + } + + return +} + +// Reserve is shorthand for ReserveN(time.Now(), 1). +func (lim *Limiter) Reserve() *Reservation { + return lim.ReserveN(time.Now(), 1) +} + +// ReserveN returns a Reservation that indicates how long the caller must wait before n events happen. +// The Limiter takes this Reservation into account when allowing future events. +// ReserveN returns false if n exceeds the Limiter's burst size. +// Usage example: +// r := lim.ReserveN(time.Now(), 1) +// if !r.OK() { +// // Not allowed to act! Did you remember to set lim.burst to be > 0 ? +// return +// } +// time.Sleep(r.Delay()) +// Act() +// Use this method if you wish to wait and slow down in accordance with the rate limit without dropping events. +// If you need to respect a deadline or cancel the delay, use Wait instead. +// To drop or skip events exceeding rate limit, use Allow instead. +func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation { + r := lim.reserveN(now, n, InfDuration) + return &r +} + +// Wait is shorthand for WaitN(ctx, 1). +func (lim *Limiter) Wait(ctx context.Context) (err error) { + return lim.WaitN(ctx, 1) +} + +// WaitN blocks until lim permits n events to happen. +// It returns an error if n exceeds the Limiter's burst size, the Context is +// canceled, or the expected wait time exceeds the Context's Deadline. +// The burst limit is ignored if the rate limit is Inf. +func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) { + if n > lim.burst && lim.limit != Inf { + return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, lim.burst) + } + // Check if ctx is already cancelled + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + // Determine wait limit + now := time.Now() + waitLimit := InfDuration + if deadline, ok := ctx.Deadline(); ok { + waitLimit = deadline.Sub(now) + } + // Reserve + r := lim.reserveN(now, n, waitLimit) + if !r.ok { + return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n) + } + // Wait + t := time.NewTimer(r.DelayFrom(now)) + defer t.Stop() + select { + case <-t.C: + // We can proceed. + return nil + case <-ctx.Done(): + // Context was canceled before we could proceed. Cancel the + // reservation, which may permit other events to proceed sooner. + r.Cancel() + return ctx.Err() + } +} + +// SetLimit is shorthand for SetLimitAt(time.Now(), newLimit). +func (lim *Limiter) SetLimit(newLimit Limit) { + lim.SetLimitAt(time.Now(), newLimit) +} + +// SetLimitAt sets a new Limit for the limiter. The new Limit, and Burst, may be violated +// or underutilized by those which reserved (using Reserve or Wait) but did not yet act +// before SetLimitAt was called. +func (lim *Limiter) SetLimitAt(now time.Time, newLimit Limit) { + lim.mu.Lock() + defer lim.mu.Unlock() + + now, _, tokens := lim.advance(now) + + lim.last = now + lim.tokens = tokens + lim.limit = newLimit +} + +// reserveN is a helper method for AllowN, ReserveN, and WaitN. +// maxFutureReserve specifies the maximum reservation wait duration allowed. +// reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN. +func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation { + lim.mu.Lock() + + if lim.limit == Inf { + lim.mu.Unlock() + return Reservation{ + ok: true, + lim: lim, + tokens: n, + timeToAct: now, + } + } + + now, last, tokens := lim.advance(now) + + // Calculate the remaining number of tokens resulting from the request. + tokens -= float64(n) + + // Calculate the wait duration + var waitDuration time.Duration + if tokens < 0 { + waitDuration = lim.limit.durationFromTokens(-tokens) + } + + // Decide result + ok := n <= lim.burst && waitDuration <= maxFutureReserve + + // Prepare reservation + r := Reservation{ + ok: ok, + lim: lim, + limit: lim.limit, + } + if ok { + r.tokens = n + r.timeToAct = now.Add(waitDuration) + } + + // Update state + if ok { + lim.last = now + lim.tokens = tokens + lim.lastEvent = r.timeToAct + } else { + lim.last = last + } + + lim.mu.Unlock() + return r +} + +// advance calculates and returns an updated state for lim resulting from the passage of time. +// lim is not changed. +func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) { + last := lim.last + if now.Before(last) { + last = now + } + + // Avoid making delta overflow below when last is very old. + maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens) + elapsed := now.Sub(last) + if elapsed > maxElapsed { + elapsed = maxElapsed + } + + // Calculate the new number of tokens, due to time that passed. + delta := lim.limit.tokensFromDuration(elapsed) + tokens := lim.tokens + delta + if burst := float64(lim.burst); tokens > burst { + tokens = burst + } + + return now, last, tokens +} + +// durationFromTokens is a unit conversion function from the number of tokens to the duration +// of time it takes to accumulate them at a rate of limit tokens per second. +func (limit Limit) durationFromTokens(tokens float64) time.Duration { + seconds := tokens / float64(limit) + return time.Nanosecond * time.Duration(1e9*seconds) +} + +// tokensFromDuration is a unit conversion function from a time duration to the number of tokens +// which could be accumulated during that duration at a rate of limit tokens per second. +func (limit Limit) tokensFromDuration(d time.Duration) float64 { + return d.Seconds() * float64(limit) +} diff --git a/vendor/manifest b/vendor/manifest index 652c3f3264..47aefb538b 100644 --- a/vendor/manifest +++ b/vendor/manifest @@ -975,6 +975,15 @@ "path": "unicode/rangetable", "notests": true }, + { + "importpath": "golang.org/x/time/rate", + "repository": "https://go.googlesource.com/time", + "vcs": "git", + "revision": "f51c12702a4d776e4c1fa9b0fabab841babae631", + "branch": "master", + "path": "/rate", + "notests": true + }, { "importpath": "google.golang.org/api/compute/v1", "repository": "https://code.googlesource.com/google-api-go-client", From 230b7717926b3ba4f984a66ca06b437dd7f26aba Mon Sep 17 00:00:00 2001 From: Julius Volz Date: Mon, 30 Jan 2017 18:34:19 +0100 Subject: [PATCH 2/2] Implement per-user rate limits This is the simplest version for now, just based on flag-configured rate limits per distributor process and no knowledge about total (cross-distributor rates) yet. Fixes https://github.com/weaveworks/cortex/issues/128 --- distributor/distributor.go | 43 ++++++++++++++++++++++++++++++++------ distributor/http_server.go | 7 ++++++- 2 files changed, 43 insertions(+), 7 deletions(-) diff --git a/distributor/distributor.go b/distributor/distributor.go index 03bae0a294..d3ac65ebbe 100644 --- a/distributor/distributor.go +++ b/distributor/distributor.go @@ -1,6 +1,7 @@ package distributor import ( + "errors" "flag" "fmt" "hash/fnv" @@ -12,6 +13,7 @@ import ( "github.com/mwitkow/go-grpc-middleware" "github.com/opentracing/opentracing-go" "golang.org/x/net/context" + "golang.org/x/time/rate" "google.golang.org/grpc" "github.com/prometheus/client_golang/prometheus" @@ -28,6 +30,8 @@ import ( "github.com/weaveworks/cortex/util" ) +var errIngestionRateLimitExceeded = errors.New("ingestion rate limit exceeded") + var ( numClientsDesc = prometheus.NewDesc( "cortex_distributor_ingester_clients", @@ -46,6 +50,10 @@ type Distributor struct { quit chan struct{} done chan struct{} + // Per-user rate limiters. + ingestLimitersMtx sync.Mutex + ingestLimiters map[string]*rate.Limiter + queryDuration *prometheus.HistogramVec receivedSamples prometheus.Counter sendDuration *prometheus.HistogramVec @@ -77,6 +85,8 @@ type Config struct { HeartbeatTimeout time.Duration RemoteTimeout time.Duration ClientCleanupPeriod time.Duration + IngestionRateLimit float64 + IngestionBurstSize int } // RegisterFlags adds the flags required to config this to the given FlagSet @@ -86,6 +96,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { flag.DurationVar(&cfg.HeartbeatTimeout, "distributor.heartbeat-timeout", time.Minute, "The heartbeat timeout after which ingesters are skipped for reads/writes.") flag.DurationVar(&cfg.RemoteTimeout, "distributor.remote-timeout", 2*time.Second, "Timeout for downstream ingesters.") flag.DurationVar(&cfg.ClientCleanupPeriod, "distributor.client-cleanup-period", 15*time.Second, "How frequently to clean up clients for ingesters that have gone away.") + flag.Float64Var(&cfg.IngestionRateLimit, "distributor.ingestion-rate-limit", 25000, "Per-user ingestion rate limit in samples per second.") + flag.IntVar(&cfg.IngestionBurstSize, "distributor.ingestion-burst-size", 50000, "Per-user allowed ingestion burst size (in number of samples).") } // New constructs a new Distributor @@ -97,11 +109,12 @@ func New(cfg Config, ring ReadRing) (*Distributor, error) { return nil, fmt.Errorf("MinReadSuccesses > ReplicationFactor: %d > %d", cfg.MinReadSuccesses, cfg.ReplicationFactor) } d := &Distributor{ - cfg: cfg, - ring: ring, - clients: map[string]ingesterClient{}, - quit: make(chan struct{}), - done: make(chan struct{}), + cfg: cfg, + ring: ring, + clients: map[string]ingesterClient{}, + quit: make(chan struct{}), + done: make(chan struct{}), + ingestLimiters: map[string]*rate.Limiter{}, queryDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{ Namespace: "cortex", Name: "distributor_query_duration_seconds", @@ -255,6 +268,11 @@ func (d *Distributor) Push(ctx context.Context, req *remote.WriteRequest) (*cort samples := util.FromWriteRequest(req) d.receivedSamples.Add(float64(len(samples))) + limiter := d.getOrCreateIngestLimiter(userID) + if !limiter.AllowN(time.Now(), len(samples)) { + return nil, errIngestionRateLimitExceeded + } + keys := make([]uint32, len(samples), len(samples)) for i, sample := range samples { keys[i] = tokenForMetric(userID, sample.Metric) @@ -286,7 +304,7 @@ func (d *Distributor) Push(ctx context.Context, req *remote.WriteRequest) (*cort } // This is just a shortcut - if there are not minSuccess available ingesters, - // after filtering out dead ones, don't even both trying. + // after filtering out dead ones, don't even bother trying. if len(liveIngesters) < sampleTrackers[i].minSuccess { return nil, fmt.Errorf("wanted at least %d live ingesters to process write, had %d", sampleTrackers[i].minSuccess, len(liveIngesters)) @@ -320,6 +338,19 @@ func (d *Distributor) Push(ctx context.Context, req *remote.WriteRequest) (*cort return &cortex.WriteResponse{}, nil } +func (d *Distributor) getOrCreateIngestLimiter(userID string) *rate.Limiter { + d.ingestLimitersMtx.Lock() + defer d.ingestLimitersMtx.Unlock() + + if limiter, ok := d.ingestLimiters[userID]; ok { + return limiter + } + + limiter := rate.NewLimiter(rate.Limit(d.cfg.IngestionRateLimit), d.cfg.IngestionBurstSize) + d.ingestLimiters[userID] = limiter + return limiter +} + func (d *Distributor) sendSamples(ctx context.Context, ingester *ring.IngesterDesc, sampleTrackers []*sampleTracker) error { client, err := d.getClientFor(ingester) if err != nil { diff --git a/distributor/http_server.go b/distributor/http_server.go index 2aac34a533..32c2f704d6 100644 --- a/distributor/http_server.go +++ b/distributor/http_server.go @@ -21,8 +21,13 @@ func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) { _, err := d.Push(ctx, &req) if err != nil { + switch err { + case errIngestionRateLimitExceeded: + http.Error(w, err.Error(), http.StatusTooManyRequests) + default: + http.Error(w, err.Error(), http.StatusInternalServerError) + } log.Errorf("append err: %v", err) - http.Error(w, err.Error(), http.StatusInternalServerError) } }