Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix missing recover for roundtrip go routines #145

Merged
merged 5 commits into from
Mar 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions config/configload/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,19 @@ func LoadBytes(src []byte, filename string) (*config.Couper, error) {
return nil, diags
}

return LoadConfig(hclBody, src)
return LoadConfig(hclBody, src, filename)
}

var envContext *hcl.EvalContext

func LoadConfig(body hcl.Body, src []byte) (*config.Couper, error) {
func LoadConfig(body hcl.Body, src []byte, filename string) (*config.Couper, error) {
defaults := config.DefaultSettings

couperConfig := &config.Couper{
Bytes: src,
Context: eval.NewContext(src),
Definitions: &config.Definitions{},
Filename: filename,
Settings: &defaults,
}

Expand Down Expand Up @@ -373,6 +374,8 @@ func refineEndpoints(definedBackends Backends, endpoints config.Endpoints) error
}

for _, r := range endpoint.Requests {
names[r.Name] = struct{}{}

if err := validLabelName(r.Name, &itemRange); err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions config/couper.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const DefaultFilename = "couper.hcl"
type Couper struct {
Bytes []byte
Context *eval.Context
Filename string
Definitions *Definitions `hcl:"definitions,block"`
Servers Servers `hcl:"server,block"`
Settings *Settings `hcl:"settings,block"`
Expand Down
2 changes: 1 addition & 1 deletion config/runtime/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func NewServerConfiguration(conf *config.Couper, log *logrus.Entry) (ServerConfi
if berr != nil {
return nil, berr
}
proxyHandler := handler.NewProxy(backend, proxyConf.HCLBody())
proxyHandler := handler.NewProxy(backend, proxyConf.HCLBody(), log)
p := &producer.Proxy{
Name: proxyConf.Name,
RoundTrip: proxyHandler,
Expand Down
2 changes: 2 additions & 0 deletions errors/code.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ const (
EndpointError Code = 7000 + iota
EndpointConnect
EndpointProxyConnect
EndpointProxyBodyCopyFailed
EndpointReqBodySizeExceeded
)

Expand Down Expand Up @@ -75,6 +76,7 @@ var codes = map[Code]string{
// 7xxx
EndpointConnect: "Endpoint upstream connection error",
EndpointProxyConnect: "upstream connection error via configured proxy",
EndpointProxyBodyCopyFailed: "proxy: error during body copy",
EndpointReqBodySizeExceeded: "Request body size exceeded",
}

Expand Down
4 changes: 3 additions & 1 deletion eval/reader.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package eval

import "io"
import (
"io"
)

type ReadCloser struct {
io.Reader
Expand Down
37 changes: 27 additions & 10 deletions handler/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"net"
"net/http"
"strconv"
"strings"

"github.com/hashicorp/hcl/v2"
Expand Down Expand Up @@ -99,7 +100,18 @@ func (e *Endpoint) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
clientres = result.Beresp
err = result.Err
} else {
// fallback
err = errors.Configuration

// TODO determine error priority, may solved with error_handler
// on roundtrip panic the context label is missing atm
// pick the first err from beresps
for _, br := range beresps {
if br != nil && br.Err != nil {
err = br.Err
break
}
}
}
}

Expand All @@ -111,6 +123,9 @@ func (e *Endpoint) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
if p, ok := req.Context().Value(request.RoundTripProxy).(bool); ok && p {
serveErr = errors.EndpointProxyConnect
}
case producer.ResultPanic:
serveErr = errors.Server
e.log.WithField("uid", req.Context().Value(request.UID)).Error(err)
}
e.opts.Error.ServeError(serveErr).ServeHTTP(rw, req)
return
Expand Down Expand Up @@ -187,25 +202,27 @@ func (e *Endpoint) newRedirect() *http.Response {
}

func (e *Endpoint) readResults(requestResults producer.Results, beresps producer.ResultMap) {
i := 0
for r := range requestResults { // collect resps
i++
if r == nil {
panic("implement nil result handling")
continue
}

if r.Beresp != nil {
var name string

if r.Beresp != nil && r.Beresp.Request != nil {
ctx := r.Beresp.Request.Context()
var name string

if n, ok := ctx.Value(request.RoundTripName).(string); ok && n != "" {
name = n
}
// fallback
if name == "" {
if id, ok := ctx.Value(request.UID).(string); ok {
name = id
}
}
beresps[name] = r
}
// fallback
if name == "" { // error case or panic
name = strconv.Itoa(i)
}
beresps[name] = r
}
}

Expand Down
29 changes: 28 additions & 1 deletion handler/producer/result.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package producer

import (
"fmt"
"net/http"
"runtime/debug"
"sync"

"github.com/avenga/couper/errors"
)

// Result represents the producer <Result> object.
Expand All @@ -12,6 +16,15 @@ type Result struct {
// TODO: trace
}

type ResultPanic struct {
err error
stack []byte
}

func (r ResultPanic) Error() string {
return fmt.Sprintf("panic: %v\n%s", r.err, string(r.stack))
}

// Results represents the producer <Result> channel.
type Results chan *Result

Expand All @@ -26,7 +39,21 @@ func (rm ResultMap) List() []*http.Response {
}

func roundtrip(rt http.RoundTripper, req *http.Request, results chan<- *Result, wg *sync.WaitGroup) {
defer wg.Done()
defer func() {
if rp := recover(); rp != nil {
var err error
if rp == http.ErrAbortHandler {
err = errors.EndpointProxyBodyCopyFailed
} else {
err = &ResultPanic{
err: fmt.Errorf("%v", rp),
stack: debug.Stack(),
}
}
results <- &Result{Err: err}
}
wg.Done()
}()

// TODO: apply evals here with context?
beresp, err := rt.RoundTrip(req)
Expand Down
21 changes: 18 additions & 3 deletions handler/proxy.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package handler

import (
"log"
"net/http"
"net/http/httputil"
"strings"

"github.com/hashicorp/hcl/v2"
"github.com/sirupsen/logrus"

"github.com/avenga/couper/eval"
"github.com/avenga/couper/handler/transport"
Expand All @@ -22,19 +25,20 @@ type Proxy struct {
reverseProxy *httputil.ReverseProxy
}

func NewProxy(backend http.RoundTripper, ctx hcl.Body) *Proxy {
func NewProxy(backend http.RoundTripper, ctx hcl.Body, logger *logrus.Entry) *Proxy {
proxy := &Proxy{
backend: backend,
context: ctx,
}
rp := &httputil.ReverseProxy{
Director: proxy.director,
Transport: backend,
Director: proxy.director,
ErrorHandler: func(rw http.ResponseWriter, _ *http.Request, err error) {
if rec, ok := rw.(*transport.Recorder); ok {
rec.SetError(err)
}
},
ErrorLog: newErrorLogWrapper(logger),
Transport: backend,
}
proxy.reverseProxy = rp
return proxy
Expand All @@ -60,3 +64,14 @@ func (p *Proxy) director(req *http.Request) {
req.Header.Del(key)
}
}

// ErrorWrapper logs httputil.ReverseProxy internals with our own logrus.Entry.
type ErrorWrapper struct{ l logrus.FieldLogger }

func (e *ErrorWrapper) Write(p []byte) (n int, err error) {
e.l.Error(strings.Replace(string(p), "\n", "", 1))
return len(p), nil
}
func newErrorLogWrapper(logger logrus.FieldLogger) *log.Logger {
return log.New(&ErrorWrapper{logger}, "", log.Lshortfile)
}
41 changes: 35 additions & 6 deletions handler/transport/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import (
"compress/gzip"
"context"
"encoding/base64"
"fmt"
"net/http"
"net/url"
"regexp"
"strings"
"time"

"github.com/hashicorp/hcl/v2"
"github.com/sirupsen/logrus"
Expand All @@ -31,6 +33,8 @@ const (

var _ http.RoundTripper = &Backend{}

var BackendDeadlineExceeded = fmt.Errorf("backend deadline exceeded")

var ReClientSupportsGZ = regexp.MustCompile(`(?i)\b` + GzipName + `\b`)

// Backend represents the transport configuration.
Expand Down Expand Up @@ -73,11 +77,7 @@ func (b *Backend) RoundTrip(req *http.Request) (*http.Response, error) {
tc := b.evalTransport(req)
t := Get(tc)

if b.transportConf.Timeout > 0 {
deadline, cancel := context.WithTimeout(req.Context(), b.transportConf.Timeout)
defer cancel()
*req = *req.WithContext(deadline)
}
deadlineErr := b.withTimeout(req)

if req.URL.Scheme == "" {
req.URL.Scheme = tc.Scheme
Expand Down Expand Up @@ -122,7 +122,14 @@ func (b *Backend) RoundTrip(req *http.Request) (*http.Response, error) {
req.Close = false
beresp, err := t.RoundTrip(req)
if err != nil {
return nil, err
select {
case derr := <-deadlineErr:
if derr != nil {
return nil, derr
}
default:
return nil, err
}
}

if b.openAPIValidator != nil {
Expand Down Expand Up @@ -156,6 +163,28 @@ func (b *Backend) RoundTrip(req *http.Request) (*http.Response, error) {
return beresp, err
}

func (b *Backend) withTimeout(req *http.Request) <-chan error {
errCh := make(chan error, 1)
if b.transportConf.Timeout <= 0 {
return errCh
}

ctx, cancel := context.WithCancel(req.Context())
*req = *req.WithContext(ctx)
go func(cancelFn func(), c context.Context, ec chan error) {
defer cancelFn()
deadline := time.After(b.transportConf.Timeout)
select {
case <-deadline:
ec <- BackendDeadlineExceeded
return
case <-c.Done():
return
}
}(cancel, ctx, errCh)
return errCh
}

func (b *Backend) evalTransport(req *http.Request) *Config {
var httpContext *hcl.EvalContext
if httpCtx, ok := req.Context().Value(eval.ContextType).(*eval.Context); ok {
Expand Down
2 changes: 1 addition & 1 deletion handler/transport/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestBackend_RoundTrip_Timings(t *testing.T) {
expectedErr string
}{
{"with zero timings", test.NewRemainContext("origin", origin.URL), &transport.Config{}, httptest.NewRequest(http.MethodGet, "http://1.2.3.4/", nil), ""},
{"with overall timeout", test.NewRemainContext("origin", origin.URL), &transport.Config{Timeout: time.Second / 2, ConnectTimeout: time.Minute}, httptest.NewRequest(http.MethodHead, "http://1.2.3.5/", nil), "context deadline exceeded"},
{"with overall timeout", test.NewRemainContext("origin", origin.URL), &transport.Config{Timeout: time.Second / 2, ConnectTimeout: time.Minute}, httptest.NewRequest(http.MethodHead, "http://1.2.3.5/", nil), "backend deadline exceeded"},
{"with connect timeout", test.NewRemainContext("origin", "http://blackhole.webpagetest.org"), &transport.Config{ConnectTimeout: time.Second / 2}, httptest.NewRequest(http.MethodGet, "http://1.2.3.6/", nil), "i/o timeout"},
{"with ttfb timeout", test.NewRemainContext("origin", origin.URL), &transport.Config{TTFBTimeout: time.Second}, httptest.NewRequest(http.MethodHead, "http://1.2.3.7/", nil), "timeout awaiting response headers"},
}
Expand Down
6 changes: 3 additions & 3 deletions internal/test/helper_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ func (h *Helper) NewProxy(conf *transport.Config, backendContext, proxyContext h
if proxyCtx == nil {
proxyCtx = hcl.EmptyBody()
}
log := logger.WithContext(context.Background())
backend := transport.NewBackend(backendContext, config, nil, log)

backend := transport.NewBackend(backendContext, config, nil, logger.WithContext(context.Background()))

proxy := handler.NewProxy(backend, proxyCtx)
proxy := handler.NewProxy(backend, proxyCtx, log)
return proxy
}

Expand Down
Loading