Skip to content

Commit

Permalink
allow choose a new forwarder if client wants reselect
Browse files Browse the repository at this point in the history
Signed-off-by: Denis Tingaikin <denis.tingajkin@xored.com>
  • Loading branch information
denis-tingaikin committed Apr 24, 2023
1 parent f2fd6d2 commit 3ec46db
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 176 deletions.
176 changes: 38 additions & 138 deletions pkg/networkservice/chains/nsmgr/select_forwarder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func Test_DiscoverForwarder_CloseAfterError(t *testing.T) {
require.Equal(t, 1, counter.Closes())
}

func Test_DiscoverForwarder_KeepForwarderOnErrors(t *testing.T) {
func Test_DiscoverForwarder_ChangeForwarderOnClose(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })
ctx, cancel := context.WithTimeout(context.Background(), timeout)

Expand Down Expand Up @@ -110,20 +110,24 @@ func Test_DiscoverForwarder_KeepForwarderOnErrors(t *testing.T) {

nseReg := defaultRegistryEndpoint(nsReg.Name)

// forwarder selection is stochastic
// it's possible to get the same forwarder after close by pure luck
// so we try re-selecting it several times
const reselectCount = 10

errorIndices := []int{}
// skip half of the available forwarders
skipCount := fwdCount / 2
for i := 0; i < skipCount; i++ {
errorIndices = append(errorIndices, i)
}
// then allow 1 successful request, then 1 error
errorIndices = append(errorIndices, errorIndices[len(errorIndices)-1]+2) //nolint:gocritic
// then allow 1 successful request, then 3 errors
errorIndices = append(errorIndices,
errorIndices[len(errorIndices)-1]+2,
errorIndices[len(errorIndices)-1]+3,
errorIndices[len(errorIndices)-1]+4,
)
// same pattern for each re-selection attempt
for i := 0; i < reselectCount; i++ {
// allow one successful request, then two errors
errorIndices = append(errorIndices, errorIndices[len(errorIndices)-1]+2, errorIndices[len(errorIndices)-1]+3)
}
// then allow one successful request, then two errors
// errorIndices = append(errorIndices, errorIndices[len(errorIndices)-1]+2, errorIndices[len(errorIndices)-1]+3)
inject := injecterror.NewServer(injecterror.WithCloseErrorTimes(), injecterror.WithRequestErrorTimes(errorIndices...))
counter := new(count.Server)
domain.Nodes[0].NewEndpoint(ctx, nseReg, sandbox.GenerateTestToken, counter, inject)
Expand All @@ -139,27 +143,29 @@ func Test_DiscoverForwarder_KeepForwarderOnErrors(t *testing.T) {

selectedFwd := conn.GetPath().GetPathSegments()[2].Name

// check forwarder doesn't change after 1 error
request.Connection = conn
conn, err = nsc.Request(ctx, request.Clone())
require.NoError(t, err)
require.Equal(t, skipCount+1, counter.UniqueRequests())
require.Equal(t, skipCount+3, counter.Requests())
require.Equal(t, selectedFwd, conn.GetPath().GetPathSegments()[2].Name)
requestsCount := counter.Requests()
for i := 0; i < reselectCount; i++ {
_, err = nsc.Close(ctx, conn)
require.NoError(t, err)

// check forwarder doesn't change after 3 consecutive errors
request.Connection = conn
conn, err = nsc.Request(ctx, request.Clone())
require.NoError(t, err)
require.Equal(t, skipCount+1, counter.UniqueRequests())
require.Equal(t, skipCount+7, counter.Requests())
require.Equal(t, selectedFwd, conn.GetPath().GetPathSegments()[2].Name)
// check that we select a different forwarder
selectedFwd = conn.GetPath().GetPathSegments()[2].Name
request.Connection = conn
conn, err = nsc.Request(ctx, request.Clone())
require.NoError(t, err)
require.Equal(t, skipCount+1, counter.UniqueRequests())
require.Equal(t, requestsCount+3, counter.Requests())
requestsCount = counter.Requests()
if selectedFwd != conn.GetPath().GetPathSegments()[2].Name {
break
}
}
require.NotEqual(t, selectedFwd, conn.GetPath().GetPathSegments()[2].Name)
}

func Test_DiscoverForwarder_KeepForwarderOnNSEDeath_LostHeal(t *testing.T) {
func Test_DiscoverForwarder_ChangeForwarderOnDeath_LostHeal(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })
// on Windows systems this test takes 15+ seconds for some reason
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
ctx, cancel := context.WithTimeout(context.Background(), timeout)

defer cancel()
domain := sandbox.NewBuilder(ctx, t).
Expand Down Expand Up @@ -188,7 +194,7 @@ func Test_DiscoverForwarder_KeepForwarderOnNSEDeath_LostHeal(t *testing.T) {
nseReg := defaultRegistryEndpoint(nsReg.Name)

counter := new(count.Server)
nse := domain.Nodes[0].NewEndpoint(ctx, nseReg, sandbox.GenerateTestToken, counter)
domain.Nodes[0].NewEndpoint(ctx, nseReg, sandbox.GenerateTestToken, counter)

request := defaultRequest(nsReg.Name)

Expand All @@ -206,120 +212,14 @@ func Test_DiscoverForwarder_KeepForwarderOnNSEDeath_LostHeal(t *testing.T) {

selectedFwd := conn.GetPath().GetPathSegments()[2].Name

nse.Cancel()

require.Eventually(t, func() bool { return clientCounter.Closes() == 1 }, timeout, tick)
require.Equal(t, 0, counter.Closes())
require.Equal(t, 1, counter.UniqueRequests())
require.Equal(t, 1, counter.Requests())

// fail a refresh on connection timeout
refreshCtx, refreshCancel := context.WithTimeout(ctx, time.Second)
defer refreshCancel()
request.Connection = conn
_, err = nsc.Request(refreshCtx, request.Clone())
require.Error(t, err)
require.Equal(t, 1, counter.UniqueRequests())
require.Equal(t, 1, counter.Requests())
require.Equal(t, 0, counter.Closes())

// create a new NSE
nseReg2 := defaultRegistryEndpoint(nsReg.Name)
nseReg2.Name += "-2"
counter2 := new(count.Server)
// inject 1 error to make sure that don't go the "first try forwarder in path" route
inject2 := injecterror.NewServer(injecterror.WithCloseErrorTimes(), injecterror.WithRequestErrorTimes(0, 1))
regEntry2 := domain.Nodes[0].NewEndpoint(ctx, nseReg2, sandbox.GenerateTestToken, counter2, inject2)
domain.Nodes[0].Forwarders[selectedFwd].Cancel()

// check that forwarder doesn't change after NSE re-selction
// check different forwarder selected
request.Connection = conn
conn, err = nsc.Request(ctx, request.Clone())
require.NoError(t, err)
require.Equal(t, 3, counter2.UniqueRequests())
require.Equal(t, 4, counter2.Requests())
require.Equal(t, regEntry2.Name, conn.GetPath().GetPathSegments()[3].Name)
require.Equal(t, selectedFwd, conn.GetPath().GetPathSegments()[2].Name)
}

func Test_DiscoverForwarder_ChangeForwarderOnClose(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })
ctx, cancel := context.WithTimeout(context.Background(), timeout)

defer cancel()
domain := sandbox.NewBuilder(ctx, t).
SetNodesCount(1).
SetNSMgrProxySupplier(nil).
SetRegistryProxySupplier(nil).
SetNodeSetup(func(ctx context.Context, node *sandbox.Node, _ int) {
node.NewNSMgr(ctx, "nsmgr", nil, sandbox.GenerateTestToken, nsmgr.NewServer)
}).
Build()

const fwdCount = 10
for i := 0; i < fwdCount; i++ {
domain.Nodes[0].NewForwarder(ctx, &registry.NetworkServiceEndpoint{
Name: sandbox.UniqueName("forwarder-" + fmt.Sprint(i)),
NetworkServiceNames: []string{"forwarder"},
}, sandbox.GenerateTestToken)
}

nsRegistryClient := domain.NewNSRegistryClient(ctx, sandbox.GenerateTestToken)

nsReg := defaultRegistryService(t.Name())
nsReg, err := nsRegistryClient.Register(ctx, nsReg)
require.NoError(t, err)

nseReg := defaultRegistryEndpoint(nsReg.Name)

// forwarder selection is stochastic
// it's possible to get the same forwarder after close by pure luck
// so we try re-selecting it several times
const reselectCount = 10

errorIndices := []int{}
// skip half of the available forwarders
skipCount := fwdCount / 2
for i := 0; i < skipCount; i++ {
errorIndices = append(errorIndices, i)
}
// same pattern for each re-selection attempt
for i := 0; i < reselectCount; i++ {
// allow one successful request, then two errors
errorIndices = append(errorIndices, errorIndices[len(errorIndices)-1]+2, errorIndices[len(errorIndices)-1]+3)
}
// then allow one successful request, then two errors
// errorIndices = append(errorIndices, errorIndices[len(errorIndices)-1]+2, errorIndices[len(errorIndices)-1]+3)
inject := injecterror.NewServer(injecterror.WithCloseErrorTimes(), injecterror.WithRequestErrorTimes(errorIndices...))
counter := new(count.Server)
domain.Nodes[0].NewEndpoint(ctx, nseReg, sandbox.GenerateTestToken, counter, inject)

request := defaultRequest(nsReg.Name)

nsc := domain.Nodes[0].NewClient(ctx, sandbox.GenerateTestToken)

conn, err := nsc.Request(ctx, request.Clone())
require.NoError(t, err)
require.Equal(t, skipCount+1, counter.UniqueRequests())
require.Equal(t, skipCount+1, counter.Requests())

selectedFwd := conn.GetPath().GetPathSegments()[2].Name

requestsCount := counter.Requests()
for i := 0; i < reselectCount; i++ {
_, err = nsc.Close(ctx, conn)
require.NoError(t, err)

// check that we select a different forwarder
selectedFwd = conn.GetPath().GetPathSegments()[2].Name
request.Connection = conn
conn, err = nsc.Request(ctx, request.Clone())
require.NoError(t, err)
require.Equal(t, skipCount+1, counter.UniqueRequests())
require.Equal(t, requestsCount+3, counter.Requests())
requestsCount = counter.Requests()
if selectedFwd != conn.GetPath().GetPathSegments()[2].Name {
break
}
}
require.Equal(t, 1, counter.UniqueRequests())
require.Equal(t, 2, counter.Requests())
require.Equal(t, 0, counter.Closes())
require.NotEqual(t, selectedFwd, conn.GetPath().GetPathSegments()[2].Name)
}
36 changes: 0 additions & 36 deletions pkg/networkservice/common/discoverforwarder/metadata.go

This file was deleted.

8 changes: 6 additions & 2 deletions pkg/networkservice/common/discoverforwarder/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,14 @@ import (
"github.com/pkg/errors"

"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata"
"github.com/networkservicemesh/sdk/pkg/tools/clienturlctx"
"github.com/networkservicemesh/sdk/pkg/tools/log"
"github.com/networkservicemesh/sdk/pkg/tools/matchutils"
)

type selectedForworderKey struct{}

type discoverForwarderServer struct {
nseClient registry.NetworkServiceEndpointRegistryClient
nsClient registry.NetworkServiceRegistryClient
Expand Down Expand Up @@ -76,8 +79,9 @@ func (d *discoverForwarderServer) forwarderName(conn *networkservice.Connection)
func (d *discoverForwarderServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) {
var forwarderName = d.forwarderName(request.GetConnection())
var logger = log.FromContext(ctx).WithField("discoverForwarderServer", "request")
var _, forwarderSelected = metadata.Map(ctx, false).Load(selectedForworderKey{})

if forwarderName == "" || !isForwarderSelected(ctx) {
if forwarderName == "" || !forwarderSelected || request.GetConnection().GetMechanism() == nil {
ns, err := d.discoverNetworkService(ctx, request.GetConnection().GetNetworkService(), request.GetConnection().GetPayload())
if err != nil {
return nil, err
Expand Down Expand Up @@ -124,7 +128,7 @@ func (d *discoverForwarderServer) Request(ctx context.Context, request *networks

resp, err := next.Server(ctx).Request(clienturlctx.WithClientURL(ctx, u), request.Clone())
if err == nil {
selectForwarder(ctx)
metadata.Map(ctx, false).Store(selectedForworderKey{}, struct{}{})
return resp, nil
}
logger.Errorf("forwarder=%v url=%v returned error=%v", candidate.Name, candidate.Url, err.Error())
Expand Down

0 comments on commit 3ec46db

Please sign in to comment.