Skip to content

Commit

Permalink
Fix heal after failed refresh (#1563)
Browse files Browse the repository at this point in the history
* Fix heal after failed refresh

Signed-off-by: Artem Glazychev <artem.glazychev@xored.com>

* Fix monitor server

Signed-off-by: Artem Glazychev <artem.glazychev@xored.com>

---------

Signed-off-by: Artem Glazychev <artem.glazychev@xored.com>
  • Loading branch information
glazychev-art authored Dec 12, 2023
1 parent 821b4bc commit 1979cc1
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 19 deletions.
4 changes: 3 additions & 1 deletion pkg/networkservice/chains/nsmgr/heal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,9 @@ func TestNSMGR_HealRegistry(t *testing.T) {
_, err = nsc.Request(ctx, request.Clone())
require.NoError(t, err)

require.Equal(t, 3, counter.Requests())
require.Eventually(t, func() bool {
return counter.Requests() >= 3
}, timeout, tick)
}

func TestNSMGR_CloseHeal(t *testing.T) {
Expand Down
6 changes: 4 additions & 2 deletions pkg/networkservice/chains/nsmgr/select_forwarder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,10 @@ func Test_DiscoverForwarder_CloseAfterError(t *testing.T) {
_, err = nsc.Request(refreshCtx, request.Clone())
require.Error(t, err)

// check that Close call can still reach the NSE
require.Equal(t, 0, counter.Closes())
// Close will reach the endpoint from healing
require.Eventually(t, func() bool {
return counter.Closes() == 1
}, timeout, tick)
_, err = nsc.Close(ctx, conn.Clone())
require.NoError(t, err)
require.Equal(t, 1, counter.Closes())
Expand Down
106 changes: 94 additions & 12 deletions pkg/networkservice/chains/nsmgr/single_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
// Copyright (c) 2020-2022 Doc.ai and/or its affiliates.
//
// Copyright (c) 2023 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -22,9 +24,12 @@ import (
"net"
"net/url"
"os"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/edwarnicke/serialize"
"github.com/golang-jwt/jwt/v4"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
Expand All @@ -41,9 +46,14 @@ import (
"github.com/networkservicemesh/sdk/pkg/networkservice/chains/endpoint"
"github.com/networkservicemesh/sdk/pkg/networkservice/chains/nsmgr"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/authorize"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/begin"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/excludedprefixes"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/heal"
"github.com/networkservicemesh/sdk/pkg/networkservice/ipam/point2pointipam"
countutils "github.com/networkservicemesh/sdk/pkg/networkservice/utils/count"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/checks/checkcontext"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/checks/checkrequest"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/checks/checkresponse"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/count"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/inject/injecterror"
"github.com/networkservicemesh/sdk/pkg/registry"
registryclient "github.com/networkservicemesh/sdk/pkg/registry/chains/client"
Expand Down Expand Up @@ -74,10 +84,10 @@ func Test_AwareNSEs(t *testing.T) {
_, ipNet, err := net.ParseCIDR("172.16.0.96/29")
require.NoError(t, err)

const count = 3
var nseRegs [count]*registryapi.NetworkServiceEndpoint
var nses [count]*sandbox.EndpointEntry
var requests [count]*networkservice.NetworkServiceRequest
const nseCount = 3
var nseRegs [nseCount]*registryapi.NetworkServiceEndpoint
var nses [nseCount]*sandbox.EndpointEntry
var requests [nseCount]*networkservice.NetworkServiceRequest

ns1 := defaultRegistryService("my-ns-1")
ns2 := defaultRegistryService("my-ns-2")
Expand All @@ -88,7 +98,7 @@ func Test_AwareNSEs(t *testing.T) {
nsurl2, err := url.Parse(fmt.Sprintf("kernel://%s?%s=%s", ns2.Name, "color", "red"))
require.NoError(t, err)

nsInfo := [count]struct {
nsInfo := [nseCount]struct {
ns *registryapi.NetworkService
labelKey string
labelValue string
Expand All @@ -110,7 +120,7 @@ func Test_AwareNSEs(t *testing.T) {
},
}

for i := 0; i < count; i++ {
for i := 0; i < nseCount; i++ {
nseRegs[i] = &registryapi.NetworkServiceEndpoint{
Name: fmt.Sprintf("nse-%s", uuid.New().String()),
NetworkServiceNames: []string{nsInfo[i].ns.Name},
Expand Down Expand Up @@ -161,8 +171,8 @@ func Test_AwareNSEs(t *testing.T) {
},
))))

var conns [count]*networkservice.Connection
for i := 0; i < count; i++ {
var conns [nseCount]*networkservice.Connection
for i := 0; i < nseCount; i++ {
conns[i], err = nsc.Request(ctx, requests[i])
require.NoError(t, err)
require.Equal(t, conns[0].NetworkServiceEndpointName, nses[0].Name)
Expand All @@ -176,12 +186,12 @@ func Test_AwareNSEs(t *testing.T) {
require.NotEqual(t, srcIP1[0], srcIP3[0])
require.NotEqual(t, srcIP2[0], srcIP3[0])

for i := 0; i < count; i++ {
for i := 0; i < nseCount; i++ {
_, err = nsc.Close(ctx, conns[i])
require.NoError(t, err)
}

for i := 0; i < count; i++ {
for i := 0; i < nseCount; i++ {
_, err = nses[i].Unregister(ctx, nseRegs[i])
require.NoError(t, err)
}
Expand Down Expand Up @@ -604,6 +614,78 @@ func createAuthorizedEndpoint(ctx context.Context, t *testing.T, ns string, nsmg
require.NoError(t, err)
}

func Test_RestartDuringRefresh(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })
var ctx, cancel = context.WithTimeout(context.Background(), time.Second*15)
defer cancel()
var domain = sandbox.NewBuilder(ctx, t).SetNodesCount(1).Build()

nsRegistryClient := domain.NewNSRegistryClient(ctx, sandbox.GenerateTestToken)
_, err := nsRegistryClient.Register(ctx, defaultRegistryService("ns"))
require.NoError(t, err)

var countServer count.Server
var countClint count.Client
var m sync.Once
var clientFactory begin.EventFactory
var destroyFwd atomic.Bool
var e serialize.Executor

domain.Nodes[0].NewEndpoint(ctx, &registryapi.NetworkServiceEndpoint{
Name: "nse-1",
NetworkServiceNames: []string{"ns"},
}, sandbox.GenerateTestToken, &countServer, checkrequest.NewServer(t, func(t *testing.T, nsr *networkservice.NetworkServiceRequest) {
if destroyFwd.Load() {
e.AsyncExec(func() {
for _, fwd := range domain.Nodes[0].Forwarders {
fwd.Cancel()
}
})
}
}))

var nsc = domain.Nodes[0].NewClient(ctx, sandbox.GenerateTestToken, client.WithAdditionalFunctionality(
&countClint,
checkcontext.NewClient(t, func(t *testing.T, ctx context.Context) {
m.Do(func() {
clientFactory = begin.FromContext(ctx)
})
}),
checkresponse.NewClient(t, func(t *testing.T, nsr *networkservice.Connection) {
if destroyFwd.Load() {
e.AsyncExec(func() {
for _, fwd := range domain.Nodes[0].Forwarders {
fwd.Restart()
}
})
}
}),
heal.NewClient(ctx),
))

_, err = nsc.Request(ctx, &networkservice.NetworkServiceRequest{
Connection: &networkservice.Connection{
Id: uuid.NewString(),
NetworkService: "ns",
},
})
require.NoError(t, err)
<-clientFactory.Request()
require.Equal(t, 2, countServer.Requests())
require.Never(t, func() bool { return countServer.Requests() > 2 }, time.Second/2, time.Second/20)
destroyFwd.Store(true)
for i := 0; i < 15; i++ {
var cs = countServer.Requests()
destroyFwd.Store(true)
err = <-clientFactory.Request()
require.Error(t, err)
destroyFwd.Store(false)
var cc = countClint.Requests()
require.Eventually(t, func() bool { return cs < countServer.Requests() }, time.Second*2, time.Second/20)
require.Eventually(t, func() bool { return cc < countClint.Requests() }, time.Second*2, time.Second/20)
}
}

// This test checks timeout on sandbox
// We run nsmgr and NSE with networkservice authorize chain element (tokens_expired.rego)
func Test_Timeout(t *testing.T) {
Expand Down Expand Up @@ -636,7 +718,7 @@ func Test_Timeout(t *testing.T) {
nsReg, err := nsRegistryClient.Register(chainCtx, ns)
require.NoError(t, err)

counter := new(countutils.Server)
counter := new(count.Server)

createAuthorizedEndpoint(chainCtx, t, ns.Name, domain.Nodes[0].NSMgr.URL, counter)

Expand Down
2 changes: 1 addition & 1 deletion pkg/networkservice/common/discoverforwarder/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func (d *discoverForwarderServer) Close(ctx context.Context, conn *networkservic

nses := registry.ReadNetworkServiceEndpointList(stream)
if len(nses) == 0 {
logger.Error("forwarder is not found: %v", forwarderName)
logger.Errorf("forwarder is not found: %v", forwarderName)
return next.Server(ctx).Close(ctx, conn)
}

Expand Down
15 changes: 12 additions & 3 deletions pkg/networkservice/common/monitor/server.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright (c) 2020-2022 Cisco Systems, Inc.
// Copyright (c) 2020-2023 Cisco Systems, Inc.
//
// Copyright (c) 2021-2022 Doc.ai and/or its affiliates.
// Copyright (c) 2021-2023 Doc.ai and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -60,14 +60,23 @@ func NewServer(chainCtx context.Context, monitorServerPtr *networkservice.Monito
func (m *monitorServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) {
closeCtxFunc := postpone.ContextWithValues(ctx)
// Cancel any existing eventLoop
if cancelEventLoop, loaded := loadAndDelete(ctx, metadata.IsClient(m)); loaded {
cancelEventLoop, loaded := loadAndDelete(ctx, metadata.IsClient(m))
if loaded {
cancelEventLoop()
}

storeEventConsumer(ctx, metadata.IsClient(m), m.MonitorConnectionServer.(EventConsumer))
clonedConn := request.GetConnection().Clone()

conn, err := next.Server(ctx).Request(ctx, request)
if err != nil {
if loaded {
clonedConn.State = networkservice.State_DOWN
_ = m.MonitorConnectionServer.(EventConsumer).Send(&networkservice.ConnectionEvent{
Type: networkservice.ConnectionEventType_UPDATE,
Connections: map[string]*networkservice.Connection{clonedConn.GetId(): clonedConn},
})
}
return nil, err
}
_ = m.MonitorConnectionServer.(EventConsumer).Send(&networkservice.ConnectionEvent{
Expand Down

0 comments on commit 1979cc1

Please sign in to comment.