Abstract prometheus backpressure operations #44

Merged 1 commit on Dec 30, 2024
9 changes: 9 additions & 0 deletions README.md
@@ -8,6 +8,15 @@

See [Netflix concurrency-limits](https://github.com/Netflix/concurrency-limits) for the inspiration.

## Why Use This Project?

**Throttle Proxy** helps keep your distributed systems safe and stable during load spikes. Here's why:

- **Proven Algorithm**: Uses the Additive Increase/Multiplicative Decrease (AIMD) algorithm, inspired by TCP congestion control, to adjust request concurrency dynamically (see the sketch after this list).
- **Real-Time Metrics**: Leverages Prometheus metrics to make real-time decisions, so your system adapts quickly to changing load.
- **Configurable and Flexible**: Lets you set custom thresholds and monitor multiple signals, giving fine-grained control over traffic management.
- **Prevents Failures**: Helps prevent cascading failures and maintains system stability under unpredictable load.
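
A minimal sketch of the AIMD loop in Go, assuming an additive step of 1 and a halving factor on backpressure (the project's actual window-update code and names are not shown in this diff):

```go
package main

import "fmt"

// aimd holds a concurrency window that grows linearly while the system
// is healthy and shrinks multiplicatively when backpressure fires.
type aimd struct {
	window, floor, ceiling float64
}

// onSuccess applies additive increase: expand the window slowly.
func (a *aimd) onSuccess() {
	a.window = min(a.window+1, a.ceiling)
}

// onBackpressure applies multiplicative decrease: back off quickly.
func (a *aimd) onBackpressure() {
	a.window = max(a.window/2, a.floor)
}

func main() {
	a := &aimd{window: 10, floor: 1, ceiling: 100}
	a.onSuccess()      // window grows to 11
	a.onBackpressure() // window drops to 5.5
	fmt.Println(a.window)
}
```

The asymmetry is deliberate: recovery is gradual, while backoff under pressure is immediate.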

## Key Features

- 📊 **Adaptive Traffic Management**: Automatically adjusts request concurrency based on real-time Prometheus metrics
44 changes: 44 additions & 0 deletions internal/util/map.go
@@ -0,0 +1,44 @@
// Package util holds custom structs and functions to handle common operations
package util

import "sync"

// MapKeys returns the keys of m in unspecified iteration order.
func MapKeys[K comparable, V any](m map[K]V) []K {
keys := make([]K, 0, len(m))
for k := range m {
keys = append(keys, k)
}
return keys
}

// SyncMap is a type-safe alternative to sync.Map, backed by an RWMutex-guarded map
type SyncMap[K comparable, V comparable] struct {
mu sync.RWMutex
items map[K]V
}

// NewSyncMap creates a new typed concurrent map
func NewSyncMap[K comparable, V comparable]() *SyncMap[K, V] {
return &SyncMap[K, V]{
items: map[K]V{},
}
}

// Store sets the value for a key
func (m *SyncMap[K, V]) Store(key K, value V) {
m.mu.Lock()
defer m.mu.Unlock()
m.items[key] = value
}

// Range calls f sequentially for each key and value in the map.
// If f returns false, range stops the iteration.
func (m *SyncMap[K, V]) Range(f func(key K, value V) bool) {
m.mu.RLock()
defer m.mu.RUnlock()
for k, v := range m.items {
if !f(k, v) {
break
}
}
}
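
For context, a hypothetical caller of the new type might look like this (not part of this diff; the import path comes from the backpressure.go changes below, and internal packages are importable only within this module):

```go
package main

import (
	"fmt"

	"github.com/kevindweb/throttle-proxy/internal/util"
)

func main() {
	flags := util.NewSyncMap[string, float64]()
	flags.Store("cpu_query", 0.4)
	flags.Store("latency_query", 0.9)

	// Take the most aggressive throttle across all signals, mirroring
	// how updateThrottle uses Range in the backpressure.go diff.
	highest := 0.0
	flags.Range(func(_ string, v float64) bool {
		highest = max(highest, v)
		return true
	})
	fmt.Println(highest) // 0.9
}
```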
122 changes: 26 additions & 96 deletions proxymw/backpressure.go
@@ -2,7 +2,6 @@ package proxymw

import (
"context"
"encoding/json"
"errors"
"fmt"
"log"
@@ -15,14 +14,15 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"

"github.com/kevindweb/throttle-proxy/internal/util"
)

const (
BackpressureProxyType = "backpressure"
BackpressureUpdateCadence = 30 * time.Second
MonitorQueryTimeout = 15 * time.Second
DefaultThrottleCurve = 4.0
InstantQueryEndpoint = "/api/v1/query"
)

var (
@@ -217,7 +217,7 @@ type Backpressure struct {
monitorClient *http.Client
monitorURL string
queries []BackpressureQuery
throttleFlags sync.Map
throttleFlags *util.SyncMap[BackpressureQuery, float64]
allowance float64

client ProxyClient
@@ -246,6 +246,7 @@ func NewBackpressure(
warnGauge: bpQueryWarnGauge,
emergencyGauge: bpQueryEmergencyGauge,
queryValGauge: bpQueryValGauge,
throttleFlags: util.NewSyncMap[BackpressureQuery, float64](),

monitorClient: &http.Client{
Timeout: MonitorQueryTimeout,
@@ -287,62 +288,35 @@ func (bp *Backpressure) Next(rr Request) error {
// preventing the other signals from actioning the congestion window.
func (bp *Backpressure) metricsLoop(ctx context.Context) {
for _, q := range bp.queries {
go func(query BackpressureQuery) {
bp.metricLoop(ctx, query)
}(q)
}
}

// metricLoop pulls one PromQL metric on a loop to update whether requests should be throttled.
// we only drop the global throttle when all metrics have dropped their own throttle flag
func (bp *Backpressure) metricLoop(ctx context.Context, q BackpressureQuery) {
ticker := time.NewTicker(BackpressureUpdateCadence)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
curr, err := bp.metricFired(ctx, q.Query)
if err != nil {
bp.queryErrCount.WithLabelValues(q.Name).Inc()
log.Printf("querying metric '%s' returned error: %v", q.Query, err)
continue
go func(q BackpressureQuery) {
ticker := time.NewTicker(BackpressureUpdateCadence)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
curr, err := ValueFromPromQL(ctx, bp.monitorClient, bp.monitorURL, q.Query)
if err != nil {
bp.queryErrCount.WithLabelValues(q.Name).Inc()
log.Printf("querying metric '%s' returned error: %v", q.Query, err)
continue
}

bp.queryValGauge.WithLabelValues(q.Name).Set(curr)
bp.updateThrottle(q, curr)
}
}

bp.queryValGauge.WithLabelValues(q.Name).Set(curr)
bp.updateThrottle(q, curr)
}
}(q)
}
}

func (bp *Backpressure) updateThrottle(q BackpressureQuery, curr float64) {
bp.throttleFlags.Store(q, q.throttlePercent(curr))

throttlePercent := 0.0
var err error
bp.throttleFlags.Range(func(key, value interface{}) bool {
query, ok := key.(BackpressureQuery)
if !ok {
log.Printf(
"error updating query '%s' throttle to %f: %v, expected query got %T",
q.Query, curr, err, query,
)
return true
}

val, ok := value.(float64)
if !ok {
bp.queryErrCount.WithLabelValues(query.Name).Inc()
log.Printf(
"error updating query '%s' throttle to %f: %v, expected float got %T",
q.Query, curr, err, val,
)
return true
}

throttlePercent = max(throttlePercent, val)
bp.throttleFlags.Range(func(_ BackpressureQuery, value float64) bool {
throttlePercent = max(throttlePercent, value)
return true
})

Expand All @@ -353,50 +327,6 @@ func (bp *Backpressure) updateThrottle(q BackpressureQuery, curr float64) {
bp.mu.Unlock()
}

// queryMetric checks if the PromQL expression returns a non-empty response (backpressure is firing)
func (bp *Backpressure) metricFired(ctx context.Context, query string) (float64, error) {
u, err := url.Parse(bp.monitorURL + InstantQueryEndpoint)
if err != nil {
return 0, fmt.Errorf("parse monitor URL: %w", err)
}

q := u.Query()
q.Set("query", query)
u.RawQuery = q.Encode()

req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), http.NoBody)
if err != nil {
return 0, fmt.Errorf("create request: %w", err)
}

resp, err := bp.monitorClient.Do(req)
if err != nil {
return 0, fmt.Errorf("execute request: %w", err)
}

defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return 0, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
}

var prometheusResp PrometheusResponse
if err := json.NewDecoder(resp.Body).Decode(&prometheusResp); err != nil {
return 0, fmt.Errorf("decode response: %w", err)
}

results := prometheusResp.Data.Result
if len(results) != 1 {
return 0, fmt.Errorf("backpressure query must return exactly one value: %s", query)
}

res := float64(results[0].Value)
if res < 0 {
return 0, fmt.Errorf("backpressure query (%s) must have non-negative value: %f", query, res)
}

return res, nil
}

// check ensures the number of concurrent active requests stays within the allowed window.
// If the active count exceeds the current watermark, the request is denied.
func (bp *Backpressure) check() error {
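
The new metricsLoop calls util.ValueFromPromQL, whose definition is not included in this diff view. Judging from the removed metricFired method and the visible call site, the relocated helper plausibly looks like the sketch below; the file name, the local response type, and the endpoint constant are assumptions, and the real code may still use prometheus/common's model types:

```go
package util

import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"net/url"
	"strconv"
)

// instantQueryEndpoint stands in for the InstantQueryEndpoint constant
// that lived in proxymw; the lowercase name here is an assumption.
const instantQueryEndpoint = "/api/v1/query"

// promResponse is a minimal stand-in for the PrometheusResponse type the
// removed metricFired method decoded into.
type promResponse struct {
	Data struct {
		Result []struct {
			// Prometheus instant queries return [timestamp, "value"].
			Value [2]any `json:"value"`
		} `json:"result"`
	} `json:"data"`
}

// ValueFromPromQL runs a PromQL instant query and returns its single
// non-negative scalar result, as metricFired did before this change.
func ValueFromPromQL(ctx context.Context, client *http.Client, monitorURL, query string) (float64, error) {
	u, err := url.Parse(monitorURL + instantQueryEndpoint)
	if err != nil {
		return 0, fmt.Errorf("parse monitor URL: %w", err)
	}

	q := u.Query()
	q.Set("query", query)
	u.RawQuery = q.Encode()

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), http.NoBody)
	if err != nil {
		return 0, fmt.Errorf("create request: %w", err)
	}

	resp, err := client.Do(req)
	if err != nil {
		return 0, fmt.Errorf("execute request: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return 0, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
	}

	var pr promResponse
	if err := json.NewDecoder(resp.Body).Decode(&pr); err != nil {
		return 0, fmt.Errorf("decode response: %w", err)
	}

	if len(pr.Data.Result) != 1 {
		return 0, fmt.Errorf("backpressure query must return exactly one value: %s", query)
	}

	raw, ok := pr.Data.Result[0].Value[1].(string)
	if !ok {
		return 0, fmt.Errorf("unexpected value format for query: %s", query)
	}

	val, err := strconv.ParseFloat(raw, 64)
	if err != nil {
		return 0, fmt.Errorf("parse value: %w", err)
	}
	if val < 0 {
		return 0, fmt.Errorf("backpressure query (%s) must have non-negative value: %f", query, val)
	}
	return val, nil
}
```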