From 0c1d04109ef16f6592f81ae65e61874522e41649 Mon Sep 17 00:00:00 2001 From: Or Ozeri Date: Mon, 15 Apr 2024 14:02:21 +0300 Subject: [PATCH] controlplane/peer/client: Issue requests in parallel This commit changes the client to issue requests in parallel for peers who have multiple gateways. Signed-off-by: Or Ozeri --- pkg/controlplane/peer/client.go | 86 +++++++++++++++++++++++---------- tests/e2e/k8s/test_peer.go | 4 +- 2 files changed, 63 insertions(+), 27 deletions(-) diff --git a/pkg/controlplane/peer/client.go b/pkg/controlplane/peer/client.go index fa66bc68..f74dddbd 100644 --- a/pkg/controlplane/peer/client.go +++ b/pkg/controlplane/peer/client.go @@ -19,6 +19,7 @@ import ( "errors" "fmt" "net/http" + "sync" "github.com/sirupsen/logrus" @@ -44,24 +45,63 @@ type RemoteServerAuthorizationResponse struct { AccessToken string } -// authorize a request for accessing a peer exported service, yielding an access token. -func (c *Client) Authorize(req *api.AuthorizationRequest) (*RemoteServerAuthorizationResponse, error) { - body, err := json.Marshal(req) - if err != nil { - return nil, fmt.Errorf("unable to serialize authorization request: %w", err) +// getResponse tries all gateways in parallel for a response. +// The first successful response is returned. +// If all responses failed, a joined error of all responses is returned. +func (c *Client) getResponse( + getRespFunc func(client *jsonapi.Client) (*jsonapi.Response, error), +) (*jsonapi.Response, error) { + if len(c.clients) == 1 { + return getRespFunc(c.clients[0]) } - var serverResp *jsonapi.Response + results := make(chan struct { + *jsonapi.Response + error + }) + var done bool + var lock sync.Mutex for _, client := range c.clients { - serverResp, err = client.Post(api.RemotePeerAuthorizationPath, body) - if err == nil { - break + go func(currClient *jsonapi.Client) { + resp, err := getRespFunc(currClient) + lock.Lock() + defer lock.Unlock() + if done { + return + } + results <- struct { + *jsonapi.Response + error + }{resp, err} + }(client) + } + + var retErr error + for range c.clients { + result := <-results + if result.error == nil { + lock.Lock() + done = true + lock.Unlock() + return result.Response, nil } - c.logger.Errorf("Error authorizing using endpoint %s: %v", - client.ServerURL(), err) + retErr = errors.Join(retErr, result.error) + } + + return nil, retErr +} + +// Authorize a request for accessing a peer exported service, yielding an access token. +func (c *Client) Authorize(req *api.AuthorizationRequest) (*RemoteServerAuthorizationResponse, error) { + body, err := json.Marshal(req) + if err != nil { + return nil, fmt.Errorf("unable to serialize authorization request: %w", err) } + serverResp, err := c.getResponse(func(client *jsonapi.Client) (*jsonapi.Response, error) { + return client.Post(api.RemotePeerAuthorizationPath, body) + }) if err != nil { return nil, err } @@ -93,23 +133,19 @@ func (c *Client) Authorize(req *api.AuthorizationRequest) (*RemoteServerAuthoriz // GetHeartbeat get a heartbeat from other peers. func (c *Client) GetHeartbeat() error { - var retErr error - for _, client := range c.clients { - serverResp, err := client.Get(api.HeartbeatPath) - if err != nil { - retErr = errors.Join(retErr, err) - continue - } - - if serverResp.Status == http.StatusOK { - return nil - } + serverResp, err := c.getResponse(func(client *jsonapi.Client) (*jsonapi.Response, error) { + return client.Get(api.HeartbeatPath) + }) + if err != nil { + return err + } - retErr = errors.Join(retErr, fmt.Errorf("unable to get heartbeat (%d), server returned: %s", - serverResp.Status, serverResp.Body)) + if serverResp.Status != http.StatusOK { + return fmt.Errorf("unable to get heartbeat (%d), server returned: %s", + serverResp.Status, serverResp.Body) } - return retErr // Return an error if all client targets are unreachable + return nil } // NewClient returns a new Peer API client. diff --git a/tests/e2e/k8s/test_peer.go b/tests/e2e/k8s/test_peer.go index 6dbcb214..d526ecb1 100644 --- a/tests/e2e/k8s/test_peer.go +++ b/tests/e2e/k8s/test_peer.go @@ -105,7 +105,7 @@ func (s *TestSuite) TestPeerMultipleGateways() { require.Equal(s.T(), cl[0].Name(), data) // verify that the bad gateway does not effect access - for i := 0; i < 100; i++ { + for i := 0; i < 10; i++ { data, err := cl[0].AccessService(httpecho.GetEchoValue, importedService, false, nil) require.Nil(s.T(), err) require.Equal(s.T(), cl[0].Name(), data) @@ -129,7 +129,7 @@ func (s *TestSuite) TestPeerMultipleGateways() { require.Equal(s.T(), cl[0].Name(), data) // verify that the bad gateway does not effect access - for i := 0; i < 100; i++ { + for i := 0; i < 10; i++ { data, err := cl[0].AccessService(httpecho.GetEchoValue, importedService, false, nil) require.Nil(s.T(), err) require.Equal(s.T(), cl[0].Name(), data)