Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: common/roundrobin: should handle a case if selected endpoint returns error #598

Merged
merged 3 commits into from
Nov 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 91 additions & 5 deletions pkg/networkservice/chains/nsmgr/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,14 @@ import (
"fmt"
"io/ioutil"
"net/url"
"strconv"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/pkg/errors"

"github.com/golang/protobuf/ptypes/empty"
"google.golang.org/grpc"

Expand Down Expand Up @@ -85,7 +89,7 @@ func TestNSMGR_RemoteUsecase_Parallel(t *testing.T) {
conn, err := nsc.Request(ctx, request.Clone())
require.NoError(t, err)
require.NotNil(t, conn)

require.Equal(t, int32(1), atomic.LoadInt32(&counter.Requests))
require.Equal(t, 8, len(conn.Path.PathSegments))

// Simulate refresh from client.
Expand All @@ -97,14 +101,82 @@ func TestNSMGR_RemoteUsecase_Parallel(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, conn)
require.Equal(t, 8, len(conn.Path.PathSegments))

require.Equal(t, int32(2), atomic.LoadInt32(&counter.Requests))
// Close.
e, err := nsc.Close(ctx, conn)
require.NoError(t, err)
require.NotNil(t, e)
require.Equal(t, int32(1), atomic.LoadInt32(&counter.Closes))
}

func TestNSMGR_RemoteUsecase_BusyEndpoints(t *testing.T) {
defer goleak.VerifyNone(t, goleak.IgnoreCurrent())
logrus.SetOutput(ioutil.Discard)
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
defer cancel()
domain := sandbox.NewBuilder(t).
SetNodesCount(2).
SetRegistryProxySupplier(nil).
SetContext(ctx).
Build()
defer domain.Cleanup()

counter := new(counterServer)

request := &networkservice.NetworkServiceRequest{
MechanismPreferences: []*networkservice.Mechanism{
{Cls: cls.LOCAL, Type: kernel.MECHANISM},
},
Connection: &networkservice.Connection{
Id: "1",
NetworkService: "my-service-remote",
Context: &networkservice.ConnectionContext{},
},
}
var wg sync.WaitGroup
for i := 0; i < 3; i++ {
wg.Add(1)
go func(id int) {
nseReg := &registry.NetworkServiceEndpoint{
Name: "final-endpoint-" + strconv.Itoa(id),
NetworkServiceNames: []string{"my-service-remote"},
}
_, err := sandbox.NewEndpoint(ctx, nseReg, sandbox.GenerateTestToken, domain.Nodes[1].NSMgr, newBusyEndpoint())
require.NoError(t, err)
wg.Done()
}(i)
}
go func() {
wg.Wait()
time.Sleep(time.Second / 2)
nseReg := &registry.NetworkServiceEndpoint{
Name: "final-endpoint-3",
NetworkServiceNames: []string{"my-service-remote"},
}
_, err := sandbox.NewEndpoint(ctx, nseReg, sandbox.GenerateTestToken, domain.Nodes[1].NSMgr, counter)
require.NoError(t, err)
}()
nsc, err := sandbox.NewClient(ctx, sandbox.GenerateTestToken, domain.Nodes[0].NSMgr.URL)
require.NoError(t, err)

conn, err := nsc.Request(ctx, request.Clone())
require.NoError(t, err)
require.NotNil(t, conn)
require.Equal(t, int32(1), atomic.LoadInt32(&counter.Requests))
require.Equal(t, 8, len(conn.Path.PathSegments))

// Simulate refresh from client.

refreshRequest := request.Clone()
refreshRequest.Connection = conn.Clone()

conn, err = nsc.Request(ctx, refreshRequest)
require.NoError(t, err)
require.NotNil(t, conn)
require.Equal(t, int32(2), atomic.LoadInt32(&counter.Requests))
require.Equal(t, 8, len(conn.Path.PathSegments))
}

func TestNSMGR_RemoteUsecase(t *testing.T) {
defer goleak.VerifyNone(t, goleak.IgnoreCurrent())
logrus.SetOutput(ioutil.Discard)
Expand Down Expand Up @@ -143,7 +215,7 @@ func TestNSMGR_RemoteUsecase(t *testing.T) {
conn, err := nsc.Request(ctx, request.Clone())
require.NoError(t, err)
require.NotNil(t, conn)

require.Equal(t, int32(1), atomic.LoadInt32(&counter.Requests))
require.Equal(t, 8, len(conn.Path.PathSegments))

// Simulate refresh from client.
Expand All @@ -154,6 +226,7 @@ func TestNSMGR_RemoteUsecase(t *testing.T) {
conn, err = nsc.Request(ctx, refreshRequest)
require.NoError(t, err)
require.NotNil(t, conn)
require.Equal(t, int32(2), atomic.LoadInt32(&counter.Requests))
require.Equal(t, 8, len(conn.Path.PathSegments))

// Close.
Expand Down Expand Up @@ -199,7 +272,7 @@ func TestNSMGR_LocalUsecase(t *testing.T) {
conn, err := nsc.Request(ctx, request.Clone())
require.NoError(t, err)
require.NotNil(t, conn)

require.Equal(t, int32(1), atomic.LoadInt32(&counter.Requests))
require.Equal(t, 5, len(conn.Path.PathSegments))

// Simulate refresh from client.
Expand All @@ -211,7 +284,7 @@ func TestNSMGR_LocalUsecase(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, conn2)
require.Equal(t, 5, len(conn2.Path.PathSegments))

require.Equal(t, int32(2), atomic.LoadInt32(&counter.Requests))
// Close.
e, err := nsc.Close(ctx, conn)
require.NoError(t, err)
Expand Down Expand Up @@ -402,3 +475,16 @@ func (c *counterServer) Close(ctx context.Context, connection *networkservice.Co
atomic.AddInt32(&c.Closes, 1)
return next.Server(ctx).Close(ctx, connection)
}

type busyEndpoint struct{}

func (c *busyEndpoint) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) {
return nil, errors.New("sorry, endpoint is busy, try again later")
}

func (c *busyEndpoint) Close(ctx context.Context, connection *networkservice.Connection) (*empty.Empty, error) {
return nil, errors.New("sorry, endpoint is busy, try again later")
}
func newBusyEndpoint() networkservice.NetworkServiceServer {
return new(busyEndpoint)
}
4 changes: 0 additions & 4 deletions pkg/networkservice/common/discover/match_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ import (
"bytes"
"text/template"

"github.com/sirupsen/logrus"

"github.com/networkservicemesh/api/pkg/api/registry"
)

Expand All @@ -42,8 +40,6 @@ func isSubset(a, b, nsLabels map[string]string) bool {
}

func matchEndpoint(nsLabels map[string]string, ns *registry.NetworkService, networkServiceEndpoints ...*registry.NetworkServiceEndpoint) []*registry.NetworkServiceEndpoint {
logrus.Infof("Matching endpoint for labels %v", nsLabels)

// Iterate through the matches
for _, match := range ns.GetMatches() {
// All match source selector labels should be present in the requested labels map
Expand Down
49 changes: 40 additions & 9 deletions pkg/networkservice/common/discover/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,28 @@ func (d *discoverCandidatesServer) Request(ctx context.Context, request *network
if err != nil {
return nil, err
}
return next.Server(ctx).Request(WithCandidates(ctx, nses, ns), request)
visit := map[string]struct{}{}
for ctx.Err() == nil {
resp, err := next.Server(ctx).Request(WithCandidates(ctx, nses, ns), request)
if err == nil {
return resp, err
}
for _, nse := range nses {
visit[nse.Name] = struct{}{}
}
nses, err = d.discoverNetworkServiceEndpoints(ctx, ns, request.GetConnection().GetLabels())
if err != nil {
return nil, err
}
var newNses []*registry.NetworkServiceEndpoint
for _, nse := range nses {
if _, ok := visit[nse.Name]; !ok {
newNses = append(newNses, nse)
}
}
nses = newNses
}
return nil, errors.Wrap(ctx.Err(), "no match endpoints or all endpoints fail")
}

func (d *discoverCandidatesServer) Close(ctx context.Context, conn *networkservice.Connection) (*empty.Empty, error) {
Expand Down Expand Up @@ -109,8 +130,11 @@ func (d *discoverCandidatesServer) discoverNetworkServiceEndpoint(ctx context.Co
select {
case <-ctx.Done():
return nil, errors.Wrapf(ctx.Err(), "nse: %+v is not found", query.NetworkServiceEndpoint)
case nse := <-nseCh:
return nse, nil
case nse, ok := <-nseCh:
if ok {
return nse, nil
}
return nil, errors.New("nse stream is closed")
}
}
func (d *discoverCandidatesServer) discoverNetworkServiceEndpoints(ctx context.Context, ns *registry.NetworkService, labels map[string]string) ([]*registry.NetworkServiceEndpoint, error) {
Expand Down Expand Up @@ -144,10 +168,14 @@ func (d *discoverCandidatesServer) discoverNetworkServiceEndpoints(ctx context.C
select {
case <-ctx.Done():
return nil, errors.Wrapf(ctx.Err(), "nse: %+v is not found", query.NetworkServiceEndpoint)
case nse := <-nseCh:
result := matchEndpoint(labels, ns, nse)
if len(result) != 0 {
return result, nil
case nse, ok := <-nseCh:
if ok {
result := matchEndpoint(labels, ns, nse)
if len(result) != 0 {
return result, nil
}
} else {
return nil, errors.New("nse stream is closed")
}
}
}
Expand Down Expand Up @@ -178,7 +206,10 @@ func (d *discoverCandidatesServer) discoverNetworkService(ctx context.Context, n
select {
case <-ctx.Done():
return nil, errors.Wrapf(ctx.Err(), "ns:\"%v\" with payload:\"%v\" is not found", name, payload)
case ns := <-registry.ReadNetworkServiceChannel(nsStream):
return ns, nil
case ns, ok := <-registry.ReadNetworkServiceChannel(nsStream):
if ok {
return ns, nil
}
return nil, errors.New("ns stream is closed")
}
}
4 changes: 2 additions & 2 deletions pkg/networkservice/common/discover/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ func TestNoMatchServiceFound(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second/2)
defer cancel()
_, err := server.Request(ctx, request)
require.Equal(t, "ns:\"secure-intranet-connectivity\" with payload:\"IP\" is not found: context deadline exceeded", err.Error())
require.Error(t, err)
}

func TestNoMatchServiceEndpointFound(t *testing.T) {
Expand Down Expand Up @@ -375,5 +375,5 @@ func TestNoMatchServiceEndpointFound(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second/2)
defer cancel()
_, err = server.Request(ctx, request)
require.Equal(t, "nse: network_service_names:\"secure-intranet-connectivity\" is not found: context deadline exceeded", err.Error())
require.Error(t, err)
}
40 changes: 18 additions & 22 deletions pkg/networkservice/common/roundrobin/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,37 +44,33 @@ func NewServer() networkservice.NetworkServiceServer {
}

func (s *selectEndpointServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) {
ctx, err := s.withClientURL(ctx, request.GetConnection())
if err != nil {
return nil, err
if clienturlctx.ClientURL(ctx) != nil {
return next.Server(ctx).Request(ctx, request)
}
return next.Server(ctx).Request(ctx, request)
}

func (s *selectEndpointServer) Close(ctx context.Context, conn *networkservice.Connection) (*empty.Empty, error) {
// TODO - we should remember the previous selection here.
ctx, err := s.withClientURL(ctx, conn)
if err != nil {
return nil, err
}
return next.Server(ctx).Close(ctx, conn)
}
candidates := discover.Candidates(ctx)

func (s *selectEndpointServer) withClientURL(ctx context.Context, conn *networkservice.Connection) (context.Context, error) {
if clienturlctx.ClientURL(ctx) == nil {
candidates := discover.Candidates(ctx)
for i := 0; i < len(candidates.Endpoints); i++ {
haiodo marked this conversation as resolved.
Show resolved Hide resolved
endpoint := s.selector.selectEndpoint(candidates.NetworkService, candidates.Endpoints)
if endpoint == nil {
return nil, errors.Errorf("failed to find endpoint for Network Service: %v %v", candidates.NetworkService, candidates.Endpoints)
}
conn.NetworkServiceEndpointName = endpoint.GetName()
urlString := endpoint.Url
u, err := url.Parse(urlString)
u, err := url.Parse(endpoint.Url)
if err != nil {
return nil, errors.WithStack(err)
}
ctx = clienturlctx.WithClientURL(ctx, u)
return ctx, nil
request.GetConnection().NetworkServiceEndpointName = endpoint.Name
resp, err := next.Server(ctx).Request(ctx, request)
if err == nil {
return resp, err
}
}
return nil, errors.Errorf("all candidates %#v fail", candidates)
}

func (s *selectEndpointServer) Close(ctx context.Context, conn *networkservice.Connection) (*empty.Empty, error) {
if clienturlctx.ClientURL(ctx) != nil {
return next.Server(ctx).Close(ctx, conn)
}
return ctx, nil
return nil, errors.Errorf("passed incorrect connection: %+v", conn)
}