Skip to content

Commit

Permalink
controlplane/peer/client: Issue requests in parallel
Browse files Browse the repository at this point in the history
This commit changes the client to issue requests in parallel
for peers who have multiple gateways.

Signed-off-by: Or Ozeri <oro@il.ibm.com>
  • Loading branch information
orozery committed Apr 15, 2024
1 parent 8047fed commit bd467d8
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 27 deletions.
84 changes: 59 additions & 25 deletions pkg/controlplane/peer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"errors"
"fmt"
"net/http"
"sync"

"github.com/sirupsen/logrus"

Expand All @@ -44,24 +45,61 @@ type RemoteServerAuthorizationResponse struct {
AccessToken string
}

// authorize a request for accessing a peer exported service, yielding an access token.
func (c *Client) Authorize(req *api.AuthorizationRequest) (*RemoteServerAuthorizationResponse, error) {
body, err := json.Marshal(req)
if err != nil {
return nil, fmt.Errorf("unable to serialize authorization request: %w", err)
func (c *Client) getResponse(
getRespFunc func(client *jsonapi.Client) (*jsonapi.Response, error),
) (*jsonapi.Response, error) {
if len(c.clients) == 1 {
return getRespFunc(c.clients[0])
}

var serverResp *jsonapi.Response
results := make(chan struct {
*jsonapi.Response
error
})
var done bool
var lock sync.Mutex
for _, client := range c.clients {
serverResp, err = client.Post(api.RemotePeerAuthorizationPath, body)
if err == nil {
break
go func(currClient *jsonapi.Client) {
resp, err := getRespFunc(currClient)
lock.Lock()
defer lock.Unlock()
if done {
return
}
results <- struct {
*jsonapi.Response
error
}{resp, err}
}(client)
}

var retErr error
for range c.clients {
result := <-results
if result.error == nil {
lock.Lock()
done = true
lock.Unlock()
return result.Response, nil
}

c.logger.Errorf("Error authorizing using endpoint %s: %v",
client.ServerURL(), err)
retErr = errors.Join(retErr, result.error)
}

return nil, retErr
}

// Authorize a request for accessing a peer exported service, yielding an access token.
func (c *Client) Authorize(req *api.AuthorizationRequest) (*RemoteServerAuthorizationResponse, error) {
body, err := json.Marshal(req)
if err != nil {
return nil, fmt.Errorf("unable to serialize authorization request: %w", err)
}

serverResp, err := c.getResponse(func(client *jsonapi.Client) (*jsonapi.Response, error) {
return client.Post(api.RemotePeerAuthorizationPath, body)
})

Check failure on line 102 in pkg/controlplane/peer/client.go

View workflow job for this annotation

GitHub Actions / static-checks

File is not `gofumpt`-ed (gofumpt)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -93,23 +131,19 @@ func (c *Client) Authorize(req *api.AuthorizationRequest) (*RemoteServerAuthoriz

// GetHeartbeat get a heartbeat from other peers.
func (c *Client) GetHeartbeat() error {
var retErr error
for _, client := range c.clients {
serverResp, err := client.Get(api.HeartbeatPath)
if err != nil {
retErr = errors.Join(retErr, err)
continue
}

if serverResp.Status == http.StatusOK {
return nil
}
serverResp, err := c.getResponse(func(client *jsonapi.Client) (*jsonapi.Response, error) {
return client.Get(api.HeartbeatPath)
})
if err != nil {
return err
}

retErr = errors.Join(retErr, fmt.Errorf("unable to get heartbeat (%d), server returned: %s",
serverResp.Status, serverResp.Body))
if serverResp.Status != http.StatusOK {
return fmt.Errorf("unable to get heartbeat (%d), server returned: %s",
serverResp.Status, serverResp.Body)
}

return retErr // Return an error if all client targets are unreachable
return nil
}

// NewClient returns a new Peer API client.
Expand Down
4 changes: 2 additions & 2 deletions tests/e2e/k8s/test_peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (s *TestSuite) TestPeerMultipleGateways() {
require.Equal(s.T(), cl[0].Name(), data)

// verify that the bad gateway does not effect access
for i := 0; i < 100; i++ {
for i := 0; i < 10; i++ {
data, err := cl[0].AccessService(httpecho.GetEchoValue, importedService, false, nil)
require.Nil(s.T(), err)
require.Equal(s.T(), cl[0].Name(), data)
Expand All @@ -129,7 +129,7 @@ func (s *TestSuite) TestPeerMultipleGateways() {
require.Equal(s.T(), cl[0].Name(), data)

// verify that the bad gateway does not effect access
for i := 0; i < 100; i++ {
for i := 0; i < 10; i++ {
data, err := cl[0].AccessService(httpecho.GetEchoValue, importedService, false, nil)
require.Nil(s.T(), err)
require.Equal(s.T(), cl[0].Name(), data)
Expand Down

0 comments on commit bd467d8

Please sign in to comment.