From 429d553e55ff4c51863a50351c38101f20691692 Mon Sep 17 00:00:00 2001 From: Artem Glazychev Date: Mon, 25 Dec 2023 18:00:34 +0700 Subject: [PATCH] Handle INITIAL_STATE_TRANSFER only if connection state was changed (#1570) Signed-off-by: Artem Glazychev --- .../common/heal/client_filter.go | 6 +- .../common/monitor/client_filter.go | 9 +- .../common/monitor/server_test.go | 102 +++++++++++++++++- 3 files changed, 106 insertions(+), 11 deletions(-) diff --git a/pkg/networkservice/common/heal/client_filter.go b/pkg/networkservice/common/heal/client_filter.go index 73f19884e..968b8974f 100644 --- a/pkg/networkservice/common/heal/client_filter.go +++ b/pkg/networkservice/common/heal/client_filter.go @@ -1,4 +1,4 @@ -// Copyright (c) 2021-2022 Cisco and/or its affiliates. +// Copyright (c) 2021-2023 Cisco and/or its affiliates. // // SPDX-License-Identifier: Apache-2.0 // @@ -50,10 +50,6 @@ func (c *clientFilter) Recv() (*networkservice.ConnectionEvent, error) { Connections: make(map[string]*networkservice.Connection), } for _, connIn := range eventIn.GetConnections() { - if eventIn.GetType() == networkservice.ConnectionEventType_DELETE { - connIn = connIn.Clone() - connIn.State = networkservice.State_DOWN - } // If we don't have enough PathSegments connIn doesn't match e.conn if len(connIn.GetPath().GetPathSegments()) < int(c.conn.GetPath().GetIndex()+1) { continue diff --git a/pkg/networkservice/common/monitor/client_filter.go b/pkg/networkservice/common/monitor/client_filter.go index 328bb0534..854ef3b17 100644 --- a/pkg/networkservice/common/monitor/client_filter.go +++ b/pkg/networkservice/common/monitor/client_filter.go @@ -46,10 +46,6 @@ func (c *clientFilter) Recv() (*networkservice.ConnectionEvent, error) { Connections: make(map[string]*networkservice.Connection), } for _, connIn := range eventIn.GetConnections() { - if eventIn.GetType() == networkservice.ConnectionEventType_DELETE { - connIn = connIn.Clone() - connIn.State = networkservice.State_DOWN - } // If we don't have enough PathSegments connIn doesn't match e.conn if len(connIn.GetPath().GetPathSegments()) < int(c.conn.GetPath().GetIndex()+1) { continue @@ -63,6 +59,11 @@ func (c *clientFilter) Recv() (*networkservice.ConnectionEvent, error) { continue } + if eventIn.GetType() == networkservice.ConnectionEventType_INITIAL_STATE_TRANSFER && + connIn.GetState() == c.conn.GetState() { + continue + } + // Construct the outgoing Connection connOut := c.conn.Clone() connOut.Path = connIn.Path diff --git a/pkg/networkservice/common/monitor/server_test.go b/pkg/networkservice/common/monitor/server_test.go index 580ac2b46..aeb91f978 100644 --- a/pkg/networkservice/common/monitor/server_test.go +++ b/pkg/networkservice/common/monitor/server_test.go @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2021 Cisco and/or its affiliates. +// Copyright (c) 2020-2023 Cisco and/or its affiliates. // // SPDX-License-Identifier: Apache-2.0 // @@ -18,17 +18,30 @@ package monitor_test import ( "context" + "testing" "time" - "github.com/networkservicemesh/api/pkg/api/networkservice" "github.com/stretchr/testify/require" "go.uber.org/goleak" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/protobuf/types/known/emptypb" + + "github.com/networkservicemesh/api/pkg/api/networkservice" + "github.com/networkservicemesh/api/pkg/api/registry" + + "github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/cls" + kernelmech "github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/kernel" "github.com/networkservicemesh/sdk/pkg/networkservice/common/monitor" "github.com/networkservicemesh/sdk/pkg/networkservice/core/chain" "github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata" + "github.com/networkservicemesh/sdk/pkg/networkservice/core/next" + "github.com/networkservicemesh/sdk/pkg/tools/grpcutils" + "github.com/networkservicemesh/sdk/pkg/tools/sandbox" + "github.com/networkservicemesh/sdk/pkg/networkservice/core/adapters" ) @@ -122,3 +135,88 @@ func TestMonitorServer(t *testing.T) { require.Equal(t, segmentName, event.GetConnections()[segmentName].GetPath().GetPathSegments()[0].GetName()) } } + +func TestMonitorServer_RequestConnEqualsToMonitorConn(t *testing.T) { + t.Cleanup(func() { goleak.VerifyNone(t) }) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + + domain := sandbox.NewBuilder(ctx, t). + SetNodesCount(1). + Build() + + // Create forwarder that adds metrics to the connection + for _, forwarder := range domain.Nodes[0].Forwarders { + forwarder.Cancel() + } + domain.Nodes[0].NewForwarder(ctx, ®istry.NetworkServiceEndpoint{ + Name: sandbox.UniqueName("forwarder-metrics"), + NetworkServiceNames: []string{"forwarder"}, + }, sandbox.GenerateTestToken, sandbox.WithForwarderAdditionalFunctionalityServer(&metricsServer{})) + + // Create NSE + nsRegistryClient := domain.NewNSRegistryClient(ctx, sandbox.GenerateTestToken) + nsReg := ®istry.NetworkService{Name: "my-service"} + _, err := nsRegistryClient.Register(ctx, nsReg) + require.NoError(t, err) + + nseReg := ®istry.NetworkServiceEndpoint{ + Name: "final-endpoint", + NetworkServiceNames: []string{nsReg.Name}, + } + domain.Nodes[0].NewEndpoint(ctx, nseReg, sandbox.GenerateTestToken) + + // Send Request + connID := "1" + nsc := domain.Nodes[0].NewClient(ctx, sandbox.GenerateTestToken) + req := &networkservice.NetworkServiceRequest{ + MechanismPreferences: []*networkservice.Mechanism{ + {Cls: cls.LOCAL, Type: kernelmech.MECHANISM}, + }, + Connection: &networkservice.Connection{ + Id: connID, + NetworkService: nsReg.Name, + }, + } + + requestConn, err := nsc.Request(ctx, req) + require.NoError(t, err) + require.NotNil(t, requestConn) + + // Connect to NSMgr Monitor server to get actual connections + target := grpcutils.URLToTarget(domain.Nodes[0].NSMgr.URL) + cc, err := grpc.DialContext(ctx, target, grpc.WithBlock(), grpc.WithTransportCredentials(insecure.NewCredentials())) + require.NoError(t, err) + require.NotNil(t, cc) + go func(ctx context.Context, cc *grpc.ClientConn) { + <-ctx.Done() + _ = cc.Close() + }(ctx, cc) + c := networkservice.NewMonitorConnectionClient(cc) + mc, err := c.MonitorConnections(ctx, &networkservice.MonitorScopeSelector{}) + require.NoError(t, err) + + // eventConn must be equal to connection requestConn + monitorEvent, _ := mc.Recv() + for _, eventConn := range monitorEvent.GetConnections() { + eventConn.Path.Index = 0 + eventConn.Id = connID + require.True(t, requestConn.Equals(eventConn)) + } + + _, err = nsc.Close(ctx, requestConn) + require.NoError(t, err) +} + +type metricsServer struct{} + +func (m *metricsServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) { + c, err := next.Server(ctx).Request(ctx, request) + c.GetPath().GetPathSegments()[c.GetPath().GetIndex()].Metrics = map[string]string{"metricsServer": "1"} + return c, err +} + +func (m *metricsServer) Close(ctx context.Context, conn *networkservice.Connection) (*emptypb.Empty, error) { + return next.Server(ctx).Close(ctx, conn) +}