From 3ec46db4d489de0b9c1e874c111411fed3b437db Mon Sep 17 00:00:00 2001 From: Denis Tingaikin Date: Mon, 24 Apr 2023 17:52:43 +0300 Subject: [PATCH] allow choose a new forwarder if client wants reselect Signed-off-by: Denis Tingaikin --- .../chains/nsmgr/select_forwarder_test.go | 176 ++++-------------- .../common/discoverforwarder/metadata.go | 36 ---- .../common/discoverforwarder/server.go | 8 +- 3 files changed, 44 insertions(+), 176 deletions(-) delete mode 100644 pkg/networkservice/common/discoverforwarder/metadata.go diff --git a/pkg/networkservice/chains/nsmgr/select_forwarder_test.go b/pkg/networkservice/chains/nsmgr/select_forwarder_test.go index 47c81762e..9de970e0f 100644 --- a/pkg/networkservice/chains/nsmgr/select_forwarder_test.go +++ b/pkg/networkservice/chains/nsmgr/select_forwarder_test.go @@ -80,7 +80,7 @@ func Test_DiscoverForwarder_CloseAfterError(t *testing.T) { require.Equal(t, 1, counter.Closes()) } -func Test_DiscoverForwarder_KeepForwarderOnErrors(t *testing.T) { +func Test_DiscoverForwarder_ChangeForwarderOnClose(t *testing.T) { t.Cleanup(func() { goleak.VerifyNone(t) }) ctx, cancel := context.WithTimeout(context.Background(), timeout) @@ -110,20 +110,24 @@ func Test_DiscoverForwarder_KeepForwarderOnErrors(t *testing.T) { nseReg := defaultRegistryEndpoint(nsReg.Name) + // forwarder selection is stochastic + // it's possible to get the same forwarder after close by pure luck + // so we try re-selecting it several times + const reselectCount = 10 + errorIndices := []int{} // skip half of the available forwarders skipCount := fwdCount / 2 for i := 0; i < skipCount; i++ { errorIndices = append(errorIndices, i) } - // then allow 1 successful request, then 1 error - errorIndices = append(errorIndices, errorIndices[len(errorIndices)-1]+2) //nolint:gocritic - // then allow 1 successful request, then 3 errors - errorIndices = append(errorIndices, - errorIndices[len(errorIndices)-1]+2, - errorIndices[len(errorIndices)-1]+3, - errorIndices[len(errorIndices)-1]+4, - ) + // same pattern for each re-selection attempt + for i := 0; i < reselectCount; i++ { + // allow one successful request, then two errors + errorIndices = append(errorIndices, errorIndices[len(errorIndices)-1]+2, errorIndices[len(errorIndices)-1]+3) + } + // then allow one successful request, then two errors + // errorIndices = append(errorIndices, errorIndices[len(errorIndices)-1]+2, errorIndices[len(errorIndices)-1]+3) inject := injecterror.NewServer(injecterror.WithCloseErrorTimes(), injecterror.WithRequestErrorTimes(errorIndices...)) counter := new(count.Server) domain.Nodes[0].NewEndpoint(ctx, nseReg, sandbox.GenerateTestToken, counter, inject) @@ -139,27 +143,29 @@ func Test_DiscoverForwarder_KeepForwarderOnErrors(t *testing.T) { selectedFwd := conn.GetPath().GetPathSegments()[2].Name - // check forwarder doesn't change after 1 error - request.Connection = conn - conn, err = nsc.Request(ctx, request.Clone()) - require.NoError(t, err) - require.Equal(t, skipCount+1, counter.UniqueRequests()) - require.Equal(t, skipCount+3, counter.Requests()) - require.Equal(t, selectedFwd, conn.GetPath().GetPathSegments()[2].Name) + requestsCount := counter.Requests() + for i := 0; i < reselectCount; i++ { + _, err = nsc.Close(ctx, conn) + require.NoError(t, err) - // check forwarder doesn't change after 3 consecutive errors - request.Connection = conn - conn, err = nsc.Request(ctx, request.Clone()) - require.NoError(t, err) - require.Equal(t, skipCount+1, counter.UniqueRequests()) - require.Equal(t, skipCount+7, counter.Requests()) - require.Equal(t, selectedFwd, conn.GetPath().GetPathSegments()[2].Name) + // check that we select a different forwarder + selectedFwd = conn.GetPath().GetPathSegments()[2].Name + request.Connection = conn + conn, err = nsc.Request(ctx, request.Clone()) + require.NoError(t, err) + require.Equal(t, skipCount+1, counter.UniqueRequests()) + require.Equal(t, requestsCount+3, counter.Requests()) + requestsCount = counter.Requests() + if selectedFwd != conn.GetPath().GetPathSegments()[2].Name { + break + } + } + require.NotEqual(t, selectedFwd, conn.GetPath().GetPathSegments()[2].Name) } -func Test_DiscoverForwarder_KeepForwarderOnNSEDeath_LostHeal(t *testing.T) { +func Test_DiscoverForwarder_ChangeForwarderOnDeath_LostHeal(t *testing.T) { t.Cleanup(func() { goleak.VerifyNone(t) }) - // on Windows systems this test takes 15+ seconds for some reason - ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) + ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() domain := sandbox.NewBuilder(ctx, t). @@ -188,7 +194,7 @@ func Test_DiscoverForwarder_KeepForwarderOnNSEDeath_LostHeal(t *testing.T) { nseReg := defaultRegistryEndpoint(nsReg.Name) counter := new(count.Server) - nse := domain.Nodes[0].NewEndpoint(ctx, nseReg, sandbox.GenerateTestToken, counter) + domain.Nodes[0].NewEndpoint(ctx, nseReg, sandbox.GenerateTestToken, counter) request := defaultRequest(nsReg.Name) @@ -206,120 +212,14 @@ func Test_DiscoverForwarder_KeepForwarderOnNSEDeath_LostHeal(t *testing.T) { selectedFwd := conn.GetPath().GetPathSegments()[2].Name - nse.Cancel() - - require.Eventually(t, func() bool { return clientCounter.Closes() == 1 }, timeout, tick) - require.Equal(t, 0, counter.Closes()) - require.Equal(t, 1, counter.UniqueRequests()) - require.Equal(t, 1, counter.Requests()) - - // fail a refresh on connection timeout - refreshCtx, refreshCancel := context.WithTimeout(ctx, time.Second) - defer refreshCancel() - request.Connection = conn - _, err = nsc.Request(refreshCtx, request.Clone()) - require.Error(t, err) - require.Equal(t, 1, counter.UniqueRequests()) - require.Equal(t, 1, counter.Requests()) - require.Equal(t, 0, counter.Closes()) - - // create a new NSE - nseReg2 := defaultRegistryEndpoint(nsReg.Name) - nseReg2.Name += "-2" - counter2 := new(count.Server) - // inject 1 error to make sure that don't go the "first try forwarder in path" route - inject2 := injecterror.NewServer(injecterror.WithCloseErrorTimes(), injecterror.WithRequestErrorTimes(0, 1)) - regEntry2 := domain.Nodes[0].NewEndpoint(ctx, nseReg2, sandbox.GenerateTestToken, counter2, inject2) + domain.Nodes[0].Forwarders[selectedFwd].Cancel() - // check that forwarder doesn't change after NSE re-selction + // check different forwarder selected request.Connection = conn conn, err = nsc.Request(ctx, request.Clone()) require.NoError(t, err) - require.Equal(t, 3, counter2.UniqueRequests()) - require.Equal(t, 4, counter2.Requests()) - require.Equal(t, regEntry2.Name, conn.GetPath().GetPathSegments()[3].Name) - require.Equal(t, selectedFwd, conn.GetPath().GetPathSegments()[2].Name) -} - -func Test_DiscoverForwarder_ChangeForwarderOnClose(t *testing.T) { - t.Cleanup(func() { goleak.VerifyNone(t) }) - ctx, cancel := context.WithTimeout(context.Background(), timeout) - - defer cancel() - domain := sandbox.NewBuilder(ctx, t). - SetNodesCount(1). - SetNSMgrProxySupplier(nil). - SetRegistryProxySupplier(nil). - SetNodeSetup(func(ctx context.Context, node *sandbox.Node, _ int) { - node.NewNSMgr(ctx, "nsmgr", nil, sandbox.GenerateTestToken, nsmgr.NewServer) - }). - Build() - - const fwdCount = 10 - for i := 0; i < fwdCount; i++ { - domain.Nodes[0].NewForwarder(ctx, ®istry.NetworkServiceEndpoint{ - Name: sandbox.UniqueName("forwarder-" + fmt.Sprint(i)), - NetworkServiceNames: []string{"forwarder"}, - }, sandbox.GenerateTestToken) - } - - nsRegistryClient := domain.NewNSRegistryClient(ctx, sandbox.GenerateTestToken) - - nsReg := defaultRegistryService(t.Name()) - nsReg, err := nsRegistryClient.Register(ctx, nsReg) - require.NoError(t, err) - - nseReg := defaultRegistryEndpoint(nsReg.Name) - - // forwarder selection is stochastic - // it's possible to get the same forwarder after close by pure luck - // so we try re-selecting it several times - const reselectCount = 10 - - errorIndices := []int{} - // skip half of the available forwarders - skipCount := fwdCount / 2 - for i := 0; i < skipCount; i++ { - errorIndices = append(errorIndices, i) - } - // same pattern for each re-selection attempt - for i := 0; i < reselectCount; i++ { - // allow one successful request, then two errors - errorIndices = append(errorIndices, errorIndices[len(errorIndices)-1]+2, errorIndices[len(errorIndices)-1]+3) - } - // then allow one successful request, then two errors - // errorIndices = append(errorIndices, errorIndices[len(errorIndices)-1]+2, errorIndices[len(errorIndices)-1]+3) - inject := injecterror.NewServer(injecterror.WithCloseErrorTimes(), injecterror.WithRequestErrorTimes(errorIndices...)) - counter := new(count.Server) - domain.Nodes[0].NewEndpoint(ctx, nseReg, sandbox.GenerateTestToken, counter, inject) - - request := defaultRequest(nsReg.Name) - - nsc := domain.Nodes[0].NewClient(ctx, sandbox.GenerateTestToken) - - conn, err := nsc.Request(ctx, request.Clone()) - require.NoError(t, err) - require.Equal(t, skipCount+1, counter.UniqueRequests()) - require.Equal(t, skipCount+1, counter.Requests()) - - selectedFwd := conn.GetPath().GetPathSegments()[2].Name - - requestsCount := counter.Requests() - for i := 0; i < reselectCount; i++ { - _, err = nsc.Close(ctx, conn) - require.NoError(t, err) - - // check that we select a different forwarder - selectedFwd = conn.GetPath().GetPathSegments()[2].Name - request.Connection = conn - conn, err = nsc.Request(ctx, request.Clone()) - require.NoError(t, err) - require.Equal(t, skipCount+1, counter.UniqueRequests()) - require.Equal(t, requestsCount+3, counter.Requests()) - requestsCount = counter.Requests() - if selectedFwd != conn.GetPath().GetPathSegments()[2].Name { - break - } - } + require.Equal(t, 1, counter.UniqueRequests()) + require.Equal(t, 2, counter.Requests()) + require.Equal(t, 0, counter.Closes()) require.NotEqual(t, selectedFwd, conn.GetPath().GetPathSegments()[2].Name) } diff --git a/pkg/networkservice/common/discoverforwarder/metadata.go b/pkg/networkservice/common/discoverforwarder/metadata.go deleted file mode 100644 index 7940bdeea..000000000 --- a/pkg/networkservice/common/discoverforwarder/metadata.go +++ /dev/null @@ -1,36 +0,0 @@ -// Copyright (c) 2021 Doc.ai and/or its affiliates. -// -// Copyright (c) 2023 Cisco and/or its affiliates. -// -// SPDX-License-Identifier: Apache-2.0 -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at: -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package discoverforwarder - -import ( - "context" - - "github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata" -) - -type selectedForworderKey struct{} - -func isForwarderSelected(ctx context.Context) bool { - _, ok := metadata.Map(ctx, false).Load(selectedForworderKey{}) - return ok -} - -func selectForwarder(ctx context.Context) { - metadata.Map(ctx, false).Store(selectedForworderKey{}, struct{}{}) -} diff --git a/pkg/networkservice/common/discoverforwarder/server.go b/pkg/networkservice/common/discoverforwarder/server.go index d292b8c11..16fadbdb3 100644 --- a/pkg/networkservice/common/discoverforwarder/server.go +++ b/pkg/networkservice/common/discoverforwarder/server.go @@ -29,11 +29,14 @@ import ( "github.com/pkg/errors" "github.com/networkservicemesh/sdk/pkg/networkservice/core/next" + "github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata" "github.com/networkservicemesh/sdk/pkg/tools/clienturlctx" "github.com/networkservicemesh/sdk/pkg/tools/log" "github.com/networkservicemesh/sdk/pkg/tools/matchutils" ) +type selectedForworderKey struct{} + type discoverForwarderServer struct { nseClient registry.NetworkServiceEndpointRegistryClient nsClient registry.NetworkServiceRegistryClient @@ -76,8 +79,9 @@ func (d *discoverForwarderServer) forwarderName(conn *networkservice.Connection) func (d *discoverForwarderServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) { var forwarderName = d.forwarderName(request.GetConnection()) var logger = log.FromContext(ctx).WithField("discoverForwarderServer", "request") + var _, forwarderSelected = metadata.Map(ctx, false).Load(selectedForworderKey{}) - if forwarderName == "" || !isForwarderSelected(ctx) { + if forwarderName == "" || !forwarderSelected || request.GetConnection().GetMechanism() == nil { ns, err := d.discoverNetworkService(ctx, request.GetConnection().GetNetworkService(), request.GetConnection().GetPayload()) if err != nil { return nil, err @@ -124,7 +128,7 @@ func (d *discoverForwarderServer) Request(ctx context.Context, request *networks resp, err := next.Server(ctx).Request(clienturlctx.WithClientURL(ctx, u), request.Clone()) if err == nil { - selectForwarder(ctx) + metadata.Map(ctx, false).Store(selectedForworderKey{}, struct{}{}) return resp, nil } logger.Errorf("forwarder=%v url=%v returned error=%v", candidate.Name, candidate.Url, err.Error())