Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix data race #160

Merged
merged 5 commits into from
Mar 31, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 39 additions & 33 deletions config/configload/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"io/ioutil"
"path/filepath"
"regexp"
"strings"

"github.com/hashicorp/hcl/v2"
"github.com/hashicorp/hcl/v2/gohcl"
Expand Down Expand Up @@ -221,17 +220,11 @@ func mergeBackendBodies(definedBackends Backends, inline config.Inline) (hcl.Bod
// getBackendReference tries to fetch a backend from `definitions`
// block by a reference name, e.g. `backend = "name"`.
func getBackendReference(definedBackends Backends, body hcl.Body) (hcl.Body, error) {
content, _, diags := body.PartialContent(&hcl.BodySchema{
Attributes: []hcl.AttributeSchema{
{Name: backend},
}})
if diags.HasErrors() {
return nil, diags
}
content := bodyToContent(body)

// read out possible attribute reference
var name string
if attr, ok := content.Attributes["backend"]; ok {
if attr, ok := content.Attributes[backend]; ok {
val, valDiags := attr.Expr.Value(envContext)
if valDiags.HasErrors() {
return nil, valDiags
Expand Down Expand Up @@ -263,22 +256,7 @@ func getBackendReference(definedBackends Backends, body hcl.Body) (hcl.Body, err

func refineEndpoints(definedBackends Backends, endpoints config.Endpoints) error {
for _, endpoint := range endpoints {
// try to obtain proxy and request block with a chicken-and-egg situation:
// hcl labels are required if set, to make them optional we must know the content
// which could not unwrapped without label errors. We will handle this by block type
// and may have to throw an error which hints the user to configure the file properly.
endpointContent := &hcl.BodyContent{Attributes: make(hcl.Attributes)}
for _, t := range []string{proxy, request} {
c, err := contentByType(t, endpoint.Remain)
if err != nil {
return err
}
endpointContent.MissingItemRange = c.MissingItemRange
endpointContent.Blocks = append(endpointContent.Blocks, c.Blocks...)
for n, attr := range c.Attributes { // possible same key and content override, it's ok.
endpointContent.Attributes[n] = attr
}
}
endpointContent := bodyToContent(endpoint.Remain)

proxies := endpointContent.Blocks.OfType(proxy)
requests := endpointContent.Blocks.OfType(request)
Expand Down Expand Up @@ -422,6 +400,41 @@ func uniqueLabelName(unique map[string]struct{}, name string, hr *hcl.Range) err
return nil
}

func bodyToContent(body hcl.Body) *hcl.BodyContent {
content := &hcl.BodyContent{
MissingItemRange: body.MissingItemRange(),
}
b, ok := body.(*hclsyntax.Body)
if !ok {
return content
}

if len(b.Attributes) > 0 {
content.Attributes = make(hcl.Attributes)
}
for name, attr := range b.Attributes {
content.Attributes[name] = &hcl.Attribute{
Name: attr.Name,
Expr: attr.Expr,
Range: attr.Range(),
NameRange: attr.NameRange,
}
}

for _, block := range b.Blocks {
content.Blocks = append(content.Blocks, &hcl.Block{
Body: block.Body,
DefRange: block.DefRange(),
LabelRanges: block.LabelRanges,
Labels: block.Labels,
Type: block.Type,
TypeRange: block.TypeRange,
})
}

return content
}

func contentByType(blockType string, body hcl.Body) (*hcl.BodyContent, error) {
headerSchema := &hcl.BodySchema{
Blocks: []hcl.BlockHeaderSchema{
Expand Down Expand Up @@ -501,18 +514,11 @@ func newOAuthBackend(definedBackends Backends, parent hcl.Body) (hcl.Body, error
return nil, err
}

b, err := newBackend(definedBackends, &config.OAuth2{Remain: hclbody.New(&hcl.BodyContent{
return newBackend(definedBackends, &config.OAuth2{Remain: hclbody.New(&hcl.BodyContent{
Blocks: []*hcl.Block{
{Type: backend, Body: oauthBackend},
},
})})
if err != nil {
diags := err.(hcl.Diagnostics)
if strings.HasPrefix(diags[0].Summary, "The host of 'url'") {
diags[0].Summary = strings.Replace(diags[0].Summary, "The host of 'url'", "The host of 'token_endpoint'", 1)
}
}
return b, err
}

func renameAttribute(content *hcl.BodyContent, old, new string) {
Expand Down
2 changes: 1 addition & 1 deletion eval/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func SetGetBody(req *http.Request, bodyLimit int64) error {

bodyBytes := buf.Bytes()
req.GetBody = func() (io.ReadCloser, error) {
return NewReadCloser(bytes.NewBuffer(bodyBytes), req.Body), nil
return io.NopCloser(bytes.NewBuffer(bodyBytes)), nil
}
}

Expand Down
3 changes: 2 additions & 1 deletion handler/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package handler

import (
"context"
"io"
"net"
"net/http"
"strconv"
Expand Down Expand Up @@ -203,7 +204,7 @@ func (e *Endpoint) newResponse(req *http.Request, evalCtx *eval.Context) (*http.
}

r := strings.NewReader(seetie.ValueToString(val))
clientres.Body = eval.NewReadCloser(r, nil)
clientres.Body = io.NopCloser(r)
}

return clientres, nil
Expand Down
32 changes: 19 additions & 13 deletions handler/endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,16 +133,22 @@ func TestEndpoint_RoundTripContext_Variables_json_body(t *testing.T) {

defaultMethods := []string{
http.MethodGet,
//http.MethodHead,
//http.MethodPost,
//http.MethodPut,
//http.MethodPatch,
//http.MethodDelete,
//http.MethodConnect,
//http.MethodOptions,
http.MethodPost,
http.MethodPut,
http.MethodPatch,
http.MethodDelete,
http.MethodConnect,
http.MethodOptions,
}

origin := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
// reflect req headers
for k, v := range r.Header {
if !strings.HasPrefix(strings.ToLower(k), "x-") {
continue
}
rw.Header()[k] = v
}
rw.WriteHeader(http.StatusNoContent)
}))
defer origin.Close()
Expand All @@ -164,7 +170,7 @@ func TestEndpoint_RoundTripContext_Variables_json_body(t *testing.T) {
origin = "` + origin.URL + `"
set_request_headers = {
x-test = req.json_body.foo
}`, []string{http.MethodTrace}, test.Header{"Content-Type": "application/json"}, `{"foo": "bar"}`, want{req: test.Header{"x-test": ""}}},
}`, []string{http.MethodTrace, http.MethodHead}, test.Header{"Content-Type": "application/json"}, `{"foo": "bar"}`, want{req: test.Header{"x-test": ""}}},
{"method /wo body", `
origin = "` + origin.URL + `"
set_request_headers = {
Expand All @@ -182,7 +188,7 @@ func TestEndpoint_RoundTripContext_Variables_json_body(t *testing.T) {
helper := test.New(subT)

backend := transport.NewBackend(
helper.NewProxyContext(tt.inlineCtx),
helper.NewInlineContext(tt.inlineCtx),
&transport.Config{NoProxyFromEnv: true}, nil, logger)

ep := handler.NewEndpoint(&handler.EndpointOptions{
Expand All @@ -208,11 +214,11 @@ func TestEndpoint_RoundTripContext_Variables_json_body(t *testing.T) {
rw := server.NewRWWrapper(rec, false, "") // crucial for working ep due to res.Write()
ep.ServeHTTP(rw, req)
rec.Flush()
//res := rec.Result()
res := rec.Result()

for k, v := range tt.want.req {
if req.Header.Get(k) != v {
subT.Errorf("want: %q for key %q, got: %q", v, k, req.Header.Get(k))
if res.Header.Get(k) != v {
subT.Errorf("want: %q for key %q, got: %q", v, k, res.Header[k])
}
}
})
Expand Down Expand Up @@ -272,7 +278,7 @@ func TestEndpoint_RoundTripContext_Null_Eval(t *testing.T) {

ep := handler.NewEndpoint(&handler.EndpointOptions{
Error: errors.DefaultJSON,
Context: helper.NewProxyContext(tc.remain),
Context: helper.NewInlineContext(tc.remain),
ReqBodyLimit: 1024,
}, logger, producer.Proxies{
&producer.Proxy{Name: "default", RoundTrip: backend},
Expand Down
3 changes: 2 additions & 1 deletion handler/producer/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ func (pr Proxies) Produce(ctx context.Context, clientReq *http.Request, results
currentName = proxy.Name
outCtx := withRoundTripName(ctx, proxy.Name)
outCtx = context.WithValue(outCtx, request.RoundTripProxy, true)
outReq := clientReq.WithContext(outCtx)
// since proxy and backend may work on the "same" outReq this must be cloned.
outReq := clientReq.Clone(outCtx)

wg.Add(1)
go roundtrip(proxy.RoundTrip, outReq, results, wg)
Expand Down
6 changes: 3 additions & 3 deletions handler/transport/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ func TestBackend_RoundTrip_Validation(t *testing.T) {
if err != nil {
subT.Fatal(err)
}
content := helper.NewProxyContext(`
content := helper.NewInlineContext(`
origin = "` + origin.URL + `"
`)

Expand Down Expand Up @@ -311,7 +311,7 @@ func TestBackend_director(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
hclContext := helper.NewProxyContext(tt.inlineCtx)
hclContext := helper.NewInlineContext(tt.inlineCtx)

backend := transport.NewBackend(hclContext, &transport.Config{
Timeout: time.Second,
Expand Down Expand Up @@ -400,7 +400,7 @@ func TestProxy_BufferingOptions(t *testing.T) {

backend := transport.NewBackend(configload.MergeBodies([]hcl.Body{
test.NewRemainContext("origin", "http://"+origin.Listener.Addr().String()),
helper.NewProxyContext(tc.remain),
helper.NewInlineContext(tc.remain),
}), &transport.Config{}, &transport.BackendOptions{
OpenAPI: newOptions(),
}, nullLog)
Expand Down
6 changes: 2 additions & 4 deletions handler/transport/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
"io"
"net"
"net/http"

"github.com/avenga/couper/eval"
)

var (
Expand Down Expand Up @@ -55,8 +53,8 @@ func (r *Recorder) Read(p []byte) (n int, err error) {

func (r *Recorder) Response(req *http.Request) (*http.Response, error) {
return &http.Response{
Body: eval.NewReadCloser(r.body, nil),
Header: r.Header(),
Body: io.NopCloser(r.body),
Header: r.Header().Clone(),
Proto: req.Proto,
ProtoMajor: req.ProtoMajor,
ProtoMinor: req.ProtoMinor,
Expand Down
2 changes: 1 addition & 1 deletion internal/test/helper_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (h *Helper) NewProxy(conf *transport.Config, backendContext, proxyContext h
return proxy
}

func (h *Helper) NewProxyContext(inlineHCL string) hcl.Body {
func (h *Helper) NewInlineContext(inlineHCL string) hcl.Body {
type hclBody struct {
Inline hcl.Body `hcl:",remain"`
}
Expand Down
6 changes: 4 additions & 2 deletions server/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,13 +199,15 @@ func (s *HTTPServer) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
)
rw = w

if err := s.setGetBody(h, req); err != nil {
clientReq := req.Clone(ctx)

if err := s.setGetBody(h, clientReq); err != nil {
mux.opts.ServerOptions.ServerErrTpl.ServeError(err).ServeHTTP(rw, req)
return
}

ctx = s.evalCtx.WithClientRequest(req)
clientReq := req.Clone(ctx)
*clientReq = *clientReq.WithContext(ctx)

s.accessLog.ServeHTTP(rw, clientReq, h, startTime)

Expand Down
8 changes: 4 additions & 4 deletions server/http_endpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ func TestEndpoints_ProxyReqRes(t *testing.T) {
helper.Must(err)

entries := logHook.Entries
if l := len(entries); l != 3 {
t.Fatalf("Expected 3 log entries, given %d", l)
if l := len(entries); l != 5 {
t.Fatalf("Expected 5 log entries, given %d", l)
}

if res.StatusCode != http.StatusMethodNotAllowed {
Expand All @@ -45,8 +45,8 @@ func TestEndpoints_ProxyReqRes(t *testing.T) {
helper.Must(err)
res.Body.Close()

if string(resBytes) != "808" {
t.Errorf("Expected body 808, given %s", resBytes)
if string(resBytes) != "1616" {
t.Errorf("Expected body 1616, given %s", resBytes)
}
}

Expand Down
34 changes: 31 additions & 3 deletions server/testdata/endpoints/01_couper.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,37 @@ server "api" {
proxy {
url = "${env.COUPER_TEST_BACKEND_ADDR}/proxy"
backend = "proxy"
set_request_headers = {
x-inline = "test"
}
}
request "request" {

proxy "p2" {
url = "${env.COUPER_TEST_BACKEND_ADDR}/proxy"
backend = "proxy"
set_request_headers = {
x-inline = "test"
}
}

request "r1" {
url = "${env.COUPER_TEST_BACKEND_ADDR}/request"
backend = "request"
}

request "r2" {
url = "${env.COUPER_TEST_BACKEND_ADDR}/request"
backend = "request"
}

set_request_headers = {
x-ep-inline = "test"
}

response {
status = beresp.status + 1
# 404 + 404
body = beresps.request.status + beresps.default.status
# 404 + 404 + 404 + 404
body = beresps.r1.status + beresps.default.status + beresps.r2.status + beresps.p2.status
}
}
}
Expand All @@ -29,9 +51,15 @@ definitions {
backend "proxy" {
path = "/override/me"
origin = env.COUPER_TEST_BACKEND_ADDR
set_request_headers = {
x-data = "proxy-test"
}
}
backend "request" {
path = "/override/me"
origin = env.COUPER_TEST_BACKEND_ADDR
set_request_headers = {
x-data = "request-test"
}
}
}