diff --git a/pkg/networkservice/common/heal/client.go b/pkg/networkservice/common/heal/client.go index 13ee7f4e6..9603f77c6 100644 --- a/pkg/networkservice/common/heal/client.go +++ b/pkg/networkservice/common/heal/client.go @@ -1,5 +1,7 @@ // Copyright (c) 2020 Cisco Systems, Inc. // +// Copyright (c) 2021 Doc.ai and/or its affiliates. +// // SPDX-License-Identifier: Apache-2.0 // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -76,7 +78,7 @@ func (f *healClient) Request(ctx context.Context, request *networkservice.Networ if err != nil { return nil, err } - err = f.startHeal(ctx, request.Clone().SetRequestConnection(conn.Clone()), opts...) + err = f.startHeal(request.Clone().SetRequestConnection(conn.Clone()), opts...) if err != nil { return nil, err } @@ -101,22 +103,9 @@ func (f *healClient) stopHeal(conn *networkservice.Connection) { } // startHeal - start a healAsNeeded using the request as the request for re-request if healing is needed. -func (f *healClient) startHeal(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) error { - id := request.GetConnection().GetId() - - ctx, cancel := context.WithCancel(ctx) - +func (f *healClient) startHeal(request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) error { errCh := make(chan error, 1) - f.cancelHealMapExecutor.AsyncExec(func() { - if cancel, ok := f.cancelHealMap[id]; ok { - go cancel() // TODO - what to do with the errCh here? - } - f.cancelHealMap[id] = func() <-chan error { - cancel() - return errCh - } - }) - go f.healAsNeeded(ctx, request, errCh, opts...) + go f.healAsNeeded(request, errCh, opts...) return <-errCh } @@ -127,20 +116,12 @@ func (f *healClient) startHeal(ctx context.Context, request *networkservice.Netw // healAsNeeded will then continue to monitor the servers opinions about the state of the connection until either // expireTime has passed or stopHeal is called (as in Close) or a different pathSegment is found via monitoring // indicating that a later Request has occurred and in doing so created its own healAsNeeded and so we can stop this one -func (f *healClient) healAsNeeded(ctx context.Context, request *networkservice.NetworkServiceRequest, errCh chan error, opts ...grpc.CallOption) { +func (f *healClient) healAsNeeded(request *networkservice.NetworkServiceRequest, errCh chan error, opts ...grpc.CallOption) { // When we are done, close the errCh defer close(errCh) pathSegment := request.GetConnection().GetNextPathSegment() - // Monitor the pathSegment - the first time with the calls context, so we can pass back and error - // if we can't confirm via monitor the other side has the expected state - recv, err := f.initialMonitorSegment(ctx, pathSegment) - if err != nil { - errCh <- errors.Wrapf(err, "error calling MonitorConnection_MonitorConnectionsClient.Recv to get initial confirmation server has connection: %+v", request.GetConnection()) - return - } - // Make sure we have a valid expireTime to work with expireTime, err := ptypes.Timestamp(pathSegment.GetExpires()) if err != nil { @@ -162,6 +143,14 @@ func (f *healClient) healAsNeeded(ctx context.Context, request *networkservice.N } }) + // Monitor the pathSegment for the first time, so we can pass back an error + // if we can't confirm via monitor the other side has the expected state + recv, err := f.initialMonitorSegment(ctx, pathSegment) + if err != nil { + errCh <- errors.Wrapf(err, "error calling MonitorConnection_MonitorConnectionsClient.Recv to get initial confirmation server has connection: %+v", request.GetConnection()) + return + } + // Tell the caller all is well by sending them a nil err so the call can continue errCh <- nil diff --git a/pkg/networkservice/common/heal/client_test.go b/pkg/networkservice/common/heal/client_test.go index c3e42e567..c1b6e6374 100644 --- a/pkg/networkservice/common/heal/client_test.go +++ b/pkg/networkservice/common/heal/client_test.go @@ -1,4 +1,4 @@ -// Copyright (c) 2020 Doc.ai and/or its affiliates. +// Copyright (c) 2020-2021 Doc.ai and/or its affiliates. // // SPDX-License-Identifier: Apache-2.0 // @@ -22,28 +22,24 @@ import ( "testing" "time" - "google.golang.org/protobuf/proto" - - "github.com/networkservicemesh/sdk/pkg/tools/sandbox" - "github.com/golang/protobuf/ptypes/empty" - "github.com/networkservicemesh/api/pkg/api/networkservice" - - "github.com/networkservicemesh/sdk/pkg/networkservice/common/monitor" - "github.com/networkservicemesh/sdk/pkg/networkservice/common/updatepath" - "github.com/networkservicemesh/sdk/pkg/networkservice/common/updatetoken" - "github.com/networkservicemesh/sdk/pkg/networkservice/core/adapters" - "github.com/networkservicemesh/sdk/pkg/networkservice/core/next" - "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" "go.uber.org/goleak" "google.golang.org/grpc" + "github.com/networkservicemesh/api/pkg/api/networkservice" + "github.com/networkservicemesh/sdk/pkg/networkservice/common/heal" + "github.com/networkservicemesh/sdk/pkg/networkservice/common/monitor" + "github.com/networkservicemesh/sdk/pkg/networkservice/common/updatepath" + "github.com/networkservicemesh/sdk/pkg/networkservice/common/updatetoken" + "github.com/networkservicemesh/sdk/pkg/networkservice/core/adapters" "github.com/networkservicemesh/sdk/pkg/networkservice/core/chain" "github.com/networkservicemesh/sdk/pkg/networkservice/core/eventchannel" + "github.com/networkservicemesh/sdk/pkg/networkservice/core/next" "github.com/networkservicemesh/sdk/pkg/tools/addressof" + "github.com/networkservicemesh/sdk/pkg/tools/sandbox" ) type testOnHeal struct { @@ -60,7 +56,6 @@ func (t *testOnHeal) Close(ctx context.Context, in *networkservice.Connection, o } func TestHealClient_Request(t *testing.T) { - t.Skip("https://github.com/networkservicemesh/sdk/issues/375") defer goleak.VerifyNone(t, goleak.IgnoreCurrent()) logrus.SetOutput(ioutil.Discard) eventCh := make(chan *networkservice.ConnectionEvent, 1) @@ -166,70 +161,3 @@ func TestHealClient_EmptyInit(t *testing.T) { }) require.Error(t, err) } - -func TestNewClient_MissingConnectionsInInit(t *testing.T) { - t.Skip("https://github.com/networkservicemesh/sdk/issues/375") - defer goleak.VerifyNone(t, goleak.IgnoreCurrent()) - logrus.SetOutput(ioutil.Discard) - eventCh := make(chan *networkservice.ConnectionEvent, 1) - - requestCh := make(chan *networkservice.NetworkServiceRequest) - onHeal := &testOnHeal{ - RequestFunc: func(ctx context.Context, in *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (connection *networkservice.Connection, e error) { - requestCh <- in - return &networkservice.Connection{}, nil - }, - } - - ctx, cancelFunc := context.WithCancel(context.Background()) - defer cancelFunc() - client := chain.NewNetworkServiceClient( - heal.NewClient(ctx, eventchannel.NewMonitorConnectionClient(eventCh), addressof.NetworkServiceClient(onHeal))) - - conns := []*networkservice.Connection{ - {Id: "conn-1", NetworkService: "ns-1"}, - {Id: "conn-2", NetworkService: "ns-2"}, - } - - ctx, cancel := context.WithTimeout(ctx, waitForTimeout) - defer cancel() - conn, err := client.Request(ctx, &networkservice.NetworkServiceRequest{Connection: conns[0]}) - require.Nil(t, err) - require.True(t, proto.Equal(conn, conns[0])) - - conn, err = client.Request(ctx, &networkservice.NetworkServiceRequest{Connection: conns[1]}) - require.Nil(t, err) - require.True(t, proto.Equal(conn, conns[1])) - - eventCh <- &networkservice.ConnectionEvent{ - Type: networkservice.ConnectionEventType_INITIAL_STATE_TRANSFER, - Connections: map[string]*networkservice.Connection{conns[0].GetId(): conns[0]}, - } - - // we emulate situation that server managed to handle only the first connection - // second connection should came in the UPDATE event, but we emulate server's falling down - close(eventCh) - // at that point we expect that 'healClient' start healing both 'conn-1' and 'conn-2' - - healsRemaining := map[string]int{ - conns[0].GetId(): 1, - conns[1].GetId(): 2, - } - cond := func() bool { - select { - case r := <-requestCh: - if val, ok := healsRemaining[r.GetConnection().GetId()]; ok && val != 0 { - healsRemaining[r.GetConnection().GetId()]-- - return true - } - return false - default: - return false - } - } - require.Eventually(t, cond, waitForTimeout, tickTimeout) - require.Eventually(t, cond, waitForTimeout, tickTimeout) - require.Eventually(t, cond, waitForTimeout, tickTimeout) - require.Equal(t, 0, healsRemaining[conns[0].GetId()]) - require.Equal(t, 0, healsRemaining[conns[1].GetId()]) -} diff --git a/pkg/networkservice/common/heal/constants_test.go b/pkg/networkservice/common/heal/constants_test.go index 5cfbefa0e..c994db9dc 100644 --- a/pkg/networkservice/common/heal/constants_test.go +++ b/pkg/networkservice/common/heal/constants_test.go @@ -1,5 +1,7 @@ // Copyright (c) 2020 Cisco and/or its affiliates. // +// Copyright (c) 2021 Doc.ai and/or its affiliates. +// // SPDX-License-Identifier: Apache-2.0 // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -25,5 +27,4 @@ import ( const ( waitForTimeout = 100 * time.Millisecond waitHealTimeout = 1000 * time.Millisecond - tickTimeout = 20 * time.Millisecond )