From 2cbccfe093de02241a654ad2977edecc97c7fde4 Mon Sep 17 00:00:00 2001 From: Denis Tingajkin Date: Fri, 20 Nov 2020 22:06:49 +0700 Subject: [PATCH 1/3] fix issue 593 Signed-off-by: Denis Tingajkin --- .../chains/nsmgr/server_test.go | 96 ++++++++++++++++++- pkg/networkservice/common/discover/server.go | 23 ++++- .../common/roundrobin/server.go | 40 ++++---- 3 files changed, 131 insertions(+), 28 deletions(-) diff --git a/pkg/networkservice/chains/nsmgr/server_test.go b/pkg/networkservice/chains/nsmgr/server_test.go index 4baca9ed4..8d23063e5 100644 --- a/pkg/networkservice/chains/nsmgr/server_test.go +++ b/pkg/networkservice/chains/nsmgr/server_test.go @@ -22,10 +22,14 @@ import ( "fmt" "io/ioutil" "net/url" + "strconv" + "sync" "sync/atomic" "testing" "time" + "github.com/pkg/errors" + "github.com/golang/protobuf/ptypes/empty" "google.golang.org/grpc" @@ -85,7 +89,7 @@ func TestNSMGR_RemoteUsecase_Parallel(t *testing.T) { conn, err := nsc.Request(ctx, request.Clone()) require.NoError(t, err) require.NotNil(t, conn) - + require.Equal(t, int32(1), atomic.LoadInt32(&counter.Requests)) require.Equal(t, 8, len(conn.Path.PathSegments)) // Simulate refresh from client. @@ -97,7 +101,7 @@ func TestNSMGR_RemoteUsecase_Parallel(t *testing.T) { require.NoError(t, err) require.NotNil(t, conn) require.Equal(t, 8, len(conn.Path.PathSegments)) - + require.Equal(t, int32(2), atomic.LoadInt32(&counter.Requests)) // Close. e, err := nsc.Close(ctx, conn) require.NoError(t, err) @@ -105,6 +109,74 @@ func TestNSMGR_RemoteUsecase_Parallel(t *testing.T) { require.Equal(t, int32(1), atomic.LoadInt32(&counter.Closes)) } +func TestNSMGR_RemoteUsecase_BusyEndpoints(t *testing.T) { + defer goleak.VerifyNone(t, goleak.IgnoreCurrent()) + logrus.SetOutput(ioutil.Discard) + ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5) + defer cancel() + domain := sandbox.NewBuilder(t). + SetNodesCount(2). + SetRegistryProxySupplier(nil). + SetContext(ctx). + Build() + defer domain.Cleanup() + + counter := new(counterServer) + + request := &networkservice.NetworkServiceRequest{ + MechanismPreferences: []*networkservice.Mechanism{ + {Cls: cls.LOCAL, Type: kernel.MECHANISM}, + }, + Connection: &networkservice.Connection{ + Id: "1", + NetworkService: "my-service-remote", + Context: &networkservice.ConnectionContext{}, + }, + } + var wg sync.WaitGroup + for i := 0; i < 3; i++ { + wg.Add(1) + go func(id int) { + nseReg := ®istry.NetworkServiceEndpoint{ + Name: "final-endpoint-" + strconv.Itoa(id), + NetworkServiceNames: []string{"my-service-remote"}, + } + _, err := sandbox.NewEndpoint(ctx, nseReg, sandbox.GenerateTestToken, domain.Nodes[1].NSMgr, newBusyEndpoint()) + require.NoError(t, err) + wg.Done() + }(i) + } + go func() { + wg.Wait() + time.Sleep(time.Second / 2) + nseReg := ®istry.NetworkServiceEndpoint{ + Name: "final-endpoint-3", + NetworkServiceNames: []string{"my-service-remote"}, + } + _, err := sandbox.NewEndpoint(ctx, nseReg, sandbox.GenerateTestToken, domain.Nodes[1].NSMgr, counter) + require.NoError(t, err) + }() + nsc, err := sandbox.NewClient(ctx, sandbox.GenerateTestToken, domain.Nodes[0].NSMgr.URL) + require.NoError(t, err) + + conn, err := nsc.Request(ctx, request.Clone()) + require.NoError(t, err) + require.NotNil(t, conn) + require.Equal(t, int32(1), atomic.LoadInt32(&counter.Requests)) + require.Equal(t, 8, len(conn.Path.PathSegments)) + + // Simulate refresh from client. + + refreshRequest := request.Clone() + refreshRequest.Connection = conn.Clone() + + conn, err = nsc.Request(ctx, refreshRequest) + require.NoError(t, err) + require.NotNil(t, conn) + require.Equal(t, int32(2), atomic.LoadInt32(&counter.Requests)) + require.Equal(t, 8, len(conn.Path.PathSegments)) +} + func TestNSMGR_RemoteUsecase(t *testing.T) { defer goleak.VerifyNone(t, goleak.IgnoreCurrent()) logrus.SetOutput(ioutil.Discard) @@ -143,7 +215,7 @@ func TestNSMGR_RemoteUsecase(t *testing.T) { conn, err := nsc.Request(ctx, request.Clone()) require.NoError(t, err) require.NotNil(t, conn) - + require.Equal(t, int32(1), atomic.LoadInt32(&counter.Requests)) require.Equal(t, 8, len(conn.Path.PathSegments)) // Simulate refresh from client. @@ -154,6 +226,7 @@ func TestNSMGR_RemoteUsecase(t *testing.T) { conn, err = nsc.Request(ctx, refreshRequest) require.NoError(t, err) require.NotNil(t, conn) + require.Equal(t, int32(2), atomic.LoadInt32(&counter.Requests)) require.Equal(t, 8, len(conn.Path.PathSegments)) // Close. @@ -199,7 +272,7 @@ func TestNSMGR_LocalUsecase(t *testing.T) { conn, err := nsc.Request(ctx, request.Clone()) require.NoError(t, err) require.NotNil(t, conn) - + require.Equal(t, int32(1), atomic.LoadInt32(&counter.Requests)) require.Equal(t, 5, len(conn.Path.PathSegments)) // Simulate refresh from client. @@ -211,7 +284,7 @@ func TestNSMGR_LocalUsecase(t *testing.T) { require.NoError(t, err) require.NotNil(t, conn2) require.Equal(t, 5, len(conn2.Path.PathSegments)) - + require.Equal(t, int32(2), atomic.LoadInt32(&counter.Requests)) // Close. e, err := nsc.Close(ctx, conn) require.NoError(t, err) @@ -402,3 +475,16 @@ func (c *counterServer) Close(ctx context.Context, connection *networkservice.Co atomic.AddInt32(&c.Closes, 1) return next.Server(ctx).Close(ctx, connection) } + +type busyEndpoint struct{} + +func (c *busyEndpoint) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) { + return nil, errors.New("sorry, endpoint is busy, try again later") +} + +func (c *busyEndpoint) Close(ctx context.Context, connection *networkservice.Connection) (*empty.Empty, error) { + return nil, errors.New("sorry, endpoint is busy, try again later") +} +func newBusyEndpoint() networkservice.NetworkServiceServer { + return new(busyEndpoint) +} diff --git a/pkg/networkservice/common/discover/server.go b/pkg/networkservice/common/discover/server.go index 7b88ac232..aa5c61eb8 100644 --- a/pkg/networkservice/common/discover/server.go +++ b/pkg/networkservice/common/discover/server.go @@ -66,7 +66,28 @@ func (d *discoverCandidatesServer) Request(ctx context.Context, request *network if err != nil { return nil, err } - return next.Server(ctx).Request(WithCandidates(ctx, nses, ns), request) + visit := map[string]struct{}{} + for ctx.Err() == nil { + resp, err := next.Server(ctx).Request(WithCandidates(ctx, nses, ns), request) + if err == nil { + return resp, err + } + for _, nse := range nses { + visit[nse.Name] = struct{}{} + } + nses, err = d.discoverNetworkServiceEndpoints(ctx, ns, request.GetConnection().GetLabels()) + if err != nil { + return nil, err + } + var newNses []*registry.NetworkServiceEndpoint + for _, nse := range nses { + if _, ok := visit[nse.Name]; !ok { + newNses = append(newNses, nse) + } + } + nses = newNses + } + return nil, errors.Wrap(ctx.Err(), "no match endpoints or all endpoints fail") } func (d *discoverCandidatesServer) Close(ctx context.Context, conn *networkservice.Connection) (*empty.Empty, error) { diff --git a/pkg/networkservice/common/roundrobin/server.go b/pkg/networkservice/common/roundrobin/server.go index 62204f243..d642926f0 100644 --- a/pkg/networkservice/common/roundrobin/server.go +++ b/pkg/networkservice/common/roundrobin/server.go @@ -44,37 +44,33 @@ func NewServer() networkservice.NetworkServiceServer { } func (s *selectEndpointServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) { - ctx, err := s.withClientURL(ctx, request.GetConnection()) - if err != nil { - return nil, err + if clienturlctx.ClientURL(ctx) != nil { + return next.Server(ctx).Request(ctx, request) } - return next.Server(ctx).Request(ctx, request) -} - -func (s *selectEndpointServer) Close(ctx context.Context, conn *networkservice.Connection) (*empty.Empty, error) { - // TODO - we should remember the previous selection here. - ctx, err := s.withClientURL(ctx, conn) - if err != nil { - return nil, err - } - return next.Server(ctx).Close(ctx, conn) -} + candidates := discover.Candidates(ctx) -func (s *selectEndpointServer) withClientURL(ctx context.Context, conn *networkservice.Connection) (context.Context, error) { - if clienturlctx.ClientURL(ctx) == nil { - candidates := discover.Candidates(ctx) + for i := 0; i < len(candidates.Endpoints); i++ { endpoint := s.selector.selectEndpoint(candidates.NetworkService, candidates.Endpoints) if endpoint == nil { return nil, errors.Errorf("failed to find endpoint for Network Service: %v %v", candidates.NetworkService, candidates.Endpoints) } - conn.NetworkServiceEndpointName = endpoint.GetName() - urlString := endpoint.Url - u, err := url.Parse(urlString) + u, err := url.Parse(endpoint.Url) if err != nil { return nil, errors.WithStack(err) } ctx = clienturlctx.WithClientURL(ctx, u) - return ctx, nil + request.GetConnection().NetworkServiceEndpointName = endpoint.Name + resp, err := next.Server(ctx).Request(ctx, request) + if err == nil { + return resp, err + } + } + return nil, errors.Errorf("all candidates %#v fail", candidates) +} + +func (s *selectEndpointServer) Close(ctx context.Context, conn *networkservice.Connection) (*empty.Empty, error) { + if clienturlctx.ClientURL(ctx) != nil { + return next.Server(ctx).Close(ctx, conn) } - return ctx, nil + return nil, errors.Errorf("inccorect connection: %+v", conn) } From f6d4edd88f5ac46f217b539eb776f2106fceb2ac Mon Sep 17 00:00:00 2001 From: Denis Tingajkin Date: Fri, 20 Nov 2020 22:08:59 +0700 Subject: [PATCH 2/3] fix typo Signed-off-by: Denis Tingajkin --- pkg/networkservice/common/roundrobin/server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/networkservice/common/roundrobin/server.go b/pkg/networkservice/common/roundrobin/server.go index d642926f0..c98c89bde 100644 --- a/pkg/networkservice/common/roundrobin/server.go +++ b/pkg/networkservice/common/roundrobin/server.go @@ -72,5 +72,5 @@ func (s *selectEndpointServer) Close(ctx context.Context, conn *networkservice.C if clienturlctx.ClientURL(ctx) != nil { return next.Server(ctx).Close(ctx, conn) } - return nil, errors.Errorf("inccorect connection: %+v", conn) + return nil, errors.Errorf("passed incorrect connection: %+v", conn) } From 4607552af5c16acb7c9e76ec3c2095a053f6db85 Mon Sep 17 00:00:00 2001 From: Denis Tingajkin Date: Sat, 21 Nov 2020 00:33:10 +0700 Subject: [PATCH 3/3] stabilize failing tests Signed-off-by: Denis Tingajkin --- .../common/discover/match_selector.go | 4 --- pkg/networkservice/common/discover/server.go | 26 +++++++++++++------ .../common/discover/server_test.go | 4 +-- 3 files changed, 20 insertions(+), 14 deletions(-) diff --git a/pkg/networkservice/common/discover/match_selector.go b/pkg/networkservice/common/discover/match_selector.go index 0e969962d..df8ac5ed9 100644 --- a/pkg/networkservice/common/discover/match_selector.go +++ b/pkg/networkservice/common/discover/match_selector.go @@ -20,8 +20,6 @@ import ( "bytes" "text/template" - "github.com/sirupsen/logrus" - "github.com/networkservicemesh/api/pkg/api/registry" ) @@ -42,8 +40,6 @@ func isSubset(a, b, nsLabels map[string]string) bool { } func matchEndpoint(nsLabels map[string]string, ns *registry.NetworkService, networkServiceEndpoints ...*registry.NetworkServiceEndpoint) []*registry.NetworkServiceEndpoint { - logrus.Infof("Matching endpoint for labels %v", nsLabels) - // Iterate through the matches for _, match := range ns.GetMatches() { // All match source selector labels should be present in the requested labels map diff --git a/pkg/networkservice/common/discover/server.go b/pkg/networkservice/common/discover/server.go index aa5c61eb8..bc84a982e 100644 --- a/pkg/networkservice/common/discover/server.go +++ b/pkg/networkservice/common/discover/server.go @@ -130,8 +130,11 @@ func (d *discoverCandidatesServer) discoverNetworkServiceEndpoint(ctx context.Co select { case <-ctx.Done(): return nil, errors.Wrapf(ctx.Err(), "nse: %+v is not found", query.NetworkServiceEndpoint) - case nse := <-nseCh: - return nse, nil + case nse, ok := <-nseCh: + if ok { + return nse, nil + } + return nil, errors.New("nse stream is closed") } } func (d *discoverCandidatesServer) discoverNetworkServiceEndpoints(ctx context.Context, ns *registry.NetworkService, labels map[string]string) ([]*registry.NetworkServiceEndpoint, error) { @@ -165,10 +168,14 @@ func (d *discoverCandidatesServer) discoverNetworkServiceEndpoints(ctx context.C select { case <-ctx.Done(): return nil, errors.Wrapf(ctx.Err(), "nse: %+v is not found", query.NetworkServiceEndpoint) - case nse := <-nseCh: - result := matchEndpoint(labels, ns, nse) - if len(result) != 0 { - return result, nil + case nse, ok := <-nseCh: + if ok { + result := matchEndpoint(labels, ns, nse) + if len(result) != 0 { + return result, nil + } + } else { + return nil, errors.New("nse stream is closed") } } } @@ -199,7 +206,10 @@ func (d *discoverCandidatesServer) discoverNetworkService(ctx context.Context, n select { case <-ctx.Done(): return nil, errors.Wrapf(ctx.Err(), "ns:\"%v\" with payload:\"%v\" is not found", name, payload) - case ns := <-registry.ReadNetworkServiceChannel(nsStream): - return ns, nil + case ns, ok := <-registry.ReadNetworkServiceChannel(nsStream): + if ok { + return ns, nil + } + return nil, errors.New("ns stream is closed") } } diff --git a/pkg/networkservice/common/discover/server_test.go b/pkg/networkservice/common/discover/server_test.go index 85ed18641..1b306d17a 100644 --- a/pkg/networkservice/common/discover/server_test.go +++ b/pkg/networkservice/common/discover/server_test.go @@ -338,7 +338,7 @@ func TestNoMatchServiceFound(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second/2) defer cancel() _, err := server.Request(ctx, request) - require.Equal(t, "ns:\"secure-intranet-connectivity\" with payload:\"IP\" is not found: context deadline exceeded", err.Error()) + require.Error(t, err) } func TestNoMatchServiceEndpointFound(t *testing.T) { @@ -375,5 +375,5 @@ func TestNoMatchServiceEndpointFound(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second/2) defer cancel() _, err = server.Request(ctx, request) - require.Equal(t, "nse: network_service_names:\"secure-intranet-connectivity\" is not found: context deadline exceeded", err.Error()) + require.Error(t, err) }