Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix sync issues when using standalone http receivers #541

Merged
merged 1 commit into from
Jun 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions v2/client/http_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package client
import (
"context"
"net/http"
"sync"

thttp "github.com/cloudevents/sdk-go/v2/protocol/http"
)
Expand All @@ -25,8 +26,11 @@ type EventReceiver struct {
}

func (r *EventReceiver) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
r.p.ServeHTTP(rw, req)
wg.Done()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I admittedly don't have the full context just saw this floating by so apologies for likely noise :)
Can't we just call this synchronously instead of in a func? Am I missing something? Perhaps worthy of a comment so it's easier to reason about, but it's probably just me :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are internal details of the SDK that require these two parts to work in parallel based on how the bindings work is documented, so yeah the server needs to go and it will block until the receive call is finished which is the second block of blocking call, but the bug was the function can't exit until ServeHTTP has finished, which is blocked on the callback...

yikes

}()

ctx := context.Background()
Expand All @@ -36,4 +40,6 @@ func (r *EventReceiver) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
} else if err := r.invoker.Invoke(ctx, msg, respFn); err != nil {
// TODO
}
// Block until ServeHTTP has returned
wg.Wait()
}
10 changes: 6 additions & 4 deletions v2/protocol/http/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,22 +270,23 @@ func (p *Protocol) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
return // if there was no message, return.
}

done := make(chan struct{})

var finishErr error
m.OnFinish = func(err error) error {
finishErr = err
return nil
}

wg := sync.WaitGroup{}
wg.Add(1)
var fn protocol.ResponseFn = func(ctx context.Context, respMsg binding.Message, res protocol.Result, transformers ...binding.Transformer) error {
// Unblock the ServeHTTP after the reply is written
defer func() {
done <- struct{}{}
wg.Done()
}()

if finishErr != nil {
http.Error(rw, fmt.Sprintf("Cannot forward CloudEvent: %s", finishErr), http.StatusInternalServerError)
return finishErr
}

status := http.StatusOK
Expand All @@ -305,6 +306,7 @@ func (p *Protocol) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
rw.Header().Set("content-type", "text/plain")
rw.WriteHeader(status)
_, _ = rw.Write([]byte(validationError.Error()))
return validationError
} else if errors.Is(res, binding.ErrUnknownEncoding) {
status = http.StatusUnsupportedMediaType
} else {
Expand All @@ -324,5 +326,5 @@ func (p *Protocol) ServeHTTP(rw http.ResponseWriter, req *http.Request) {

p.incoming <- msgErr{msg: m, respFn: fn} // Send to Request
// Block until ResponseFn is invoked
<-done
wg.Wait()
}