From 87247a720360d4cc2067ef0283c733b9d7a587a2 Mon Sep 17 00:00:00 2001 From: Oleg Kulyk Date: Mon, 16 Aug 2021 14:32:13 +0300 Subject: [PATCH] refactor client to return error on receiver.Receive Signed-off-by: Oleg Kulyk --- v2/client/client.go | 21 +++++++++++++-------- v2/client/client_test.go | 8 +++++++- 2 files changed, 20 insertions(+), 9 deletions(-) diff --git a/v2/client/client.go b/v2/client/client.go index 0be62d7fc..548dcfd69 100644 --- a/v2/client/client.go +++ b/v2/client/client.go @@ -224,11 +224,10 @@ func (c *ceClient) StartReceiver(ctx context.Context, fn interface{}) error { }() // Start Polling. + errChan := make(chan error) wg := sync.WaitGroup{} for i := 0; i < c.pollGoroutines; i++ { - wg.Add(1) go func() { - defer wg.Done() for { var msg binding.Message var respFn protocol.ResponseFn @@ -241,13 +240,9 @@ func (c *ceClient) StartReceiver(ctx context.Context, fn interface{}) error { respFn = noRespFn } - if err == io.EOF { // Normal close - return - } - if err != nil { - cecontext.LoggerFrom(ctx).Warn("Error while receiving a message: ", err) - continue + errChan <- err + return } // Do not block on the invoker. @@ -270,6 +265,16 @@ func (c *ceClient) StartReceiver(ctx context.Context, fn interface{}) error { } } + select { + case chanErr := <-errChan: + if chanErr != io.EOF { + err = chanErr + } + case <-ctx.Done(): + // TODO: it might be important to actually return error from context + cecontext.LoggerFrom(ctx).Info("Error from closed context: ", ctx.Err()) + } + // wait for all invoker processes to finish wg.Wait() return err diff --git a/v2/client/client_test.go b/v2/client/client_test.go index e9fd21ea5..d42f5b984 100644 --- a/v2/client/client_test.go +++ b/v2/client/client_test.go @@ -16,6 +16,7 @@ import ( "net/http/httptest" "net/url" "strings" + "sync" "testing" "time" @@ -266,7 +267,7 @@ func TestClientReceive(t *testing.T) { for n, tc := range testCases { for _, path := range []string{"", "/", "/unittest/"} { t.Run(n+" at path "+path, func(t *testing.T) { - + wg := &sync.WaitGroup{} events := make(chan event.Event) p, err := cehttp.New(tc.optsFn(0, "")...) @@ -280,7 +281,9 @@ func TestClientReceive(t *testing.T) { } ctx, cancel := context.WithCancel(context.TODO()) + wg.Add(1) go func() { + defer wg.Done() err := c.StartReceiver(ctx, func(ctx context.Context, event event.Event) error { go func() { events <- event @@ -352,6 +355,9 @@ func TestClientReceive(t *testing.T) { if _, err := http.DefaultClient.Do(req); err == nil { t.Fatalf("expected error to when sending request to stopped client") } + // need to wait until receiver goroutines finish + // in case they result in test error + wg.Wait() }) } }