proxy: Unbuffered request optimization #1314

Merged: 2 commits, Jan 11, 2017
52 changes: 39 additions & 13 deletions caddyhttp/proxy/proxy.go
@@ -39,6 +39,9 @@ type Upstream interface {
// Gets how long to wait between selecting upstream
// hosts in the case of cascading failures.
GetTryInterval() time.Duration

// Gets the number of upstream hosts.
GetHostCount() int
}

// UpstreamHostDownFunc can be used to customize how Down behaves.
@@ -94,13 +97,26 @@ func (p Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) (int, error) {
// outreq is the request that makes a roundtrip to the backend
outreq := createUpstreamRequest(r)

// record and replace outreq body
body, err := newBufferedBody(outreq.Body)
if err != nil {
return http.StatusBadRequest, errors.New("failed to read downstream request body")
}
if body != nil {
outreq.Body = body
// If we have more than one upstream host defined and retrying is enabled
// by setting try_duration to a non-zero value, Caddy will retry the
// request at a different host if the first one failed.
//
// This requires us to possibly rewind and replay the request body, which
// in turn requires us to buffer the request body first.
//
// An unbuffered request is usually preferable, because it reduces latency
// as well as memory usage. Furthermore, it enables HTTP streaming
// applications such as gRPC.
requiresBuffering := upstream.GetHostCount() > 1 && upstream.GetTryDuration() != 0

if requiresBuffering {
body, err := newBufferedBody(outreq.Body)
if err != nil {
return http.StatusBadRequest, errors.New("failed to read downstream request body")
}
if body != nil {
outreq.Body = body
}
}

// The keepRetrying function will return true if we should
@@ -173,15 +189,25 @@ func (p Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) (int, error) {
downHeaderUpdateFn = createRespHeaderUpdateFn(host.DownstreamHeaders, replacer)
}

// rewind request body to its beginning
if err := body.rewind(); err != nil {
return http.StatusInternalServerError, errors.New("unable to rewind downstream request body")
// Before we retry the request we have to make sure
// that the body is rewound to its beginning.
if bb, ok := outreq.Body.(*bufferedBody); ok {
if err := bb.rewind(); err != nil {
return http.StatusInternalServerError, errors.New("unable to rewind downstream request body")
}
}

// tell the proxy to serve the request
atomic.AddInt64(&host.Conns, 1)
backendErr = proxy.ServeHTTP(w, outreq, downHeaderUpdateFn)
atomic.AddInt64(&host.Conns, -1)
//
// NOTE:
// The call to proxy.ServeHTTP can theoretically panic.
// To prevent host.Conns from getting out-of-sync we thus have to
// make sure that it's _always_ correctly decremented afterwards.
func() {
atomic.AddInt64(&host.Conns, 1)
defer atomic.AddInt64(&host.Conns, -1)
backendErr = proxy.ServeHTTP(w, outreq, downHeaderUpdateFn)
}()
Member:

This won't stop merging, but I just noticed this wrapped in a func(). Why is that? Just curious.

@oliverpool (Jan 8, 2017):

Probably for the defer?

Member:

Just move line 203 to right after line 204... I don't think there's a need to use defer here.

Reply:

I think it's probably for it to look cleaner (but I'm not the author).
The increment and decrement are now spatially near one another: the rest of the function (only one line here) will then be in between (even in case of an exception or an early return).

@lhecker (Author, Jan 9, 2017):
It's because proxy.ServeHTTP can panic under certain conditions, but previously the host.Conns counter was not decremented properly if it did. That way - in my understanding - the counter was stuck and could sooner or later lead to a broken proxy state.

P.S.: For instance I fixed one possible panic in my previous PR, which could be triggered by opening a WebSocket connection to caddy and causing the upstream to disconnect right after caddy connected to it. That way a 0-length buffer is inserted into the buffer pool and every new WebSocket connection would have randomly caused panics, due to io.CopyBuffer not accepting 0-length buffers. This in turn would have incremented host.Conns forever without ever decrementing it.

And if there is one bug causing panics there must be more.
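As an aside, the panic described above is standard-library behavior: io.CopyBuffer refuses a non-nil, zero-length buffer. A minimal sketch of that failure mode (not part of this PR, purely illustrative) might look like:

package main

import (
	"io"
	"os"
	"strings"
)

func main() {
	// A non-nil buffer of length zero: exactly what a broken buffer pool
	// could hand out.
	buf := make([]byte, 0)

	// io.CopyBuffer panics when given a non-nil, zero-length buffer,
	// so this call never returns normally.
	io.CopyBuffer(os.Stdout, strings.NewReader("hello"), buf)
}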

Member:
Hm, makes sense. It's a little awkward but we'll roll with it for now. Maybe place a comment explaining why it's in an anonymous func.
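To illustrate the point under discussion, here is a minimal, self-contained sketch (not taken from the PR) of why wrapping the increment, the call, and a deferred decrement in an anonymous func keeps a counter like host.Conns balanced even when the wrapped call panics:

package main

import (
	"fmt"
	"sync/atomic"
)

var conns int64

// serve stands in for proxy.ServeHTTP; it may panic on a misbehaving backend.
func serve(shouldPanic bool) {
	if shouldPanic {
		panic("backend misbehaved")
	}
}

func handle(shouldPanic bool) {
	// Recover here only so this demo can continue; the real proxy leaves
	// panic handling to the HTTP server.
	defer func() { recover() }()

	func() {
		atomic.AddInt64(&conns, 1)
		defer atomic.AddInt64(&conns, -1) // runs even if serve panics
		serve(shouldPanic)
	}()
}

func main() {
	handle(false)
	handle(true)                          // serve panics, yet the counter is still decremented
	fmt.Println(atomic.LoadInt64(&conns)) // prints 0
}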


// if no errors, we're done here
if backendErr == nil {
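For context, the diff calls newBufferedBody and bufferedBody.rewind without showing them; they are defined elsewhere in the proxy package. A rough sketch of what such a replayable body could look like (names and details here are assumptions for illustration, not the PR's actual implementation):

package proxy

import (
	"bytes"
	"io"
	"io/ioutil"
)

// bufferedBody reads the whole downstream request body into memory so the
// request can be replayed against another upstream host.
type bufferedBody struct {
	*bytes.Reader
}

func (b *bufferedBody) Close() error { return nil }

// rewind seeks back to the start so the body can be sent again on a retry.
func (b *bufferedBody) rewind() error {
	if b == nil {
		return nil
	}
	_, err := b.Seek(0, io.SeekStart)
	return err
}

// newBufferedBody returns a replayable copy of r, or nil if r is nil.
func newBufferedBody(r io.ReadCloser) (*bufferedBody, error) {
	if r == nil {
		return nil, nil
	}
	data, err := ioutil.ReadAll(r)
	r.Close()
	if err != nil {
		return nil, err
	}
	return &bufferedBody{Reader: bytes.NewReader(data)}, nil
}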
86 changes: 86 additions & 0 deletions caddyhttp/proxy/proxy_test.go
@@ -954,6 +954,90 @@ func TestReverseProxyRetry(t *testing.T) {
}
}

func TestReverseProxyLargeBody(t *testing.T) {
log.SetOutput(ioutil.Discard)
defer log.SetOutput(os.Stderr)

// set up proxy
backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
io.Copy(ioutil.Discard, r.Body)
r.Body.Close()
}))
defer backend.Close()

su, err := NewStaticUpstreams(caddyfile.NewDispenser("Testfile", strings.NewReader(`proxy / `+backend.URL)))
if err != nil {
t.Fatal(err)
}

p := &Proxy{
Next: httpserver.EmptyNext, // prevents panic in some cases when test fails
Upstreams: su,
}

// middle is required to simulate a closable downstream request body
middle := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, err = p.ServeHTTP(w, r)
if err != nil {
t.Error(err)
}
}))
defer middle.Close()

// Our request body will be 100MB
bodySize := uint64(100 * 1000 * 1000)

// We want to see how much memory the proxy module requires for this request.
// So let's record the mem stats before we start it.
begMemstats := &runtime.MemStats{}
runtime.ReadMemStats(begMemstats)

r, err := http.NewRequest("POST", middle.URL, &noopReader{len: bodySize})
if err != nil {
t.Fatal(err)
}
resp, err := http.DefaultTransport.RoundTrip(r)
if err != nil {
t.Fatal(err)
}
resp.Body.Close()

// Finally we need the mem stats after the request is done...
endMemstats := &runtime.MemStats{}
runtime.ReadMemStats(endMemstats)

// ...to calculate the total amount of allocated memory during the request.
totalAlloc := endMemstats.TotalAlloc - begMemstats.TotalAlloc

// If that's at least as much as the size of the body itself, it's a serious
// sign that the request was buffered first instead of being streamed to the upstream.
if totalAlloc >= bodySize {
t.Fatalf("proxy allocated too much memory: %d bytes", totalAlloc)
}
}

type noopReader struct {
len uint64
pos uint64
}

var _ io.Reader = &noopReader{}

func (r *noopReader) Read(b []byte) (int, error) {
if r.pos >= r.len {
return 0, io.EOF
}
n := int(r.len - r.pos)
if n > len(b) {
n = len(b)
}
for i := range b[:n] {
b[i] = 0
}
r.pos += uint64(n)
return n, nil
}

func newFakeUpstream(name string, insecure bool) *fakeUpstream {
uri, _ := url.Parse(name)
u := &fakeUpstream{
@@ -998,6 +1082,7 @@ func (u *fakeUpstream) Select(r *http.Request) *UpstreamHost {
func (u *fakeUpstream) AllowedPath(requestPath string) bool { return true }
func (u *fakeUpstream) GetTryDuration() time.Duration { return 1 * time.Second }
func (u *fakeUpstream) GetTryInterval() time.Duration { return 250 * time.Millisecond }
func (u *fakeUpstream) GetHostCount() int { return 1 }

// newWebSocketTestProxy returns a test proxy that will
// redirect to the specified backendAddr. The function
@@ -1049,6 +1134,7 @@ func (u *fakeWsUpstream) Select(r *http.Request) *UpstreamHost {
func (u *fakeWsUpstream) AllowedPath(requestPath string) bool { return true }
func (u *fakeWsUpstream) GetTryDuration() time.Duration { return 1 * time.Second }
func (u *fakeWsUpstream) GetTryInterval() time.Duration { return 250 * time.Millisecond }
func (u *fakeWsUpstream) GetHostCount() int { return 1 }

// recorderHijacker is a ResponseRecorder that can
// be hijacked.
4 changes: 4 additions & 0 deletions caddyhttp/proxy/upstream.go
@@ -423,6 +423,10 @@ func (u *staticUpstream) GetTryInterval() time.Duration {
return u.TryInterval
}

func (u *staticUpstream) GetHostCount() int {
return len(u.Hosts)
}

// RegisterPolicy adds a custom policy to the proxy.
func RegisterPolicy(name string, policy func() Policy) {
supportedPolicies[name] = policy
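To tie the change together: whether the body gets buffered is now purely a function of the site's proxy configuration, since requiresBuffering is GetHostCount() > 1 && GetTryDuration() != 0. A hypothetical Caddyfile illustrating both cases (host names and durations are placeholders, not taken from the PR):

# Two upstream hosts with try_duration set: request bodies are buffered so a
# failed attempt can be replayed against the other host.
proxy /api backend1:8080 backend2:8080 {
    try_duration 4s
}

# A single upstream host: the body is streamed unbuffered, which keeps latency
# and memory usage low and allows streaming workloads such as gRPC.
proxy /grpc backend3:8080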