Skip to content

Commit

Permalink
Merge pull request #523 from orozery/peer-multiple-gateways
Browse files Browse the repository at this point in the history
controlplane/peer/client: Issue requests in parallel
  • Loading branch information
orozery authored Apr 16, 2024
2 parents d591fac + 0c1d041 commit fe49f2a
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 27 deletions.
86 changes: 61 additions & 25 deletions pkg/controlplane/peer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"errors"
"fmt"
"net/http"
"sync"

"github.com/sirupsen/logrus"

Expand All @@ -44,24 +45,63 @@ type RemoteServerAuthorizationResponse struct {
AccessToken string
}

// authorize a request for accessing a peer exported service, yielding an access token.
func (c *Client) Authorize(req *api.AuthorizationRequest) (*RemoteServerAuthorizationResponse, error) {
body, err := json.Marshal(req)
if err != nil {
return nil, fmt.Errorf("unable to serialize authorization request: %w", err)
// getResponse tries all gateways in parallel for a response.
// The first successful response is returned.
// If all responses failed, a joined error of all responses is returned.
func (c *Client) getResponse(
getRespFunc func(client *jsonapi.Client) (*jsonapi.Response, error),
) (*jsonapi.Response, error) {
if len(c.clients) == 1 {
return getRespFunc(c.clients[0])
}

var serverResp *jsonapi.Response
results := make(chan struct {
*jsonapi.Response
error
})
var done bool
var lock sync.Mutex
for _, client := range c.clients {
serverResp, err = client.Post(api.RemotePeerAuthorizationPath, body)
if err == nil {
break
go func(currClient *jsonapi.Client) {
resp, err := getRespFunc(currClient)
lock.Lock()
defer lock.Unlock()
if done {
return
}
results <- struct {
*jsonapi.Response
error
}{resp, err}
}(client)
}

var retErr error
for range c.clients {
result := <-results
if result.error == nil {
lock.Lock()
done = true
lock.Unlock()
return result.Response, nil
}

c.logger.Errorf("Error authorizing using endpoint %s: %v",
client.ServerURL(), err)
retErr = errors.Join(retErr, result.error)
}

return nil, retErr
}

// Authorize a request for accessing a peer exported service, yielding an access token.
func (c *Client) Authorize(req *api.AuthorizationRequest) (*RemoteServerAuthorizationResponse, error) {
body, err := json.Marshal(req)
if err != nil {
return nil, fmt.Errorf("unable to serialize authorization request: %w", err)
}

serverResp, err := c.getResponse(func(client *jsonapi.Client) (*jsonapi.Response, error) {
return client.Post(api.RemotePeerAuthorizationPath, body)
})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -93,23 +133,19 @@ func (c *Client) Authorize(req *api.AuthorizationRequest) (*RemoteServerAuthoriz

// GetHeartbeat get a heartbeat from other peers.
func (c *Client) GetHeartbeat() error {
var retErr error
for _, client := range c.clients {
serverResp, err := client.Get(api.HeartbeatPath)
if err != nil {
retErr = errors.Join(retErr, err)
continue
}

if serverResp.Status == http.StatusOK {
return nil
}
serverResp, err := c.getResponse(func(client *jsonapi.Client) (*jsonapi.Response, error) {
return client.Get(api.HeartbeatPath)
})
if err != nil {
return err
}

retErr = errors.Join(retErr, fmt.Errorf("unable to get heartbeat (%d), server returned: %s",
serverResp.Status, serverResp.Body))
if serverResp.Status != http.StatusOK {
return fmt.Errorf("unable to get heartbeat (%d), server returned: %s",
serverResp.Status, serverResp.Body)
}

return retErr // Return an error if all client targets are unreachable
return nil
}

// NewClient returns a new Peer API client.
Expand Down
4 changes: 2 additions & 2 deletions tests/e2e/k8s/test_peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (s *TestSuite) TestPeerMultipleGateways() {
require.Equal(s.T(), cl[0].Name(), data)

// verify that the bad gateway does not effect access
for i := 0; i < 100; i++ {
for i := 0; i < 10; i++ {
data, err := cl[0].AccessService(httpecho.GetEchoValue, importedService, false, nil)
require.Nil(s.T(), err)
require.Equal(s.T(), cl[0].Name(), data)
Expand All @@ -129,7 +129,7 @@ func (s *TestSuite) TestPeerMultipleGateways() {
require.Equal(s.T(), cl[0].Name(), data)

// verify that the bad gateway does not effect access
for i := 0; i < 100; i++ {
for i := 0; i < 10; i++ {
data, err := cl[0].AccessService(httpecho.GetEchoValue, importedService, false, nil)
require.Nil(s.T(), err)
require.Equal(s.T(), cl[0].Name(), data)
Expand Down

0 comments on commit fe49f2a

Please sign in to comment.