Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stabilize heal tests #642

Merged
merged 4 commits into from
Jan 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 14 additions & 25 deletions pkg/networkservice/common/heal/client.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
// Copyright (c) 2020 Cisco Systems, Inc.
//
// Copyright (c) 2021 Doc.ai and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -76,7 +78,7 @@ func (f *healClient) Request(ctx context.Context, request *networkservice.Networ
if err != nil {
return nil, err
}
err = f.startHeal(ctx, request.Clone().SetRequestConnection(conn.Clone()), opts...)
err = f.startHeal(request.Clone().SetRequestConnection(conn.Clone()), opts...)
if err != nil {
return nil, err
}
Expand All @@ -101,22 +103,9 @@ func (f *healClient) stopHeal(conn *networkservice.Connection) {
}

// startHeal - start a healAsNeeded using the request as the request for re-request if healing is needed.
func (f *healClient) startHeal(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) error {
id := request.GetConnection().GetId()

ctx, cancel := context.WithCancel(ctx)

func (f *healClient) startHeal(request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) error {
errCh := make(chan error, 1)
f.cancelHealMapExecutor.AsyncExec(func() {
if cancel, ok := f.cancelHealMap[id]; ok {
go cancel() // TODO - what to do with the errCh here?
}
f.cancelHealMap[id] = func() <-chan error {
cancel()
return errCh
}
})
go f.healAsNeeded(ctx, request, errCh, opts...)
go f.healAsNeeded(request, errCh, opts...)
return <-errCh
}

Expand All @@ -127,20 +116,12 @@ func (f *healClient) startHeal(ctx context.Context, request *networkservice.Netw
// healAsNeeded will then continue to monitor the servers opinions about the state of the connection until either
// expireTime has passed or stopHeal is called (as in Close) or a different pathSegment is found via monitoring
// indicating that a later Request has occurred and in doing so created its own healAsNeeded and so we can stop this one
func (f *healClient) healAsNeeded(ctx context.Context, request *networkservice.NetworkServiceRequest, errCh chan error, opts ...grpc.CallOption) {
func (f *healClient) healAsNeeded(request *networkservice.NetworkServiceRequest, errCh chan error, opts ...grpc.CallOption) {
// When we are done, close the errCh
defer close(errCh)

pathSegment := request.GetConnection().GetNextPathSegment()

// Monitor the pathSegment - the first time with the calls context, so we can pass back and error
// if we can't confirm via monitor the other side has the expected state
recv, err := f.initialMonitorSegment(ctx, pathSegment)
if err != nil {
errCh <- errors.Wrapf(err, "error calling MonitorConnection_MonitorConnectionsClient.Recv to get initial confirmation server has connection: %+v", request.GetConnection())
return
}

// Make sure we have a valid expireTime to work with
expireTime, err := ptypes.Timestamp(pathSegment.GetExpires())
if err != nil {
Expand All @@ -162,6 +143,14 @@ func (f *healClient) healAsNeeded(ctx context.Context, request *networkservice.N
}
})

// Monitor the pathSegment for the first time, so we can pass back an error
// if we can't confirm via monitor the other side has the expected state
recv, err := f.initialMonitorSegment(ctx, pathSegment)
if err != nil {
errCh <- errors.Wrapf(err, "error calling MonitorConnection_MonitorConnectionsClient.Recv to get initial confirmation server has connection: %+v", request.GetConnection())
return
}

// Tell the caller all is well by sending them a nil err so the call can continue
errCh <- nil

Expand Down
90 changes: 9 additions & 81 deletions pkg/networkservice/common/heal/client_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2020 Doc.ai and/or its affiliates.
// Copyright (c) 2020-2021 Doc.ai and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand All @@ -22,28 +22,24 @@ import (
"testing"
"time"

"google.golang.org/protobuf/proto"

"github.com/networkservicemesh/sdk/pkg/tools/sandbox"

"github.com/golang/protobuf/ptypes/empty"
"github.com/networkservicemesh/api/pkg/api/networkservice"

"github.com/networkservicemesh/sdk/pkg/networkservice/common/monitor"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/updatepath"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/updatetoken"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/adapters"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"

"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
"google.golang.org/grpc"

"github.com/networkservicemesh/api/pkg/api/networkservice"

"github.com/networkservicemesh/sdk/pkg/networkservice/common/heal"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/monitor"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/updatepath"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/updatetoken"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/adapters"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/chain"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/eventchannel"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"
"github.com/networkservicemesh/sdk/pkg/tools/addressof"
"github.com/networkservicemesh/sdk/pkg/tools/sandbox"
)

type testOnHeal struct {
Expand All @@ -60,7 +56,6 @@ func (t *testOnHeal) Close(ctx context.Context, in *networkservice.Connection, o
}

func TestHealClient_Request(t *testing.T) {
t.Skip("https://github.com/networkservicemesh/sdk/issues/375")
defer goleak.VerifyNone(t, goleak.IgnoreCurrent())
logrus.SetOutput(ioutil.Discard)
eventCh := make(chan *networkservice.ConnectionEvent, 1)
Expand Down Expand Up @@ -166,70 +161,3 @@ func TestHealClient_EmptyInit(t *testing.T) {
})
require.Error(t, err)
}

func TestNewClient_MissingConnectionsInInit(t *testing.T) {
t.Skip("https://github.com/networkservicemesh/sdk/issues/375")
defer goleak.VerifyNone(t, goleak.IgnoreCurrent())
logrus.SetOutput(ioutil.Discard)
eventCh := make(chan *networkservice.ConnectionEvent, 1)

requestCh := make(chan *networkservice.NetworkServiceRequest)
onHeal := &testOnHeal{
RequestFunc: func(ctx context.Context, in *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (connection *networkservice.Connection, e error) {
requestCh <- in
return &networkservice.Connection{}, nil
},
}

ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()
client := chain.NewNetworkServiceClient(
heal.NewClient(ctx, eventchannel.NewMonitorConnectionClient(eventCh), addressof.NetworkServiceClient(onHeal)))

conns := []*networkservice.Connection{
{Id: "conn-1", NetworkService: "ns-1"},
{Id: "conn-2", NetworkService: "ns-2"},
}

ctx, cancel := context.WithTimeout(ctx, waitForTimeout)
defer cancel()
conn, err := client.Request(ctx, &networkservice.NetworkServiceRequest{Connection: conns[0]})
require.Nil(t, err)
require.True(t, proto.Equal(conn, conns[0]))

conn, err = client.Request(ctx, &networkservice.NetworkServiceRequest{Connection: conns[1]})
require.Nil(t, err)
require.True(t, proto.Equal(conn, conns[1]))

eventCh <- &networkservice.ConnectionEvent{
Type: networkservice.ConnectionEventType_INITIAL_STATE_TRANSFER,
Connections: map[string]*networkservice.Connection{conns[0].GetId(): conns[0]},
}

// we emulate situation that server managed to handle only the first connection
// second connection should came in the UPDATE event, but we emulate server's falling down
close(eventCh)
// at that point we expect that 'healClient' start healing both 'conn-1' and 'conn-2'

healsRemaining := map[string]int{
conns[0].GetId(): 1,
conns[1].GetId(): 2,
}
cond := func() bool {
select {
case r := <-requestCh:
if val, ok := healsRemaining[r.GetConnection().GetId()]; ok && val != 0 {
healsRemaining[r.GetConnection().GetId()]--
return true
}
return false
default:
return false
}
}
require.Eventually(t, cond, waitForTimeout, tickTimeout)
require.Eventually(t, cond, waitForTimeout, tickTimeout)
require.Eventually(t, cond, waitForTimeout, tickTimeout)
require.Equal(t, 0, healsRemaining[conns[0].GetId()])
require.Equal(t, 0, healsRemaining[conns[1].GetId()])
}
3 changes: 2 additions & 1 deletion pkg/networkservice/common/heal/constants_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
// Copyright (c) 2020 Cisco and/or its affiliates.
//
// Copyright (c) 2021 Doc.ai and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -25,5 +27,4 @@ import (
const (
waitForTimeout = 100 * time.Millisecond
waitHealTimeout = 1000 * time.Millisecond
tickTimeout = 20 * time.Millisecond
)