Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

controlplane/peer/client: Issue requests in parallel #523

Merged
merged 1 commit into from
Apr 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 61 additions & 25 deletions pkg/controlplane/peer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"errors"
"fmt"
"net/http"
"sync"

"github.com/sirupsen/logrus"

Expand All @@ -44,24 +45,63 @@ type RemoteServerAuthorizationResponse struct {
AccessToken string
}

// authorize a request for accessing a peer exported service, yielding an access token.
func (c *Client) Authorize(req *api.AuthorizationRequest) (*RemoteServerAuthorizationResponse, error) {
body, err := json.Marshal(req)
if err != nil {
return nil, fmt.Errorf("unable to serialize authorization request: %w", err)
// getResponse tries all gateways in parallel for a response.
// The first successful response is returned.
// If all responses failed, a joined error of all responses is returned.
func (c *Client) getResponse(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you write a comment for the function, what it returns in case of multiple gateways (in the function description or near the code)

getRespFunc func(client *jsonapi.Client) (*jsonapi.Response, error),
) (*jsonapi.Response, error) {
if len(c.clients) == 1 {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you need this case and can't do iteration on 1 client?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the common case, and the purpose is to make it more simple since you don't need go-routines and the multi-threaded syncing in this case.

return getRespFunc(c.clients[0])
}

var serverResp *jsonapi.Response
results := make(chan struct {
*jsonapi.Response
error
})
var done bool
var lock sync.Mutex
for _, client := range c.clients {
serverResp, err = client.Post(api.RemotePeerAuthorizationPath, body)
if err == nil {
break
go func(currClient *jsonapi.Client) {
resp, err := getRespFunc(currClient)
lock.Lock()
defer lock.Unlock()
if done {
return
}
results <- struct {
*jsonapi.Response
error
}{resp, err}
}(client)
}

var retErr error
for range c.clients {
result := <-results
if result.error == nil {
lock.Lock()
done = true
lock.Unlock()
return result.Response, nil
}

c.logger.Errorf("Error authorizing using endpoint %s: %v",
client.ServerURL(), err)
retErr = errors.Join(retErr, result.error)
}

return nil, retErr
}

// Authorize a request for accessing a peer exported service, yielding an access token.
func (c *Client) Authorize(req *api.AuthorizationRequest) (*RemoteServerAuthorizationResponse, error) {
body, err := json.Marshal(req)
if err != nil {
return nil, fmt.Errorf("unable to serialize authorization request: %w", err)
}

serverResp, err := c.getResponse(func(client *jsonapi.Client) (*jsonapi.Response, error) {
return client.Post(api.RemotePeerAuthorizationPath, body)
})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -93,23 +133,19 @@ func (c *Client) Authorize(req *api.AuthorizationRequest) (*RemoteServerAuthoriz

// GetHeartbeat get a heartbeat from other peers.
func (c *Client) GetHeartbeat() error {
var retErr error
for _, client := range c.clients {
serverResp, err := client.Get(api.HeartbeatPath)
if err != nil {
retErr = errors.Join(retErr, err)
continue
}

if serverResp.Status == http.StatusOK {
return nil
}
serverResp, err := c.getResponse(func(client *jsonapi.Client) (*jsonapi.Response, error) {
return client.Get(api.HeartbeatPath)
})
if err != nil {
return err
}

retErr = errors.Join(retErr, fmt.Errorf("unable to get heartbeat (%d), server returned: %s",
serverResp.Status, serverResp.Body))
if serverResp.Status != http.StatusOK {
return fmt.Errorf("unable to get heartbeat (%d), server returned: %s",
serverResp.Status, serverResp.Body)
}

return retErr // Return an error if all client targets are unreachable
return nil
}

// NewClient returns a new Peer API client.
Expand Down
4 changes: 2 additions & 2 deletions tests/e2e/k8s/test_peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (s *TestSuite) TestPeerMultipleGateways() {
require.Equal(s.T(), cl[0].Name(), data)

// verify that the bad gateway does not effect access
for i := 0; i < 100; i++ {
for i := 0; i < 10; i++ {
data, err := cl[0].AccessService(httpecho.GetEchoValue, importedService, false, nil)
require.Nil(s.T(), err)
require.Equal(s.T(), cl[0].Name(), data)
Expand All @@ -129,7 +129,7 @@ func (s *TestSuite) TestPeerMultipleGateways() {
require.Equal(s.T(), cl[0].Name(), data)

// verify that the bad gateway does not effect access
for i := 0; i < 100; i++ {
for i := 0; i < 10; i++ {
data, err := cl[0].AccessService(httpecho.GetEchoValue, importedService, false, nil)
require.Nil(s.T(), err)
require.Equal(s.T(), cl[0].Name(), data)
Expand Down
Loading