From ce7b9a802d399881905cd08e3c8829e00f39721f Mon Sep 17 00:00:00 2001 From: Denis Tingaikin Date: Tue, 4 Jun 2024 17:35:37 +0300 Subject: [PATCH 1/7] update grpcfd (#1640) Signed-off-by: Denis Tingaikin --- go.mod | 2 +- go.sum | 4 +-- .../common/monitor/eventloop.go | 16 +++++----- pkg/networkservice/common/monitor/server.go | 30 +++++++++---------- .../common/updatepath/client.go | 11 +++++-- 5 files changed, 36 insertions(+), 27 deletions(-) diff --git a/go.mod b/go.mod index b60aef4d6..893943358 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/benbjohnson/clock v1.3.0 github.com/edwarnicke/exechelper v1.0.2 github.com/edwarnicke/genericsync v0.0.0-20220910010113-61a344f9bc29 - github.com/edwarnicke/grpcfd v1.1.2 + github.com/edwarnicke/grpcfd v1.1.4 github.com/edwarnicke/serialize v1.0.7 github.com/fsnotify/fsnotify v1.5.4 github.com/ghodss/yaml v1.0.0 diff --git a/go.sum b/go.sum index c5c2b693b..02821c45b 100644 --- a/go.sum +++ b/go.sum @@ -53,8 +53,8 @@ github.com/edwarnicke/exechelper v1.0.2 h1:dD49Ui2U0FBFxxhalnKw6vLS0P0TkgnXBRvKL github.com/edwarnicke/exechelper v1.0.2/go.mod h1:/T271jtNX/ND4De6pa2aRy2+8sNtyCDB1A2pp4M+fUs= github.com/edwarnicke/genericsync v0.0.0-20220910010113-61a344f9bc29 h1:4/2wgileNvQB4HfJbq7u4FFLKIfc38a6P0S/51ZGgX8= github.com/edwarnicke/genericsync v0.0.0-20220910010113-61a344f9bc29/go.mod h1:3m+ZfVq+z0pTLW798jmqnifMsalrVLIKmfXaMFvqSuc= -github.com/edwarnicke/grpcfd v1.1.2 h1:2b8kCABQ1+JjSKGDoHadqSW7whCeTXMqtyo6jmB5B8k= -github.com/edwarnicke/grpcfd v1.1.2/go.mod h1:rHihB9YvNMixz8rS+ZbwosI2kj65VLkeyYAI2M+/cGA= +github.com/edwarnicke/grpcfd v1.1.4 h1:MuXeJTyIyWuUMYJJBIW7Cr8TUBWPXRxop3aGudhzV2I= +github.com/edwarnicke/grpcfd v1.1.4/go.mod h1:rHihB9YvNMixz8rS+ZbwosI2kj65VLkeyYAI2M+/cGA= github.com/edwarnicke/serialize v0.0.0-20200705214914-ebc43080eecf/go.mod h1:XvbCO/QGsl3X8RzjBMoRpkm54FIAZH5ChK2j+aox7pw= github.com/edwarnicke/serialize v1.0.7 h1:geX8vmyu8Ij2S5fFIXjy9gBDkKxXnrMIzMoDvV0Ddac= github.com/edwarnicke/serialize v1.0.7/go.mod h1:y79KgU2P7ALH/4j37uTSIdNavHFNttqN7pzO6Y8B2aw= diff --git a/pkg/networkservice/common/monitor/eventloop.go b/pkg/networkservice/common/monitor/eventloop.go index 0519ab8f4..63649add6 100644 --- a/pkg/networkservice/common/monitor/eventloop.go +++ b/pkg/networkservice/common/monitor/eventloop.go @@ -79,14 +79,16 @@ func (cev *eventLoop) eventLoop() { if err != nil { // If we get an error, we've lost our connection... Send Down update connOut := cev.conn.Clone() - connOut.State = networkservice.State_DOWN - eventOut := &networkservice.ConnectionEvent{ - Type: networkservice.ConnectionEventType_UPDATE, - Connections: map[string]*networkservice.Connection{ - cev.conn.GetId(): connOut, - }, + if connOut != nil { + connOut.State = networkservice.State_DOWN + eventOut := &networkservice.ConnectionEvent{ + Type: networkservice.ConnectionEventType_UPDATE, + Connections: map[string]*networkservice.Connection{ + cev.conn.GetId(): connOut, + }, + } + _ = cev.eventConsumer.Send(eventOut) } - _ = cev.eventConsumer.Send(eventOut) return } _ = cev.eventConsumer.Send(eventIn) diff --git a/pkg/networkservice/common/monitor/server.go b/pkg/networkservice/common/monitor/server.go index af496ddda..73432f6a9 100644 --- a/pkg/networkservice/common/monitor/server.go +++ b/pkg/networkservice/common/monitor/server.go @@ -24,11 +24,8 @@ import ( "context" "github.com/golang/protobuf/ptypes/empty" - "github.com/pkg/errors" - "github.com/networkservicemesh/sdk/pkg/networkservice/common/clientconn" "github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata" - "github.com/networkservicemesh/sdk/pkg/tools/postpone" "github.com/networkservicemesh/api/pkg/api/networkservice" @@ -58,7 +55,7 @@ func NewServer(chainCtx context.Context, monitorServerPtr *networkservice.Monito } func (m *monitorServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) { - closeCtxFunc := postpone.ContextWithValues(ctx) + // closeCtxFunc := postpone.ContextWithValues(ctx) // Cancel any existing eventLoop cancelEventLoop, loaded := loadAndDelete(ctx, metadata.IsClient(m)) if loaded { @@ -86,17 +83,20 @@ func (m *monitorServer) Request(ctx context.Context, request *networkservice.Net // If we have a clientconn ... we must be part of a passthrough server, and have a client to pass // events through from, so start an eventLoop - cc, ccLoaded := clientconn.Load(ctx) - if ccLoaded { - cancelEventLoop, eventLoopErr := newEventLoop(m.chainCtx, m.MonitorConnectionServer.(EventConsumer), cc, conn) - if eventLoopErr != nil { - closeCtx, closeCancel := closeCtxFunc() - defer closeCancel() - _, _ = next.Client(closeCtx).Close(closeCtx, conn) - return nil, errors.Wrap(eventLoopErr, "unable to monitor") - } - store(ctx, metadata.IsClient(m), cancelEventLoop) - } + // cc, ccLoaded := clientconn.Load(ctx) + // log.FromContext(ctx).Infof("ccLoaded") + // if ccLoaded { + // log.FromContext(ctx).Infof("newEventLoop") + // cancelEventLoop, eventLoopErr := newEventLoop(m.chainCtx, m.MonitorConnectionServer.(EventConsumer), cc, conn) + // if eventLoopErr != nil { + // closeCtx, closeCancel := closeCtxFunc() + // defer closeCancel() + // _, _ = next.Client(closeCtx).Close(closeCtx, conn) + // return nil, errors.Wrap(eventLoopErr, "unable to monitor") + // } + // log.FromContext(ctx).Infof("STORE") + // store(ctx, metadata.IsClient(m), cancelEventLoop) + // } return conn, nil } diff --git a/pkg/networkservice/common/updatepath/client.go b/pkg/networkservice/common/updatepath/client.go index 145c137a7..d97f77a26 100644 --- a/pkg/networkservice/common/updatepath/client.go +++ b/pkg/networkservice/common/updatepath/client.go @@ -56,8 +56,15 @@ func (i *updatePathClient) Request(ctx context.Context, request *networkservice. return nil, err } - conn.Id = conn.Path.PathSegments[index].Id - conn.Path.Index = index + segments := conn.GetPath().GetPathSegments() + if segments != nil && len(segments) > int(index) { + conn.Id = segments[index].Id + } + + path := conn.GetPath() + if path != nil { + path.Index = index + } return conn, nil } From 5c398dc50ba21ddc30763b3e87cc7577fb86c4e9 Mon Sep 17 00:00:00 2001 From: NikitaSkrynnik Date: Mon, 1 Jul 2024 15:42:13 +0300 Subject: [PATCH 2/7] fix monitor Signed-off-by: NikitaSkrynnik --- .../common/monitor/client_filter.go | 3 + .../common/monitor/eventloop.go | 68 +++++++++++-------- pkg/networkservice/common/monitor/server.go | 30 ++++---- .../common/updatepath/client.go | 11 +-- 4 files changed, 60 insertions(+), 52 deletions(-) diff --git a/pkg/networkservice/common/monitor/client_filter.go b/pkg/networkservice/common/monitor/client_filter.go index 854ef3b17..85307f519 100644 --- a/pkg/networkservice/common/monitor/client_filter.go +++ b/pkg/networkservice/common/monitor/client_filter.go @@ -37,6 +37,9 @@ func newClientFilter(client networkservice.MonitorConnection_MonitorConnectionsC func (c *clientFilter) Recv() (*networkservice.ConnectionEvent, error) { for { + if c == nil || c.MonitorConnection_MonitorConnectionsClient == nil { + return nil, nil + } eventIn, err := c.MonitorConnection_MonitorConnectionsClient.Recv() if err != nil { return nil, errors.Wrap(err, "MonitorConnections client failed to receive an event") diff --git a/pkg/networkservice/common/monitor/eventloop.go b/pkg/networkservice/common/monitor/eventloop.go index 63649add6..b94c5e6d2 100644 --- a/pkg/networkservice/common/monitor/eventloop.go +++ b/pkg/networkservice/common/monitor/eventloop.go @@ -20,7 +20,7 @@ import ( "context" "github.com/networkservicemesh/api/pkg/api/networkservice" - "github.com/pkg/errors" + "github.com/networkservicemesh/sdk/pkg/tools/log" "google.golang.org/grpc" ) @@ -28,7 +28,8 @@ type eventLoop struct { eventLoopCtx context.Context conn *networkservice.Connection eventConsumer EventConsumer - client networkservice.MonitorConnection_MonitorConnectionsClient + cancel func() + cc grpc.ClientConnInterface } func newEventLoop(ctx context.Context, ec EventConsumer, cc grpc.ClientConnInterface, conn *networkservice.Connection) (context.CancelFunc, error) { @@ -42,53 +43,62 @@ func newEventLoop(ctx context.Context, ec EventConsumer, cc grpc.ClientConnInter eventLoopCtx, eventLoopCancel := context.WithCancel(ctx) // Create selector to only ask for events related to our Connection + cev := &eventLoop{ + eventLoopCtx: eventLoopCtx, + conn: conn, + eventConsumer: ec, + cc: cc, + cancel: eventLoopCancel, + } + + // Start the eventLoop + go cev.eventLoop() + return eventLoopCancel, nil +} + +func (cev *eventLoop) eventLoop() { selector := &networkservice.MonitorScopeSelector{ PathSegments: []*networkservice.PathSegment{ { - Id: conn.GetCurrentPathSegment().GetId(), - Name: conn.GetCurrentPathSegment().GetName(), + Id: cev.conn.GetCurrentPathSegment().GetId(), + Name: cev.conn.GetCurrentPathSegment().GetName(), }, }, } - client, err := networkservice.NewMonitorConnectionClient(cc).MonitorConnections(eventLoopCtx, selector) + client, err := networkservice.NewMonitorConnectionClient(cev.cc).MonitorConnections(cev.eventLoopCtx, selector) if err != nil { - eventLoopCancel() - return nil, errors.Wrap(err, "failed to get a MonitorConnections client") + log.FromContext(cev.eventLoopCtx).Infof("failed to get a MonitorConnections client: %s", err.Error()) + cev.cancel() + return } - cev := &eventLoop{ - eventLoopCtx: eventLoopCtx, - conn: conn, - eventConsumer: ec, - client: newClientFilter(client, conn), + if client == nil { + log.FromContext(cev.eventLoopCtx).Infof("failed to get a MonitorConnections client: client is nil") + cev.cancel() + return } - // Start the eventLoop - go cev.eventLoop() - return eventLoopCancel, nil -} + filter := newClientFilter(client, cev.conn) -func (cev *eventLoop) eventLoop() { // So we have a client, and can receive events for { - eventIn, err := cev.client.Recv() + eventIn, err := filter.Recv() if cev.eventLoopCtx.Err() != nil { return } - if err != nil { + + connOut := cev.conn.Clone() + if err != nil && connOut != nil { // If we get an error, we've lost our connection... Send Down update - connOut := cev.conn.Clone() - if connOut != nil { - connOut.State = networkservice.State_DOWN - eventOut := &networkservice.ConnectionEvent{ - Type: networkservice.ConnectionEventType_UPDATE, - Connections: map[string]*networkservice.Connection{ - cev.conn.GetId(): connOut, - }, - } - _ = cev.eventConsumer.Send(eventOut) + connOut.State = networkservice.State_DOWN + eventOut := &networkservice.ConnectionEvent{ + Type: networkservice.ConnectionEventType_UPDATE, + Connections: map[string]*networkservice.Connection{ + cev.conn.GetId(): connOut, + }, } + _ = cev.eventConsumer.Send(eventOut) return } _ = cev.eventConsumer.Send(eventIn) diff --git a/pkg/networkservice/common/monitor/server.go b/pkg/networkservice/common/monitor/server.go index 73432f6a9..af496ddda 100644 --- a/pkg/networkservice/common/monitor/server.go +++ b/pkg/networkservice/common/monitor/server.go @@ -24,8 +24,11 @@ import ( "context" "github.com/golang/protobuf/ptypes/empty" + "github.com/pkg/errors" + "github.com/networkservicemesh/sdk/pkg/networkservice/common/clientconn" "github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata" + "github.com/networkservicemesh/sdk/pkg/tools/postpone" "github.com/networkservicemesh/api/pkg/api/networkservice" @@ -55,7 +58,7 @@ func NewServer(chainCtx context.Context, monitorServerPtr *networkservice.Monito } func (m *monitorServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) { - // closeCtxFunc := postpone.ContextWithValues(ctx) + closeCtxFunc := postpone.ContextWithValues(ctx) // Cancel any existing eventLoop cancelEventLoop, loaded := loadAndDelete(ctx, metadata.IsClient(m)) if loaded { @@ -83,20 +86,17 @@ func (m *monitorServer) Request(ctx context.Context, request *networkservice.Net // If we have a clientconn ... we must be part of a passthrough server, and have a client to pass // events through from, so start an eventLoop - // cc, ccLoaded := clientconn.Load(ctx) - // log.FromContext(ctx).Infof("ccLoaded") - // if ccLoaded { - // log.FromContext(ctx).Infof("newEventLoop") - // cancelEventLoop, eventLoopErr := newEventLoop(m.chainCtx, m.MonitorConnectionServer.(EventConsumer), cc, conn) - // if eventLoopErr != nil { - // closeCtx, closeCancel := closeCtxFunc() - // defer closeCancel() - // _, _ = next.Client(closeCtx).Close(closeCtx, conn) - // return nil, errors.Wrap(eventLoopErr, "unable to monitor") - // } - // log.FromContext(ctx).Infof("STORE") - // store(ctx, metadata.IsClient(m), cancelEventLoop) - // } + cc, ccLoaded := clientconn.Load(ctx) + if ccLoaded { + cancelEventLoop, eventLoopErr := newEventLoop(m.chainCtx, m.MonitorConnectionServer.(EventConsumer), cc, conn) + if eventLoopErr != nil { + closeCtx, closeCancel := closeCtxFunc() + defer closeCancel() + _, _ = next.Client(closeCtx).Close(closeCtx, conn) + return nil, errors.Wrap(eventLoopErr, "unable to monitor") + } + store(ctx, metadata.IsClient(m), cancelEventLoop) + } return conn, nil } diff --git a/pkg/networkservice/common/updatepath/client.go b/pkg/networkservice/common/updatepath/client.go index d97f77a26..024dbd387 100644 --- a/pkg/networkservice/common/updatepath/client.go +++ b/pkg/networkservice/common/updatepath/client.go @@ -56,14 +56,9 @@ func (i *updatePathClient) Request(ctx context.Context, request *networkservice. return nil, err } - segments := conn.GetPath().GetPathSegments() - if segments != nil && len(segments) > int(index) { - conn.Id = segments[index].Id - } - - path := conn.GetPath() - if path != nil { - path.Index = index + if conn.GetPath() != nil && len(conn.GetPath().GetPathSegments()) > int(index) { + conn.Id = conn.GetPath().GetPathSegments()[index].Id + conn.GetPath().Index = index } return conn, nil From cee05f2a83c44921e7edd00a5db4d05b9b468ee6 Mon Sep 17 00:00:00 2001 From: NikitaSkrynnik Date: Mon, 1 Jul 2024 17:10:36 +0300 Subject: [PATCH 3/7] fix CI Signed-off-by: NikitaSkrynnik --- .../common/monitor/client_filter.go | 2 +- pkg/networkservice/common/monitor/eventloop.go | 11 ++++++----- pkg/networkservice/common/monitor/server.go | 15 +++------------ pkg/networkservice/common/updatepath/client.go | 2 +- 4 files changed, 11 insertions(+), 19 deletions(-) diff --git a/pkg/networkservice/common/monitor/client_filter.go b/pkg/networkservice/common/monitor/client_filter.go index 85307f519..3b2f854be 100644 --- a/pkg/networkservice/common/monitor/client_filter.go +++ b/pkg/networkservice/common/monitor/client_filter.go @@ -1,6 +1,6 @@ // Copyright (c) 2021 Cisco and/or its affiliates. // -// Copyright (c) 2023 Cisco Systems, Inc. +// Copyright (c) 2023-2024 Cisco Systems, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/pkg/networkservice/common/monitor/eventloop.go b/pkg/networkservice/common/monitor/eventloop.go index b94c5e6d2..5010b1bcd 100644 --- a/pkg/networkservice/common/monitor/eventloop.go +++ b/pkg/networkservice/common/monitor/eventloop.go @@ -1,4 +1,4 @@ -// Copyright (c) 2021-2023 Cisco and/or its affiliates. +// Copyright (c) 2021-2024 Cisco and/or its affiliates. // // SPDX-License-Identifier: Apache-2.0 // @@ -20,8 +20,9 @@ import ( "context" "github.com/networkservicemesh/api/pkg/api/networkservice" - "github.com/networkservicemesh/sdk/pkg/tools/log" "google.golang.org/grpc" + + "github.com/networkservicemesh/sdk/pkg/tools/log" ) type eventLoop struct { @@ -32,11 +33,11 @@ type eventLoop struct { cc grpc.ClientConnInterface } -func newEventLoop(ctx context.Context, ec EventConsumer, cc grpc.ClientConnInterface, conn *networkservice.Connection) (context.CancelFunc, error) { +func newEventLoop(ctx context.Context, ec EventConsumer, cc grpc.ClientConnInterface, conn *networkservice.Connection) context.CancelFunc { conn = conn.Clone() // Is another chain element asking for events? If not, no need to monitor if ec == nil { - return func() {}, nil + return func() {} } // Create new eventLoopCtx and store its eventLoopCancel @@ -53,7 +54,7 @@ func newEventLoop(ctx context.Context, ec EventConsumer, cc grpc.ClientConnInter // Start the eventLoop go cev.eventLoop() - return eventLoopCancel, nil + return eventLoopCancel } func (cev *eventLoop) eventLoop() { diff --git a/pkg/networkservice/common/monitor/server.go b/pkg/networkservice/common/monitor/server.go index af496ddda..01240a3f9 100644 --- a/pkg/networkservice/common/monitor/server.go +++ b/pkg/networkservice/common/monitor/server.go @@ -1,7 +1,7 @@ -// Copyright (c) 2020-2023 Cisco Systems, Inc. -// // Copyright (c) 2021-2023 Doc.ai and/or its affiliates. // +// Copyright (c) 2020-2024 Cisco Systems, Inc. +// // SPDX-License-Identifier: Apache-2.0 // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -24,11 +24,9 @@ import ( "context" "github.com/golang/protobuf/ptypes/empty" - "github.com/pkg/errors" "github.com/networkservicemesh/sdk/pkg/networkservice/common/clientconn" "github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata" - "github.com/networkservicemesh/sdk/pkg/tools/postpone" "github.com/networkservicemesh/api/pkg/api/networkservice" @@ -58,7 +56,6 @@ func NewServer(chainCtx context.Context, monitorServerPtr *networkservice.Monito } func (m *monitorServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) { - closeCtxFunc := postpone.ContextWithValues(ctx) // Cancel any existing eventLoop cancelEventLoop, loaded := loadAndDelete(ctx, metadata.IsClient(m)) if loaded { @@ -88,13 +85,7 @@ func (m *monitorServer) Request(ctx context.Context, request *networkservice.Net // events through from, so start an eventLoop cc, ccLoaded := clientconn.Load(ctx) if ccLoaded { - cancelEventLoop, eventLoopErr := newEventLoop(m.chainCtx, m.MonitorConnectionServer.(EventConsumer), cc, conn) - if eventLoopErr != nil { - closeCtx, closeCancel := closeCtxFunc() - defer closeCancel() - _, _ = next.Client(closeCtx).Close(closeCtx, conn) - return nil, errors.Wrap(eventLoopErr, "unable to monitor") - } + cancelEventLoop := newEventLoop(m.chainCtx, m.MonitorConnectionServer.(EventConsumer), cc, conn) store(ctx, metadata.IsClient(m), cancelEventLoop) } diff --git a/pkg/networkservice/common/updatepath/client.go b/pkg/networkservice/common/updatepath/client.go index 024dbd387..9c71cf83a 100644 --- a/pkg/networkservice/common/updatepath/client.go +++ b/pkg/networkservice/common/updatepath/client.go @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2023 Cisco Systems, Inc. +// Copyright (c) 2020-2024 Cisco Systems, Inc. // // SPDX-License-Identifier: Apache-2.0 // From 8a22601773a6c9bcc6f541f89c34fa3a6bd6aa02 Mon Sep 17 00:00:00 2001 From: NikitaSkrynnik Date: Thu, 4 Jul 2024 12:56:56 +0300 Subject: [PATCH 4/7] add unit tests + fix some errors Signed-off-by: NikitaSkrynnik --- .../common/monitor/client_filter.go | 2 +- .../common/monitor/eventloop.go | 6 ++-- .../common/monitor/server_test.go | 34 +++++++++++++++++++ 3 files changed, 39 insertions(+), 3 deletions(-) diff --git a/pkg/networkservice/common/monitor/client_filter.go b/pkg/networkservice/common/monitor/client_filter.go index 3b2f854be..272436c0a 100644 --- a/pkg/networkservice/common/monitor/client_filter.go +++ b/pkg/networkservice/common/monitor/client_filter.go @@ -38,7 +38,7 @@ func newClientFilter(client networkservice.MonitorConnection_MonitorConnectionsC func (c *clientFilter) Recv() (*networkservice.ConnectionEvent, error) { for { if c == nil || c.MonitorConnection_MonitorConnectionsClient == nil { - return nil, nil + return nil, errors.New("MonitorConnections cilent is nil") } eventIn, err := c.MonitorConnection_MonitorConnectionsClient.Recv() if err != nil { diff --git a/pkg/networkservice/common/monitor/eventloop.go b/pkg/networkservice/common/monitor/eventloop.go index 5010b1bcd..9e0115200 100644 --- a/pkg/networkservice/common/monitor/eventloop.go +++ b/pkg/networkservice/common/monitor/eventloop.go @@ -18,6 +18,7 @@ package monitor import ( "context" + "fmt" "github.com/networkservicemesh/api/pkg/api/networkservice" "google.golang.org/grpc" @@ -42,8 +43,6 @@ func newEventLoop(ctx context.Context, ec EventConsumer, cc grpc.ClientConnInter // Create new eventLoopCtx and store its eventLoopCancel eventLoopCtx, eventLoopCancel := context.WithCancel(ctx) - - // Create selector to only ask for events related to our Connection cev := &eventLoop{ eventLoopCtx: eventLoopCtx, conn: conn, @@ -58,6 +57,7 @@ func newEventLoop(ctx context.Context, ec EventConsumer, cc grpc.ClientConnInter } func (cev *eventLoop) eventLoop() { + // Create selector to only ask for events related to our Connection selector := &networkservice.MonitorScopeSelector{ PathSegments: []*networkservice.PathSegment{ { @@ -67,7 +67,9 @@ func (cev *eventLoop) eventLoop() { }, } + fmt.Println("Trying to connect to monitor") client, err := networkservice.NewMonitorConnectionClient(cev.cc).MonitorConnections(cev.eventLoopCtx, selector) + fmt.Println("connected (maybe)") if err != nil { log.FromContext(cev.eventLoopCtx).Infof("failed to get a MonitorConnections client: %s", err.Error()) cev.cancel() diff --git a/pkg/networkservice/common/monitor/server_test.go b/pkg/networkservice/common/monitor/server_test.go index aeb91f978..ebad8ec37 100644 --- a/pkg/networkservice/common/monitor/server_test.go +++ b/pkg/networkservice/common/monitor/server_test.go @@ -34,8 +34,10 @@ import ( "github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/cls" kernelmech "github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/kernel" + "github.com/networkservicemesh/sdk/pkg/networkservice/common/clientconn" "github.com/networkservicemesh/sdk/pkg/networkservice/common/monitor" "github.com/networkservicemesh/sdk/pkg/networkservice/core/chain" + "github.com/networkservicemesh/sdk/pkg/networkservice/utils/checks/checkcontext" "github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata" "github.com/networkservicemesh/sdk/pkg/networkservice/core/next" @@ -209,6 +211,38 @@ func TestMonitorServer_RequestConnEqualsToMonitorConn(t *testing.T) { require.NoError(t, err) } +func TestMonitorServer_Connection(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + cc, err := grpc.Dial("1.1.1.1:5000", grpc.WithTransportCredentials(insecure.NewCredentials())) + require.NoError(t, err) + require.NotNil(t, cc) + + // Create monitorServer, monitorClient, and server. + var monitorServer networkservice.MonitorConnectionServer + server := chain.NewNetworkServiceServer( + metadata.NewServer(), + checkcontext.NewServer(t, func(t *testing.T, ctx context.Context) { + clientconn.Store(ctx, cc) + }), + monitor.NewServer(ctx, &monitorServer), + ) + + request := &networkservice.NetworkServiceRequest{ + Connection: &networkservice.Connection{Id: "id"}, + } + + conn, err := server.Request(ctx, request) + require.NoError(t, err) + require.NotNil(t, conn) + + monitorClient := adapters.NewMonitorServerToClient(monitorServer) + client, err := monitorClient.MonitorConnections(ctx, &networkservice.MonitorScopeSelector{}) + require.NoError(t, err) + require.NotNil(t, client) +} + type metricsServer struct{} func (m *metricsServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) { From b756c074237bd9253aa361a4662a582fb081cb2f Mon Sep 17 00:00:00 2001 From: NikitaSkrynnik Date: Fri, 5 Jul 2024 10:08:05 +0300 Subject: [PATCH 5/7] add unit tests Signed-off-by: NikitaSkrynnik --- pkg/networkservice/common/monitor/server_test.go | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/pkg/networkservice/common/monitor/server_test.go b/pkg/networkservice/common/monitor/server_test.go index ebad8ec37..8523f5e38 100644 --- a/pkg/networkservice/common/monitor/server_test.go +++ b/pkg/networkservice/common/monitor/server_test.go @@ -211,15 +211,16 @@ func TestMonitorServer_RequestConnEqualsToMonitorConn(t *testing.T) { require.NoError(t, err) } -func TestMonitorServer_Connection(t *testing.T) { +func TestMonitorServer_FailedConnect(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() + // Create a grpc connection to non existing address cc, err := grpc.Dial("1.1.1.1:5000", grpc.WithTransportCredentials(insecure.NewCredentials())) require.NoError(t, err) require.NotNil(t, cc) - // Create monitorServer, monitorClient, and server. + // Create a server var monitorServer networkservice.MonitorConnectionServer server := chain.NewNetworkServiceServer( metadata.NewServer(), @@ -233,14 +234,10 @@ func TestMonitorServer_Connection(t *testing.T) { Connection: &networkservice.Connection{Id: "id"}, } + // Make a request that should be successful and immediate (because monitor server connects to 1.1.1.1:5000 in background) conn, err := server.Request(ctx, request) require.NoError(t, err) require.NotNil(t, conn) - - monitorClient := adapters.NewMonitorServerToClient(monitorServer) - client, err := monitorClient.MonitorConnections(ctx, &networkservice.MonitorScopeSelector{}) - require.NoError(t, err) - require.NotNil(t, client) } type metricsServer struct{} From 93e4e4aed5e6e37d00c42d7579b132a5cda7c0e4 Mon Sep 17 00:00:00 2001 From: NikitaSkrynnik Date: Fri, 5 Jul 2024 10:12:04 +0300 Subject: [PATCH 6/7] fix CI Signed-off-by: NikitaSkrynnik --- pkg/networkservice/common/monitor/server_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/networkservice/common/monitor/server_test.go b/pkg/networkservice/common/monitor/server_test.go index 8523f5e38..abd6b69bb 100644 --- a/pkg/networkservice/common/monitor/server_test.go +++ b/pkg/networkservice/common/monitor/server_test.go @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2023 Cisco and/or its affiliates. +// Copyright (c) 2020-2024 Cisco and/or its affiliates. // // SPDX-License-Identifier: Apache-2.0 // From 9d3f6d38aa85d3cb20e424c294b11ed4acefadfd Mon Sep 17 00:00:00 2001 From: NikitaSkrynnik Date: Fri, 5 Jul 2024 10:26:59 +0300 Subject: [PATCH 7/7] cleanup Signed-off-by: NikitaSkrynnik --- pkg/networkservice/common/monitor/eventloop.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/pkg/networkservice/common/monitor/eventloop.go b/pkg/networkservice/common/monitor/eventloop.go index 9e0115200..8e108c01a 100644 --- a/pkg/networkservice/common/monitor/eventloop.go +++ b/pkg/networkservice/common/monitor/eventloop.go @@ -18,7 +18,6 @@ package monitor import ( "context" - "fmt" "github.com/networkservicemesh/api/pkg/api/networkservice" "google.golang.org/grpc" @@ -67,9 +66,7 @@ func (cev *eventLoop) eventLoop() { }, } - fmt.Println("Trying to connect to monitor") client, err := networkservice.NewMonitorConnectionClient(cev.cc).MonitorConnections(cev.eventLoopCtx, selector) - fmt.Println("connected (maybe)") if err != nil { log.FromContext(cev.eventLoopCtx).Infof("failed to get a MonitorConnections client: %s", err.Error()) cev.cancel()