From d185f6a1aaaedf4960f23bd121b67d6e78566eea Mon Sep 17 00:00:00 2001 From: Scott Nichols Date: Thu, 25 Jun 2020 11:40:18 -0700 Subject: [PATCH] fix sync issues when using standalone http mode. Signed-off-by: Scott Nichols --- v2/client/http_receiver.go | 6 ++++++ v2/protocol/http/protocol.go | 10 ++++++---- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/v2/client/http_receiver.go b/v2/client/http_receiver.go index 6cac07cef..4116f80ce 100644 --- a/v2/client/http_receiver.go +++ b/v2/client/http_receiver.go @@ -3,6 +3,7 @@ package client import ( "context" "net/http" + "sync" thttp "github.com/cloudevents/sdk-go/v2/protocol/http" ) @@ -25,8 +26,11 @@ type EventReceiver struct { } func (r *EventReceiver) ServeHTTP(rw http.ResponseWriter, req *http.Request) { + wg := sync.WaitGroup{} + wg.Add(1) go func() { r.p.ServeHTTP(rw, req) + wg.Done() }() ctx := context.Background() @@ -36,4 +40,6 @@ func (r *EventReceiver) ServeHTTP(rw http.ResponseWriter, req *http.Request) { } else if err := r.invoker.Invoke(ctx, msg, respFn); err != nil { // TODO } + // Block until ServeHTTP has returned + wg.Wait() } diff --git a/v2/protocol/http/protocol.go b/v2/protocol/http/protocol.go index 731d69f86..97372b4b7 100644 --- a/v2/protocol/http/protocol.go +++ b/v2/protocol/http/protocol.go @@ -270,22 +270,23 @@ func (p *Protocol) ServeHTTP(rw http.ResponseWriter, req *http.Request) { return // if there was no message, return. } - done := make(chan struct{}) - var finishErr error m.OnFinish = func(err error) error { finishErr = err return nil } + wg := sync.WaitGroup{} + wg.Add(1) var fn protocol.ResponseFn = func(ctx context.Context, respMsg binding.Message, res protocol.Result, transformers ...binding.Transformer) error { // Unblock the ServeHTTP after the reply is written defer func() { - done <- struct{}{} + wg.Done() }() if finishErr != nil { http.Error(rw, fmt.Sprintf("Cannot forward CloudEvent: %s", finishErr), http.StatusInternalServerError) + return finishErr } status := http.StatusOK @@ -305,6 +306,7 @@ func (p *Protocol) ServeHTTP(rw http.ResponseWriter, req *http.Request) { rw.Header().Set("content-type", "text/plain") rw.WriteHeader(status) _, _ = rw.Write([]byte(validationError.Error())) + return validationError } else if errors.Is(res, binding.ErrUnknownEncoding) { status = http.StatusUnsupportedMediaType } else { @@ -324,5 +326,5 @@ func (p *Protocol) ServeHTTP(rw http.ResponseWriter, req *http.Request) { p.incoming <- msgErr{msg: m, respFn: fn} // Send to Request // Block until ResponseFn is invoked - <-done + wg.Wait() }