Skip to content

Commit

Permalink
proxy: Added unbuffered request optimization
Browse files Browse the repository at this point in the history
If only one upstream is defined we don't need to buffer the body.
Instead we directly stream the body to the upstream host,
which reduces memory usage as well as latency.
Furthermore this enables different kinds of HTTP streaming
applications like gRPC for instance.
  • Loading branch information
lhecker committed Jan 11, 2017
1 parent fab3b5b commit 8048e9c
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 13 deletions.
52 changes: 39 additions & 13 deletions caddyhttp/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ type Upstream interface {
// Gets how long to wait between selecting upstream
// hosts in the case of cascading failures.
GetTryInterval() time.Duration

// Gets the number of upstream hosts.
GetHostCount() int
}

// UpstreamHostDownFunc can be used to customize how Down behaves.
Expand Down Expand Up @@ -94,13 +97,26 @@ func (p Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) (int, error) {
// outreq is the request that makes a roundtrip to the backend
outreq := createUpstreamRequest(r)

// record and replace outreq body
body, err := newBufferedBody(outreq.Body)
if err != nil {
return http.StatusBadRequest, errors.New("failed to read downstream request body")
}
if body != nil {
outreq.Body = body
// If we have more than one upstream host defined and if retrying is enabled
// by setting try_duration to a non-zero value, caddy will try to
// retry the request at a different host if the first one failed.
//
// This requires us to possibly rewind and replay the request body though,
// which in turn requires us to buffer the request body first.
//
// An unbuffered request is usually preferrable, because it reduces latency
// as well as memory usage. Furthermore it enables different kinds of
// HTTP streaming applications like gRPC for instance.
requiresBuffering := upstream.GetHostCount() > 1 && upstream.GetTryDuration() != 0

if requiresBuffering {
body, err := newBufferedBody(outreq.Body)
if err != nil {
return http.StatusBadRequest, errors.New("failed to read downstream request body")
}
if body != nil {
outreq.Body = body
}
}

// The keepRetrying function will return true if we should
Expand Down Expand Up @@ -173,15 +189,25 @@ func (p Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) (int, error) {
downHeaderUpdateFn = createRespHeaderUpdateFn(host.DownstreamHeaders, replacer)
}

// rewind request body to its beginning
if err := body.rewind(); err != nil {
return http.StatusInternalServerError, errors.New("unable to rewind downstream request body")
// Before we retry the request we have to make sure
// that the body is rewound to it's beginning.
if bb, ok := outreq.Body.(*bufferedBody); ok {
if err := bb.rewind(); err != nil {
return http.StatusInternalServerError, errors.New("unable to rewind downstream request body")
}
}

// tell the proxy to serve the request
atomic.AddInt64(&host.Conns, 1)
backendErr = proxy.ServeHTTP(w, outreq, downHeaderUpdateFn)
atomic.AddInt64(&host.Conns, -1)
//
// NOTE:
// The call to proxy.ServeHTTP can theoretically panic.
// To prevent host.Conns from getting out-of-sync we thus have to
// make sure that it's _always_ correctly decremented afterwards.
func() {
atomic.AddInt64(&host.Conns, 1)
defer atomic.AddInt64(&host.Conns, -1)
backendErr = proxy.ServeHTTP(w, outreq, downHeaderUpdateFn)
}()

// if no errors, we're done here
if backendErr == nil {
Expand Down
4 changes: 4 additions & 0 deletions caddyhttp/proxy/upstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,10 @@ func (u *staticUpstream) GetTryInterval() time.Duration {
return u.TryInterval
}

func (u *staticUpstream) GetHostCount() int {
return len(u.Hosts)
}

// RegisterPolicy adds a custom policy to the proxy.
func RegisterPolicy(name string, policy func() Policy) {
supportedPolicies[name] = policy
Expand Down

0 comments on commit 8048e9c

Please sign in to comment.