From 862f934ec5abfb5ea4c4e1f956da6b59436fac86 Mon Sep 17 00:00:00 2001 From: Marcel Ludwig Date: Fri, 12 Mar 2021 17:21:50 +0100 Subject: [PATCH 1/5] Fix missing recover for roundtrip go routines --- config/configload/load.go | 5 ++-- config/couper.go | 1 + handler/endpoint.go | 37 +++++++++++++++++++-------- handler/producer/result.go | 21 +++++++++++++++- server/http_integration_test.go | 19 +++++++++++--- server/http_test.go | 44 +++++++++++++++++++++++++++++++++ 6 files changed, 110 insertions(+), 17 deletions(-) diff --git a/config/configload/load.go b/config/configload/load.go index 44afcc4df..bbe857f7b 100644 --- a/config/configload/load.go +++ b/config/configload/load.go @@ -56,18 +56,19 @@ func LoadBytes(src []byte, filename string) (*config.Couper, error) { return nil, diags } - return LoadConfig(hclBody, src) + return LoadConfig(hclBody, src, filename) } var envContext *hcl.EvalContext -func LoadConfig(body hcl.Body, src []byte) (*config.Couper, error) { +func LoadConfig(body hcl.Body, src []byte, filename string) (*config.Couper, error) { defaults := config.DefaultSettings couperConfig := &config.Couper{ Bytes: src, Context: eval.NewContext(src), Definitions: &config.Definitions{}, + Filename: filename, Settings: &defaults, } diff --git a/config/couper.go b/config/couper.go index 895a3c0a9..c298d4f02 100644 --- a/config/couper.go +++ b/config/couper.go @@ -11,6 +11,7 @@ const DefaultFilename = "couper.hcl" type Couper struct { Bytes []byte Context *eval.Context + Filename string Definitions *Definitions `hcl:"definitions,block"` Servers Servers `hcl:"server,block"` Settings *Settings `hcl:"settings,block"` diff --git a/handler/endpoint.go b/handler/endpoint.go index 7777df9e7..134364071 100644 --- a/handler/endpoint.go +++ b/handler/endpoint.go @@ -4,6 +4,7 @@ import ( "context" "net" "net/http" + "strconv" "strings" "github.com/hashicorp/hcl/v2" @@ -99,7 +100,18 @@ func (e *Endpoint) ServeHTTP(rw http.ResponseWriter, req *http.Request) { clientres = result.Beresp err = result.Err } else { + // fallback err = errors.Configuration + + // TODO determine error priority, may solved with error_handler + // on roundtrip panic the context label is missing atm + // pick the first err from beresps + for _, br := range beresps { + if br != nil && br.Err != nil { + err = br.Err + break + } + } } } @@ -111,6 +123,9 @@ func (e *Endpoint) ServeHTTP(rw http.ResponseWriter, req *http.Request) { if p, ok := req.Context().Value(request.RoundTripProxy).(bool); ok && p { serveErr = errors.EndpointProxyConnect } + case producer.ResultPanic: + serveErr = errors.Server + e.log.WithField("uid", req.Context().Value(request.UID)).Error(err) } e.opts.Error.ServeError(serveErr).ServeHTTP(rw, req) return @@ -187,25 +202,27 @@ func (e *Endpoint) newRedirect() *http.Response { } func (e *Endpoint) readResults(requestResults producer.Results, beresps producer.ResultMap) { + i := 0 for r := range requestResults { // collect resps + i++ if r == nil { - panic("implement nil result handling") + continue } - if r.Beresp != nil { + var name string + + if r.Beresp != nil && r.Beresp.Request != nil { ctx := r.Beresp.Request.Context() - var name string + if n, ok := ctx.Value(request.RoundTripName).(string); ok && n != "" { name = n } - // fallback - if name == "" { - if id, ok := ctx.Value(request.UID).(string); ok { - name = id - } - } - beresps[name] = r } + // fallback + if name == "" { // error case or panic + name = strconv.Itoa(i) + } + beresps[name] = r } } diff --git a/handler/producer/result.go b/handler/producer/result.go index 4d6d0c961..8b23e9aac 100644 --- a/handler/producer/result.go +++ b/handler/producer/result.go @@ -1,7 +1,9 @@ package producer import ( + "fmt" "net/http" + "runtime/debug" "sync" ) @@ -12,6 +14,15 @@ type Result struct { // TODO: trace } +type ResultPanic struct { + err error + stack []byte +} + +func (r ResultPanic) Error() string { + return fmt.Sprintf("panic: %v\n%s", r.err, string(r.stack)) +} + // Results represents the producer channel. type Results chan *Result @@ -26,7 +37,15 @@ func (rm ResultMap) List() []*http.Response { } func roundtrip(rt http.RoundTripper, req *http.Request, results chan<- *Result, wg *sync.WaitGroup) { - defer wg.Done() + defer func() { + if rp := recover(); rp != nil { + results <- &Result{Err: ResultPanic{ + err: fmt.Errorf("%v", rp), + stack: debug.Stack(), + }} + } + wg.Done() + }() // TODO: apply evals here with context? beresp, err := rt.RoundTrip(req) diff --git a/server/http_integration_test.go b/server/http_integration_test.go index 2a720849a..9b8b353bb 100644 --- a/server/http_integration_test.go +++ b/server/http_integration_test.go @@ -21,12 +21,12 @@ import ( "testing" "time" - "github.com/avenga/couper/config/configload" - "github.com/sirupsen/logrus" logrustest "github.com/sirupsen/logrus/hooks/test" "github.com/avenga/couper/command" + "github.com/avenga/couper/config" + "github.com/avenga/couper/config/configload" "github.com/avenga/couper/internal/test" ) @@ -72,11 +72,22 @@ func teardown() { } testBackend.Close() } - func newCouper(file string, helper *test.Helper) (func(), *logrustest.Hook) { couperConfig, err := configload.LoadFile(filepath.Join(testWorkingDir, file)) helper.Must(err) + return newCouperWithConfig(couperConfig, helper) +} + +func newCouperWithBytes(file []byte, helper *test.Helper) (func(), *logrustest.Hook) { + couperConfig, err := configload.LoadBytes(file, "couper-bytes.hcl") + helper.Must(err) + + return newCouperWithConfig(couperConfig, helper) +} + +func newCouperWithConfig(couperConfig *config.Couper, helper *test.Helper) (func(), *logrustest.Hook) { + log, hook := logrustest.NewNullLogger() ctx, cancel := context.WithCancel(context.Background()) @@ -108,7 +119,7 @@ func newCouper(file string, helper *test.Helper) (func(), *logrustest.Hook) { //log.Out = os.Stdout go func() { - if err := command.NewRun(ctx).Execute([]string{file}, couperConfig, log.WithContext(ctx)); err != nil { + if err := command.NewRun(ctx).Execute([]string{couperConfig.Filename}, couperConfig, log.WithContext(ctx)); err != nil { shutdownFn() panic(err) } diff --git a/server/http_test.go b/server/http_test.go index 72764e7b9..8826ecf5a 100644 --- a/server/http_test.go +++ b/server/http_test.go @@ -2,6 +2,7 @@ package server_test import ( "bytes" + "compress/gzip" "context" "fmt" "io/ioutil" @@ -274,3 +275,46 @@ func TestHTTPServer_ServeHTTP_UUID_Option(t *testing.T) { }) } } + +func TestHTTPServer_ServeProxyAbortHandler(t *testing.T) { + configFile := ` +server "zipzip" { + endpoint "/**" { + proxy { + backend { + origin = "%s" + } + } + } +} +` + origin := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { + //rw.Write([]byte(configFile)) + //return + rw.Header().Set("Content-Encoding", "gzip") + _, err := gzip.NewWriter(rw).Write([]byte(configFile)) + if err != nil { + t.Error(err) + } + })) + + helper := test.New(t) + + shutdown, loghook := newCouperWithBytes([]byte(fmt.Sprintf(configFile, origin.URL)), helper) + defer shutdown() + + req, err := http.NewRequest(http.MethodGet, "http://localhost:8080", nil) + helper.Must(err) + + //req.Header.Set("Accept-Encoding", "gzip, br") + + res, err := newClient().Do(req) + helper.Must(err) + + if res.StatusCode != http.StatusOK { + t.Errorf("Expected OK, got: %s", res.Status) + for _, entry := range loghook.AllEntries() { + t.Log(entry.String()) + } + } +} From b86e46cd2e7602b4b0e11f064502f095fce5c461 Mon Sep 17 00:00:00 2001 From: Marcel Ludwig Date: Sat, 13 Mar 2021 08:40:03 +0100 Subject: [PATCH 2/5] Finalize test and catch reverseproxy logf calls --- config/runtime/server.go | 2 +- handler/proxy.go | 21 ++++++++++++--- internal/test/helper_proxy.go | 6 ++--- server/http_test.go | 49 ++++++++++++++++++++++++++++------- 4 files changed, 61 insertions(+), 17 deletions(-) diff --git a/config/runtime/server.go b/config/runtime/server.go index 84ab8626b..1c4ab0940 100644 --- a/config/runtime/server.go +++ b/config/runtime/server.go @@ -186,7 +186,7 @@ func NewServerConfiguration(conf *config.Couper, log *logrus.Entry) (ServerConfi if berr != nil { return nil, berr } - proxyHandler := handler.NewProxy(backend, proxyConf.HCLBody()) + proxyHandler := handler.NewProxy(backend, proxyConf.HCLBody(), log) p := &producer.Proxy{ Name: proxyConf.Name, RoundTrip: proxyHandler, diff --git a/handler/proxy.go b/handler/proxy.go index a1f8734a4..e36edc516 100644 --- a/handler/proxy.go +++ b/handler/proxy.go @@ -1,10 +1,13 @@ package handler import ( + "log" "net/http" "net/http/httputil" + "strings" "github.com/hashicorp/hcl/v2" + "github.com/sirupsen/logrus" "github.com/avenga/couper/eval" "github.com/avenga/couper/handler/transport" @@ -22,19 +25,20 @@ type Proxy struct { reverseProxy *httputil.ReverseProxy } -func NewProxy(backend http.RoundTripper, ctx hcl.Body) *Proxy { +func NewProxy(backend http.RoundTripper, ctx hcl.Body, logger *logrus.Entry) *Proxy { proxy := &Proxy{ backend: backend, context: ctx, } rp := &httputil.ReverseProxy{ - Director: proxy.director, - Transport: backend, + Director: proxy.director, ErrorHandler: func(rw http.ResponseWriter, _ *http.Request, err error) { if rec, ok := rw.(*transport.Recorder); ok { rec.SetError(err) } }, + ErrorLog: newErrorLogWrapper(logger), + Transport: backend, } proxy.reverseProxy = rp return proxy @@ -60,3 +64,14 @@ func (p *Proxy) director(req *http.Request) { req.Header.Del(key) } } + +// ErrorWrapper logs httputil.ReverseProxy internals with our own logrus.Entry. +type ErrorWrapper struct{ l logrus.FieldLogger } + +func (e *ErrorWrapper) Write(p []byte) (n int, err error) { + e.l.Error(strings.Replace(string(p), "\n", "", 1)) + return len(p), nil +} +func newErrorLogWrapper(logger logrus.FieldLogger) *log.Logger { + return log.New(&ErrorWrapper{logger}, "", log.Lshortfile) +} diff --git a/internal/test/helper_proxy.go b/internal/test/helper_proxy.go index 0bb5f1108..ff7c965ec 100644 --- a/internal/test/helper_proxy.go +++ b/internal/test/helper_proxy.go @@ -29,10 +29,10 @@ func (h *Helper) NewProxy(conf *transport.Config, backendContext, proxyContext h if proxyCtx == nil { proxyCtx = hcl.EmptyBody() } + log := logger.WithContext(context.Background()) + backend := transport.NewBackend(backendContext, config, nil, log) - backend := transport.NewBackend(backendContext, config, nil, logger.WithContext(context.Background())) - - proxy := handler.NewProxy(backend, proxyCtx) + proxy := handler.NewProxy(backend, proxyCtx, log) return proxy } diff --git a/server/http_test.go b/server/http_test.go index 8826ecf5a..d71a8f1de 100644 --- a/server/http_test.go +++ b/server/http_test.go @@ -288,17 +288,26 @@ server "zipzip" { } } ` + helper := test.New(t) + origin := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { - //rw.Write([]byte(configFile)) - //return rw.Header().Set("Content-Encoding", "gzip") - _, err := gzip.NewWriter(rw).Write([]byte(configFile)) - if err != nil { - t.Error(err) - } - })) + gzw := gzip.NewWriter(rw) + defer func() { + if r.Header.Get("x-close") != "" { + return // triggers reverseproxy copyBuffer panic due to missing gzip footer + } + if e := gzw.Close(); e != nil { + t.Error(e) + } + }() - helper := test.New(t) + _, err := gzw.Write([]byte(configFile)) + helper.Must(err) + + err = gzw.Flush() // explicit flush, just the gzip footer is missing + helper.Must(err) + })) shutdown, loghook := newCouperWithBytes([]byte(fmt.Sprintf(configFile, origin.URL)), helper) defer shutdown() @@ -306,8 +315,6 @@ server "zipzip" { req, err := http.NewRequest(http.MethodGet, "http://localhost:8080", nil) helper.Must(err) - //req.Header.Set("Accept-Encoding", "gzip, br") - res, err := newClient().Do(req) helper.Must(err) @@ -317,4 +324,26 @@ server "zipzip" { t.Log(entry.String()) } } + + b, err := ioutil.ReadAll(res.Body) + helper.Must(err) + helper.Must(res.Body.Close()) + + if string(b) != configFile { + t.Error("Expected same content") + } + + loghook.Reset() + + // Trigger panic + req.Header.Set("x-close", "dont") + res, err = newClient().Do(req) + helper.Must(err) + + if res.StatusCode != http.StatusInternalServerError { + t.Errorf("Expected status %d, got: %d", http.StatusInternalServerError, res.StatusCode) + for _, entry := range loghook.AllEntries() { + t.Log(entry.String()) + } + } } From fe9f20e5a1f40bdb244f60a21697d6bcd8babdb9 Mon Sep 17 00:00:00 2001 From: Marcel Ludwig Date: Mon, 15 Mar 2021 07:48:00 +0100 Subject: [PATCH 3/5] Add more specific error for proxy copy --- errors/code.go | 2 ++ handler/producer/result.go | 16 ++++++++++++---- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/errors/code.go b/errors/code.go index 3f25fb033..74c0b1659 100644 --- a/errors/code.go +++ b/errors/code.go @@ -43,6 +43,7 @@ const ( EndpointError Code = 7000 + iota EndpointConnect EndpointProxyConnect + EndpointProxyBodyCopyFailed EndpointReqBodySizeExceeded ) @@ -75,6 +76,7 @@ var codes = map[Code]string{ // 7xxx EndpointConnect: "Endpoint upstream connection error", EndpointProxyConnect: "upstream connection error via configured proxy", + EndpointProxyBodyCopyFailed: "proxy: error during body copy", EndpointReqBodySizeExceeded: "Request body size exceeded", } diff --git a/handler/producer/result.go b/handler/producer/result.go index 8b23e9aac..4b3285f7b 100644 --- a/handler/producer/result.go +++ b/handler/producer/result.go @@ -5,6 +5,8 @@ import ( "net/http" "runtime/debug" "sync" + + "github.com/avenga/couper/errors" ) // Result represents the producer object. @@ -39,10 +41,16 @@ func (rm ResultMap) List() []*http.Response { func roundtrip(rt http.RoundTripper, req *http.Request, results chan<- *Result, wg *sync.WaitGroup) { defer func() { if rp := recover(); rp != nil { - results <- &Result{Err: ResultPanic{ - err: fmt.Errorf("%v", rp), - stack: debug.Stack(), - }} + var err error + if rp == http.ErrAbortHandler { + err = errors.EndpointProxyBodyCopyFailed + } else { + err = &ResultPanic{ + err: fmt.Errorf("%v", rp), + stack: debug.Stack(), + } + } + results <- &Result{Err: err} } wg.Done() }() From 04223139a798ace67818702464bf436ca9ee5475 Mon Sep 17 00:00:00 2001 From: Marcel Ludwig Date: Mon, 15 Mar 2021 09:03:00 +0100 Subject: [PATCH 4/5] Fix backend overall timeout behaviour deadline cancel handling with defer was just wrong --- eval/reader.go | 4 ++- handler/transport/backend.go | 41 ++++++++++++++++++++++++++----- handler/transport/backend_test.go | 2 +- 3 files changed, 39 insertions(+), 8 deletions(-) diff --git a/eval/reader.go b/eval/reader.go index 5ab97adbd..5e4dc8369 100644 --- a/eval/reader.go +++ b/eval/reader.go @@ -1,6 +1,8 @@ package eval -import "io" +import ( + "io" +) type ReadCloser struct { io.Reader diff --git a/handler/transport/backend.go b/handler/transport/backend.go index 5448040bd..3e5cd3876 100644 --- a/handler/transport/backend.go +++ b/handler/transport/backend.go @@ -4,10 +4,12 @@ import ( "compress/gzip" "context" "encoding/base64" + "fmt" "net/http" "net/url" "regexp" "strings" + "time" "github.com/hashicorp/hcl/v2" "github.com/sirupsen/logrus" @@ -31,6 +33,8 @@ const ( var _ http.RoundTripper = &Backend{} +var BackendDeadlineExceeded = fmt.Errorf("backend deadline exceeded") + var ReClientSupportsGZ = regexp.MustCompile(`(?i)\b` + GzipName + `\b`) // Backend represents the transport configuration. @@ -73,11 +77,7 @@ func (b *Backend) RoundTrip(req *http.Request) (*http.Response, error) { tc := b.evalTransport(req) t := Get(tc) - if b.transportConf.Timeout > 0 { - deadline, cancel := context.WithTimeout(req.Context(), b.transportConf.Timeout) - defer cancel() - *req = *req.WithContext(deadline) - } + deadlineErr := b.withTimeout(req) if req.URL.Scheme == "" { req.URL.Scheme = tc.Scheme @@ -122,7 +122,14 @@ func (b *Backend) RoundTrip(req *http.Request) (*http.Response, error) { req.Close = false beresp, err := t.RoundTrip(req) if err != nil { - return nil, err + select { + case derr := <-deadlineErr: + if derr != nil { + return nil, derr + } + default: + return nil, err + } } if b.openAPIValidator != nil { @@ -156,6 +163,28 @@ func (b *Backend) RoundTrip(req *http.Request) (*http.Response, error) { return beresp, err } +func (b *Backend) withTimeout(req *http.Request) <-chan error { + errCh := make(chan error, 1) + if b.transportConf.Timeout <= 0 { + return errCh + } + + ctx, cancel := context.WithCancel(req.Context()) + *req = *req.WithContext(ctx) + go func(cancelFn func(), c context.Context, ec chan error) { + defer cancelFn() + deadline := time.After(b.transportConf.Timeout) + select { + case <-deadline: + ec <- BackendDeadlineExceeded + return + case <-c.Done(): + return + } + }(cancel, ctx, errCh) + return errCh +} + func (b *Backend) evalTransport(req *http.Request) *Config { var httpContext *hcl.EvalContext if httpCtx, ok := req.Context().Value(eval.ContextType).(*eval.Context); ok { diff --git a/handler/transport/backend_test.go b/handler/transport/backend_test.go index c41aface8..607d8801d 100644 --- a/handler/transport/backend_test.go +++ b/handler/transport/backend_test.go @@ -46,7 +46,7 @@ func TestBackend_RoundTrip_Timings(t *testing.T) { expectedErr string }{ {"with zero timings", test.NewRemainContext("origin", origin.URL), &transport.Config{}, httptest.NewRequest(http.MethodGet, "http://1.2.3.4/", nil), ""}, - {"with overall timeout", test.NewRemainContext("origin", origin.URL), &transport.Config{Timeout: time.Second / 2, ConnectTimeout: time.Minute}, httptest.NewRequest(http.MethodHead, "http://1.2.3.5/", nil), "context deadline exceeded"}, + {"with overall timeout", test.NewRemainContext("origin", origin.URL), &transport.Config{Timeout: time.Second / 2, ConnectTimeout: time.Minute}, httptest.NewRequest(http.MethodHead, "http://1.2.3.5/", nil), "backend deadline exceeded"}, {"with connect timeout", test.NewRemainContext("origin", "http://blackhole.webpagetest.org"), &transport.Config{ConnectTimeout: time.Second / 2}, httptest.NewRequest(http.MethodGet, "http://1.2.3.6/", nil), "i/o timeout"}, {"with ttfb timeout", test.NewRemainContext("origin", origin.URL), &transport.Config{TTFBTimeout: time.Second}, httptest.NewRequest(http.MethodHead, "http://1.2.3.7/", nil), "timeout awaiting response headers"}, } From 216690d516518daea0233d5c5e68c50db40b2256 Mon Sep 17 00:00:00 2001 From: Alex Schneider Date: Mon, 15 Mar 2021 09:53:47 +0100 Subject: [PATCH 5/5] Check request labels for 'default', too --- config/configload/load.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/config/configload/load.go b/config/configload/load.go index bbe857f7b..c249d050a 100644 --- a/config/configload/load.go +++ b/config/configload/load.go @@ -374,6 +374,8 @@ func refineEndpoints(definedBackends Backends, endpoints config.Endpoints) error } for _, r := range endpoint.Requests { + names[r.Name] = struct{}{} + if err := validLabelName(r.Name, &itemRange); err != nil { return err }