diff --git a/go.mod b/go.mod index b60aef4d6..893943358 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/benbjohnson/clock v1.3.0 github.com/edwarnicke/exechelper v1.0.2 github.com/edwarnicke/genericsync v0.0.0-20220910010113-61a344f9bc29 - github.com/edwarnicke/grpcfd v1.1.2 + github.com/edwarnicke/grpcfd v1.1.4 github.com/edwarnicke/serialize v1.0.7 github.com/fsnotify/fsnotify v1.5.4 github.com/ghodss/yaml v1.0.0 diff --git a/go.sum b/go.sum index c5c2b693b..02821c45b 100644 --- a/go.sum +++ b/go.sum @@ -53,8 +53,8 @@ github.com/edwarnicke/exechelper v1.0.2 h1:dD49Ui2U0FBFxxhalnKw6vLS0P0TkgnXBRvKL github.com/edwarnicke/exechelper v1.0.2/go.mod h1:/T271jtNX/ND4De6pa2aRy2+8sNtyCDB1A2pp4M+fUs= github.com/edwarnicke/genericsync v0.0.0-20220910010113-61a344f9bc29 h1:4/2wgileNvQB4HfJbq7u4FFLKIfc38a6P0S/51ZGgX8= github.com/edwarnicke/genericsync v0.0.0-20220910010113-61a344f9bc29/go.mod h1:3m+ZfVq+z0pTLW798jmqnifMsalrVLIKmfXaMFvqSuc= -github.com/edwarnicke/grpcfd v1.1.2 h1:2b8kCABQ1+JjSKGDoHadqSW7whCeTXMqtyo6jmB5B8k= -github.com/edwarnicke/grpcfd v1.1.2/go.mod h1:rHihB9YvNMixz8rS+ZbwosI2kj65VLkeyYAI2M+/cGA= +github.com/edwarnicke/grpcfd v1.1.4 h1:MuXeJTyIyWuUMYJJBIW7Cr8TUBWPXRxop3aGudhzV2I= +github.com/edwarnicke/grpcfd v1.1.4/go.mod h1:rHihB9YvNMixz8rS+ZbwosI2kj65VLkeyYAI2M+/cGA= github.com/edwarnicke/serialize v0.0.0-20200705214914-ebc43080eecf/go.mod h1:XvbCO/QGsl3X8RzjBMoRpkm54FIAZH5ChK2j+aox7pw= github.com/edwarnicke/serialize v1.0.7 h1:geX8vmyu8Ij2S5fFIXjy9gBDkKxXnrMIzMoDvV0Ddac= github.com/edwarnicke/serialize v1.0.7/go.mod h1:y79KgU2P7ALH/4j37uTSIdNavHFNttqN7pzO6Y8B2aw= diff --git a/pkg/networkservice/common/monitor/client_filter.go b/pkg/networkservice/common/monitor/client_filter.go index 854ef3b17..272436c0a 100644 --- a/pkg/networkservice/common/monitor/client_filter.go +++ b/pkg/networkservice/common/monitor/client_filter.go @@ -1,6 +1,6 @@ // Copyright (c) 2021 Cisco and/or its affiliates. // -// Copyright (c) 2023 Cisco Systems, Inc. +// Copyright (c) 2023-2024 Cisco Systems, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -37,6 +37,9 @@ func newClientFilter(client networkservice.MonitorConnection_MonitorConnectionsC func (c *clientFilter) Recv() (*networkservice.ConnectionEvent, error) { for { + if c == nil || c.MonitorConnection_MonitorConnectionsClient == nil { + return nil, errors.New("MonitorConnections cilent is nil") + } eventIn, err := c.MonitorConnection_MonitorConnectionsClient.Recv() if err != nil { return nil, errors.Wrap(err, "MonitorConnections client failed to receive an event") diff --git a/pkg/networkservice/common/monitor/eventloop.go b/pkg/networkservice/common/monitor/eventloop.go index 0519ab8f4..8e108c01a 100644 --- a/pkg/networkservice/common/monitor/eventloop.go +++ b/pkg/networkservice/common/monitor/eventloop.go @@ -1,4 +1,4 @@ -// Copyright (c) 2021-2023 Cisco and/or its affiliates. +// Copyright (c) 2021-2024 Cisco and/or its affiliates. // // SPDX-License-Identifier: Apache-2.0 // @@ -20,65 +20,77 @@ import ( "context" "github.com/networkservicemesh/api/pkg/api/networkservice" - "github.com/pkg/errors" "google.golang.org/grpc" + + "github.com/networkservicemesh/sdk/pkg/tools/log" ) type eventLoop struct { eventLoopCtx context.Context conn *networkservice.Connection eventConsumer EventConsumer - client networkservice.MonitorConnection_MonitorConnectionsClient + cancel func() + cc grpc.ClientConnInterface } -func newEventLoop(ctx context.Context, ec EventConsumer, cc grpc.ClientConnInterface, conn *networkservice.Connection) (context.CancelFunc, error) { +func newEventLoop(ctx context.Context, ec EventConsumer, cc grpc.ClientConnInterface, conn *networkservice.Connection) context.CancelFunc { conn = conn.Clone() // Is another chain element asking for events? If not, no need to monitor if ec == nil { - return func() {}, nil + return func() {} } // Create new eventLoopCtx and store its eventLoopCancel eventLoopCtx, eventLoopCancel := context.WithCancel(ctx) + cev := &eventLoop{ + eventLoopCtx: eventLoopCtx, + conn: conn, + eventConsumer: ec, + cc: cc, + cancel: eventLoopCancel, + } + + // Start the eventLoop + go cev.eventLoop() + return eventLoopCancel +} +func (cev *eventLoop) eventLoop() { // Create selector to only ask for events related to our Connection selector := &networkservice.MonitorScopeSelector{ PathSegments: []*networkservice.PathSegment{ { - Id: conn.GetCurrentPathSegment().GetId(), - Name: conn.GetCurrentPathSegment().GetName(), + Id: cev.conn.GetCurrentPathSegment().GetId(), + Name: cev.conn.GetCurrentPathSegment().GetName(), }, }, } - client, err := networkservice.NewMonitorConnectionClient(cc).MonitorConnections(eventLoopCtx, selector) + client, err := networkservice.NewMonitorConnectionClient(cev.cc).MonitorConnections(cev.eventLoopCtx, selector) if err != nil { - eventLoopCancel() - return nil, errors.Wrap(err, "failed to get a MonitorConnections client") + log.FromContext(cev.eventLoopCtx).Infof("failed to get a MonitorConnections client: %s", err.Error()) + cev.cancel() + return } - cev := &eventLoop{ - eventLoopCtx: eventLoopCtx, - conn: conn, - eventConsumer: ec, - client: newClientFilter(client, conn), + if client == nil { + log.FromContext(cev.eventLoopCtx).Infof("failed to get a MonitorConnections client: client is nil") + cev.cancel() + return } - // Start the eventLoop - go cev.eventLoop() - return eventLoopCancel, nil -} + filter := newClientFilter(client, cev.conn) -func (cev *eventLoop) eventLoop() { // So we have a client, and can receive events for { - eventIn, err := cev.client.Recv() + eventIn, err := filter.Recv() if cev.eventLoopCtx.Err() != nil { return } - if err != nil { + + connOut := cev.conn.Clone() + if err != nil && connOut != nil { // If we get an error, we've lost our connection... Send Down update - connOut := cev.conn.Clone() connOut.State = networkservice.State_DOWN eventOut := &networkservice.ConnectionEvent{ Type: networkservice.ConnectionEventType_UPDATE, diff --git a/pkg/networkservice/common/monitor/server.go b/pkg/networkservice/common/monitor/server.go index af496ddda..01240a3f9 100644 --- a/pkg/networkservice/common/monitor/server.go +++ b/pkg/networkservice/common/monitor/server.go @@ -1,7 +1,7 @@ -// Copyright (c) 2020-2023 Cisco Systems, Inc. -// // Copyright (c) 2021-2023 Doc.ai and/or its affiliates. // +// Copyright (c) 2020-2024 Cisco Systems, Inc. +// // SPDX-License-Identifier: Apache-2.0 // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -24,11 +24,9 @@ import ( "context" "github.com/golang/protobuf/ptypes/empty" - "github.com/pkg/errors" "github.com/networkservicemesh/sdk/pkg/networkservice/common/clientconn" "github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata" - "github.com/networkservicemesh/sdk/pkg/tools/postpone" "github.com/networkservicemesh/api/pkg/api/networkservice" @@ -58,7 +56,6 @@ func NewServer(chainCtx context.Context, monitorServerPtr *networkservice.Monito } func (m *monitorServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) { - closeCtxFunc := postpone.ContextWithValues(ctx) // Cancel any existing eventLoop cancelEventLoop, loaded := loadAndDelete(ctx, metadata.IsClient(m)) if loaded { @@ -88,13 +85,7 @@ func (m *monitorServer) Request(ctx context.Context, request *networkservice.Net // events through from, so start an eventLoop cc, ccLoaded := clientconn.Load(ctx) if ccLoaded { - cancelEventLoop, eventLoopErr := newEventLoop(m.chainCtx, m.MonitorConnectionServer.(EventConsumer), cc, conn) - if eventLoopErr != nil { - closeCtx, closeCancel := closeCtxFunc() - defer closeCancel() - _, _ = next.Client(closeCtx).Close(closeCtx, conn) - return nil, errors.Wrap(eventLoopErr, "unable to monitor") - } + cancelEventLoop := newEventLoop(m.chainCtx, m.MonitorConnectionServer.(EventConsumer), cc, conn) store(ctx, metadata.IsClient(m), cancelEventLoop) } diff --git a/pkg/networkservice/common/monitor/server_test.go b/pkg/networkservice/common/monitor/server_test.go index aeb91f978..abd6b69bb 100644 --- a/pkg/networkservice/common/monitor/server_test.go +++ b/pkg/networkservice/common/monitor/server_test.go @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2023 Cisco and/or its affiliates. +// Copyright (c) 2020-2024 Cisco and/or its affiliates. // // SPDX-License-Identifier: Apache-2.0 // @@ -34,8 +34,10 @@ import ( "github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/cls" kernelmech "github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/kernel" + "github.com/networkservicemesh/sdk/pkg/networkservice/common/clientconn" "github.com/networkservicemesh/sdk/pkg/networkservice/common/monitor" "github.com/networkservicemesh/sdk/pkg/networkservice/core/chain" + "github.com/networkservicemesh/sdk/pkg/networkservice/utils/checks/checkcontext" "github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata" "github.com/networkservicemesh/sdk/pkg/networkservice/core/next" @@ -209,6 +211,35 @@ func TestMonitorServer_RequestConnEqualsToMonitorConn(t *testing.T) { require.NoError(t, err) } +func TestMonitorServer_FailedConnect(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + // Create a grpc connection to non existing address + cc, err := grpc.Dial("1.1.1.1:5000", grpc.WithTransportCredentials(insecure.NewCredentials())) + require.NoError(t, err) + require.NotNil(t, cc) + + // Create a server + var monitorServer networkservice.MonitorConnectionServer + server := chain.NewNetworkServiceServer( + metadata.NewServer(), + checkcontext.NewServer(t, func(t *testing.T, ctx context.Context) { + clientconn.Store(ctx, cc) + }), + monitor.NewServer(ctx, &monitorServer), + ) + + request := &networkservice.NetworkServiceRequest{ + Connection: &networkservice.Connection{Id: "id"}, + } + + // Make a request that should be successful and immediate (because monitor server connects to 1.1.1.1:5000 in background) + conn, err := server.Request(ctx, request) + require.NoError(t, err) + require.NotNil(t, conn) +} + type metricsServer struct{} func (m *metricsServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) { diff --git a/pkg/networkservice/common/updatepath/client.go b/pkg/networkservice/common/updatepath/client.go index 145c137a7..9c71cf83a 100644 --- a/pkg/networkservice/common/updatepath/client.go +++ b/pkg/networkservice/common/updatepath/client.go @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2023 Cisco Systems, Inc. +// Copyright (c) 2020-2024 Cisco Systems, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -56,8 +56,10 @@ func (i *updatePathClient) Request(ctx context.Context, request *networkservice. return nil, err } - conn.Id = conn.Path.PathSegments[index].Id - conn.Path.Index = index + if conn.GetPath() != nil && len(conn.GetPath().GetPathSegments()) > int(index) { + conn.Id = conn.GetPath().GetPathSegments()[index].Id + conn.GetPath().Index = index + } return conn, nil }