Skip to content

Commit

Permalink
Handle observer cleanup more aggressively to catch leak
Browse files Browse the repository at this point in the history
  • Loading branch information
Kevin Deems authored and kevindweb committed Dec 14, 2024
1 parent 6e74583 commit 54362f7
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 4 deletions.
1 change: 1 addition & 0 deletions proxymw/middleware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func TestMiddlewareOrder(t *testing.T) {
JitterDelay: time.Second,

EnableObserver: true,
ClientTimeout: time.Hour,
}

serveCalls := 0
Expand Down
6 changes: 6 additions & 0 deletions proxymw/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ type Mocker struct {
RoundTripFunc func(r *http.Request) (*http.Response, error)
InitFunc func(context.Context)
NextFunc func(Request) error
RequestFunc func() *http.Request
}

var _ http.Handler = &Mocker{}
var _ http.RoundTripper = &Mocker{}
var _ ProxyClient = &Mocker{}
var _ Request = &Mocker{}

func (m *Mocker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
m.ServeHTTPFunc(w, r)
Expand All @@ -32,3 +34,7 @@ func (m *Mocker) Init(ctx context.Context) {
func (m *Mocker) Next(rr Request) error {
return m.NextFunc(rr)
}

func (m *Mocker) Request() *http.Request {
return m.RequestFunc()
}
25 changes: 24 additions & 1 deletion proxymw/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package proxymw
import (
"context"
"errors"
"fmt"
"time"

"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -52,7 +53,7 @@ func (o *Observer) Init(ctx context.Context) {
func (o *Observer) Next(rr Request) error {
o.activeGauge.Inc()
start := time.Now()
err := o.client.Next(rr)
err := o.executeNext(rr)
if err != nil {
var blocked *RequestBlockedError
if errors.As(err, &blocked) {
Expand All @@ -67,3 +68,25 @@ func (o *Observer) Next(rr Request) error {
o.activeGauge.Dec()
return err
}

// executeNext runs next in a goroutine on the off chance Next hangs so we can still run cleanup
func (o *Observer) executeNext(rr Request) error {
errc := make(chan error, 1)
go func() {
defer func() {
if r := recover(); r != nil {
errc <- fmt.Errorf("panic calling Next: %v", r)
}
close(errc)
}()
errc <- o.client.Next(rr)
}()

ctx := rr.Request().Context()
select {
case err := <-errc:
return err
case <-ctx.Done():
return ctx.Err()
}
}
40 changes: 37 additions & 3 deletions proxymw/observer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package proxymw
import (
"context"
"errors"
"net/http"
"testing"

"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -57,6 +58,34 @@ func TestObserverNextError(t *testing.T) {
require.Equal(t, float64(1), value)
},
},
{
name: "next panic",
observer: &Observer{
errCounter: prometheus.NewCounter(
prometheus.CounterOpts{Name: "block_test_error_count"},
),
blockCounter: prometheus.NewCounterVec(
prometheus.CounterOpts{Name: "block_test_block_count"}, []string{"mw_type"},
),
reqCounter: prometheus.NewCounter(
prometheus.CounterOpts{Name: "block_test_request_count"},
),
latencyCounter: prometheus.NewCounter(
prometheus.CounterOpts{Name: "block_test_request_latency_ms"},
),
activeGauge: prometheus.NewGauge(
prometheus.GaugeOpts{Name: "block_test_active_requests"},
),
client: &Mocker{
NextFunc: func(_ Request) error {
panic("here")
},
InitFunc: func(_ context.Context) {},
},
},
err: "panic calling Next: here",
check: func(_ *testing.T, _ *Observer) {},
},
{
name: "normal error",
observer: &Observer{
Expand All @@ -77,7 +106,6 @@ func TestObserverNextError(t *testing.T) {
),
client: &Mocker{
NextFunc: func(r Request) error {
require.Equal(t, nil, r)
return errors.New("fail")
},
InitFunc: func(_ context.Context) {
Expand Down Expand Up @@ -141,8 +169,14 @@ func TestObserverNextError(t *testing.T) {
} {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
tt.observer.Init(context.Background())
err := tt.observer.Next(nil)
ctx := context.Background()
tt.observer.Init(ctx)
rr := &Mocker{
RequestFunc: func() *http.Request {
return (&http.Request{}).WithContext(ctx)
},
}
err := tt.observer.Next(rr)
errStr := ""
if err != nil {
errStr = err.Error()
Expand Down
Binary file modified throttle-proxy
Binary file not shown.

0 comments on commit 54362f7

Please sign in to comment.