Skip to content

Commit

Permalink
Handle INITIAL_STATE_TRANSFER only if connection state was changed (#…
Browse files Browse the repository at this point in the history
…1570)

Signed-off-by: Artem Glazychev <artem.glazychev@xored.com>
  • Loading branch information
glazychev-art authored Dec 25, 2023
1 parent 565471f commit 429d553
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 11 deletions.
6 changes: 1 addition & 5 deletions pkg/networkservice/common/heal/client_filter.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2021-2022 Cisco and/or its affiliates.
// Copyright (c) 2021-2023 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -50,10 +50,6 @@ func (c *clientFilter) Recv() (*networkservice.ConnectionEvent, error) {
Connections: make(map[string]*networkservice.Connection),
}
for _, connIn := range eventIn.GetConnections() {
if eventIn.GetType() == networkservice.ConnectionEventType_DELETE {
connIn = connIn.Clone()
connIn.State = networkservice.State_DOWN
}
// If we don't have enough PathSegments connIn doesn't match e.conn
if len(connIn.GetPath().GetPathSegments()) < int(c.conn.GetPath().GetIndex()+1) {
continue
Expand Down
9 changes: 5 additions & 4 deletions pkg/networkservice/common/monitor/client_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,6 @@ func (c *clientFilter) Recv() (*networkservice.ConnectionEvent, error) {
Connections: make(map[string]*networkservice.Connection),
}
for _, connIn := range eventIn.GetConnections() {
if eventIn.GetType() == networkservice.ConnectionEventType_DELETE {
connIn = connIn.Clone()
connIn.State = networkservice.State_DOWN
}
// If we don't have enough PathSegments connIn doesn't match e.conn
if len(connIn.GetPath().GetPathSegments()) < int(c.conn.GetPath().GetIndex()+1) {
continue
Expand All @@ -63,6 +59,11 @@ func (c *clientFilter) Recv() (*networkservice.ConnectionEvent, error) {
continue
}

if eventIn.GetType() == networkservice.ConnectionEventType_INITIAL_STATE_TRANSFER &&
connIn.GetState() == c.conn.GetState() {
continue
}

// Construct the outgoing Connection
connOut := c.conn.Clone()
connOut.Path = connIn.Path
Expand Down
102 changes: 100 additions & 2 deletions pkg/networkservice/common/monitor/server_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2020-2021 Cisco and/or its affiliates.
// Copyright (c) 2020-2023 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand All @@ -18,17 +18,30 @@ package monitor_test

import (
"context"

"testing"
"time"

"github.com/networkservicemesh/api/pkg/api/networkservice"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/types/known/emptypb"

"github.com/networkservicemesh/api/pkg/api/networkservice"
"github.com/networkservicemesh/api/pkg/api/registry"

"github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/cls"
kernelmech "github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/kernel"

"github.com/networkservicemesh/sdk/pkg/networkservice/common/monitor"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/chain"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata"

"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"
"github.com/networkservicemesh/sdk/pkg/tools/grpcutils"
"github.com/networkservicemesh/sdk/pkg/tools/sandbox"

"github.com/networkservicemesh/sdk/pkg/networkservice/core/adapters"
)

Expand Down Expand Up @@ -122,3 +135,88 @@ func TestMonitorServer(t *testing.T) {
require.Equal(t, segmentName, event.GetConnections()[segmentName].GetPath().GetPathSegments()[0].GetName())
}
}

func TestMonitorServer_RequestConnEqualsToMonitorConn(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

domain := sandbox.NewBuilder(ctx, t).
SetNodesCount(1).
Build()

// Create forwarder that adds metrics to the connection
for _, forwarder := range domain.Nodes[0].Forwarders {
forwarder.Cancel()
}
domain.Nodes[0].NewForwarder(ctx, &registry.NetworkServiceEndpoint{
Name: sandbox.UniqueName("forwarder-metrics"),
NetworkServiceNames: []string{"forwarder"},
}, sandbox.GenerateTestToken, sandbox.WithForwarderAdditionalFunctionalityServer(&metricsServer{}))

// Create NSE
nsRegistryClient := domain.NewNSRegistryClient(ctx, sandbox.GenerateTestToken)
nsReg := &registry.NetworkService{Name: "my-service"}
_, err := nsRegistryClient.Register(ctx, nsReg)
require.NoError(t, err)

nseReg := &registry.NetworkServiceEndpoint{
Name: "final-endpoint",
NetworkServiceNames: []string{nsReg.Name},
}
domain.Nodes[0].NewEndpoint(ctx, nseReg, sandbox.GenerateTestToken)

// Send Request
connID := "1"
nsc := domain.Nodes[0].NewClient(ctx, sandbox.GenerateTestToken)
req := &networkservice.NetworkServiceRequest{
MechanismPreferences: []*networkservice.Mechanism{
{Cls: cls.LOCAL, Type: kernelmech.MECHANISM},
},
Connection: &networkservice.Connection{
Id: connID,
NetworkService: nsReg.Name,
},
}

requestConn, err := nsc.Request(ctx, req)
require.NoError(t, err)
require.NotNil(t, requestConn)

// Connect to NSMgr Monitor server to get actual connections
target := grpcutils.URLToTarget(domain.Nodes[0].NSMgr.URL)
cc, err := grpc.DialContext(ctx, target, grpc.WithBlock(), grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, err)
require.NotNil(t, cc)
go func(ctx context.Context, cc *grpc.ClientConn) {
<-ctx.Done()
_ = cc.Close()
}(ctx, cc)
c := networkservice.NewMonitorConnectionClient(cc)
mc, err := c.MonitorConnections(ctx, &networkservice.MonitorScopeSelector{})
require.NoError(t, err)

// eventConn must be equal to connection requestConn
monitorEvent, _ := mc.Recv()
for _, eventConn := range monitorEvent.GetConnections() {
eventConn.Path.Index = 0
eventConn.Id = connID
require.True(t, requestConn.Equals(eventConn))
}

_, err = nsc.Close(ctx, requestConn)
require.NoError(t, err)
}

type metricsServer struct{}

func (m *metricsServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) {
c, err := next.Server(ctx).Request(ctx, request)
c.GetPath().GetPathSegments()[c.GetPath().GetIndex()].Metrics = map[string]string{"metricsServer": "1"}
return c, err
}

func (m *metricsServer) Close(ctx context.Context, conn *networkservice.Connection) (*emptypb.Empty, error) {
return next.Server(ctx).Close(ctx, conn)
}

0 comments on commit 429d553

Please sign in to comment.