Skip to content

Commit

Permalink
Add heal elements into client chain (#945)
Browse files Browse the repository at this point in the history
* fix panic in authorize client

Signed-off-by: Danil Uzlov <DanilUzlov@yandex.ru>

* move client factory interface into connect package

Signed-off-by: Danil Uzlov <DanilUzlov@yandex.ru>

* add standalone client function with heal support

Signed-off-by: Danil Uzlov <DanilUzlov@yandex.ru>

* fix review

Signed-off-by: Artem Glazychev <artem.glazychev@xored.com>

* add nsmgr test

Signed-off-by: Artem Glazychev <artem.glazychev@xored.com>

Co-authored-by: Artem Glazychev <artem.glazychev@xored.com>
  • Loading branch information
d-uzlov and glazychev-art authored Jun 9, 2021
1 parent 06aafeb commit 7b63239
Show file tree
Hide file tree
Showing 8 changed files with 196 additions and 66 deletions.
103 changes: 71 additions & 32 deletions pkg/networkservice/chains/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,22 @@ package client

import (
"context"
"net/url"
"time"

"github.com/google/uuid"
"google.golang.org/grpc"

"github.com/networkservicemesh/api/pkg/api/networkservice"

"github.com/networkservicemesh/sdk/pkg/networkservice/common/clienturl"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/connect"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/heal"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/mechanismtranslation"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/null"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/refresh"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/serialize"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/updatepath"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/adapters"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/chain"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata"
)
Expand All @@ -41,6 +45,8 @@ type clientOptions struct {
name string
additionalFunctionality []networkservice.NetworkServiceClient
authorizeClient networkservice.NetworkServiceClient
dialOptions []grpc.DialOption
dialTimeout time.Duration
}

// Option modifies default client chain values.
Expand Down Expand Up @@ -70,51 +76,84 @@ func WithAuthorizeClient(authorizeClient networkservice.NetworkServiceClient) Op
})
}

// WithDialOptions sets dial options
func WithDialOptions(dialOptions ...grpc.DialOption) Option {
return Option(func(c *clientOptions) {
c.dialOptions = dialOptions
})
}

// WithDialTimeout sets dial timeout
func WithDialTimeout(dialTimeout time.Duration) Option {
return func(c *clientOptions) {
c.dialTimeout = dialTimeout
}
}

// NewClient - returns a (1.) case NSM client.
// - ctx - context for the lifecycle of the *Client* itself. Cancel when discarding the client.
// - cc - grpc.ClientConnInterface for the endpoint to which this client should connect
func NewClient(ctx context.Context, cc grpc.ClientConnInterface, clientOpts ...Option) networkservice.NetworkServiceClient {
var rv networkservice.NetworkServiceClient
func NewClient(ctx context.Context, connectTo *url.URL, clientOpts ...Option) networkservice.NetworkServiceClient {
rv := new(networkservice.NetworkServiceClient)
var opts = &clientOptions{
name: "client-" + uuid.New().String(),
authorizeClient: null.NewClient(),
dialTimeout: 100 * time.Millisecond,
}
for _, opt := range clientOpts {
opt(opts)
}
rv = chain.NewNetworkServiceClient(
append(
append([]networkservice.NetworkServiceClient{
updatepath.NewClient(opts.name),
serialize.NewClient(),
heal.NewClient(ctx, networkservice.NewMonitorConnectionClient(cc)),
refresh.NewClient(ctx),
metadata.NewClient(),
}, opts.additionalFunctionality...),
opts.authorizeClient,
networkservice.NewNetworkServiceClient(cc),
)...)
return rv
}

// Factory creates a networkservice.NetworkServiceClient by passed context.Cotnext and grpc.ClientConnInterface
type Factory = func(ctx context.Context, cc grpc.ClientConnInterface) networkservice.NetworkServiceClient

// NewCrossConnectClientFactory - returns a (2.) case func(cc grpc.ClientConnInterface) NSM client factory.
func NewCrossConnectClientFactory(clientOpts ...Option) Factory {
return func(ctx context.Context, cc grpc.ClientConnInterface) networkservice.NetworkServiceClient {
return chain.NewNetworkServiceClient(
mechanismtranslation.NewClient(),
NewClient(ctx, cc, clientOpts...),
)
}
*rv = chain.NewNetworkServiceClient(
updatepath.NewClient(opts.name),
serialize.NewClient(),
refresh.NewClient(ctx),
metadata.NewClient(),
adapters.NewServerToClient(
chain.NewNetworkServiceServer(
heal.NewServer(ctx, rv),
clienturl.NewServer(connectTo),
connect.NewServer(ctx, func(ctx context.Context, cc grpc.ClientConnInterface) networkservice.NetworkServiceClient {
return chain.NewNetworkServiceClient(
append(
opts.additionalFunctionality,
heal.NewClient(ctx, networkservice.NewMonitorConnectionClient(cc)),
opts.authorizeClient,
networkservice.NewNetworkServiceClient(cc),
)...,
)
},
connect.WithDialOptions(opts.dialOptions...),
connect.WithDialTimeout(opts.dialTimeout)),
),
),
)
return *rv
}

// NewClientFactory - returns a (3.) case func(cc grpc.ClientConnInterface) NSM client factory.
func NewClientFactory(clientOpts ...Option) Factory {
func NewClientFactory(clientOpts ...Option) connect.ClientFactory {
return func(ctx context.Context, cc grpc.ClientConnInterface) networkservice.NetworkServiceClient {
return chain.NewNetworkServiceClient(
NewClient(ctx, cc, clientOpts...),
)
var rv networkservice.NetworkServiceClient
var opts = &clientOptions{
name: "client-" + uuid.New().String(),
authorizeClient: null.NewClient(),
}
for _, opt := range clientOpts {
opt(opts)
}
rv = chain.NewNetworkServiceClient(
append(
append([]networkservice.NetworkServiceClient{
updatepath.NewClient(opts.name),
serialize.NewClient(),
refresh.NewClient(ctx),
metadata.NewClient(),
heal.NewClient(ctx, networkservice.NewMonitorConnectionClient(cc)),
}, opts.additionalFunctionality...),
opts.authorizeClient,
networkservice.NewNetworkServiceClient(cc),
)...)
return rv
}
}
80 changes: 80 additions & 0 deletions pkg/networkservice/chains/client/client_heal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright (c) 2021 Doc.ai and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package client_test

import (
"context"
"net/url"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.uber.org/goleak"

"github.com/networkservicemesh/api/pkg/api/networkservice"

"github.com/networkservicemesh/sdk/pkg/networkservice/chains/client"
"github.com/networkservicemesh/sdk/pkg/networkservice/chains/endpoint"
"github.com/networkservicemesh/sdk/pkg/tools/grpcutils"
"github.com/networkservicemesh/sdk/pkg/tools/sandbox"
)

func TestClientHeal(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

serverURL := &url.URL{Scheme: "tcp", Host: "127.0.0.1:0"}
serverCancel := startEmptyServer(ctx, t, serverURL)
defer serverCancel()

nsc := client.NewClient(ctx,
serverURL,
client.WithDialOptions(sandbox.DefaultDialOptions(sandbox.GenerateTestToken)...),
client.WithDialTimeout(time.Second),
)
_, err := nsc.Request(ctx, &networkservice.NetworkServiceRequest{})
require.NoError(t, err)

serverCancel()
require.Eventually(t, func() bool {
return grpcutils.CheckURLFree(serverURL)
}, time.Second, time.Millisecond*10)
require.NoError(t, ctx.Err())

serverCancel = startEmptyServer(ctx, t, serverURL)
defer serverCancel()

require.Eventually(t, func() bool {
_, err = nsc.Request(ctx, &networkservice.NetworkServiceRequest{})
return err == nil
}, time.Second*2, time.Millisecond*50)
}

func startEmptyServer(ctx context.Context, t *testing.T, serverURL *url.URL) context.CancelFunc {
serverCtx, serverCancel := context.WithCancel(ctx)

nse := endpoint.NewServer(serverCtx, sandbox.GenerateTestToken)

select {
case err := <-endpoint.Serve(serverCtx, serverURL, nse):
require.NoError(t, err)
default:
}

return serverCancel
}
15 changes: 15 additions & 0 deletions pkg/networkservice/chains/nsmgr/heal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,21 @@ func testNSMGRHealForwarder(t *testing.T, nodeNum int, restored bool, customConf
}
}

func TestNSMGR_HealLocalNSMgrRestored(t *testing.T) {
nsmgrCtx, nsmgrCtxCancel := context.WithCancel(context.Background())
defer nsmgrCtxCancel()

customConfig := []*sandbox.NodeConfig{
nil,
{
NsmgrCtx: nsmgrCtx,
NsmgrGenerateTokenFunc: sandbox.GenerateTestToken,
},
}

testNSMGRHealNSMgr(t, 1, customConfig, nsmgrCtxCancel)
}

func TestNSMGR_HealRemoteNSMgrRestored(t *testing.T) {
nsmgrCtx, nsmgrCtxCancel := context.WithCancel(context.Background())
defer nsmgrCtxCancel()
Expand Down
4 changes: 2 additions & 2 deletions pkg/networkservice/common/authorize/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ func (a *authorizeClient) Request(ctx context.Context, request *networkservice.N
}

func (a *authorizeClient) Close(ctx context.Context, conn *networkservice.Connection, opts ...grpc.CallOption) (*empty.Empty, error) {
p := a.serverPeer.Load().(*peer.Peer)
if p != nil {
p, ok := a.serverPeer.Load().(*peer.Peer)
if ok && p != nil {
ctx = peer.NewContext(ctx, p)
}
if err := a.policies.check(ctx, conn); err != nil {
Expand Down
3 changes: 1 addition & 2 deletions pkg/networkservice/common/connect/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/pkg/errors"
"google.golang.org/grpc"

"github.com/networkservicemesh/sdk/pkg/networkservice/chains/client"
"github.com/networkservicemesh/sdk/pkg/tools/clienturlctx"
"github.com/networkservicemesh/sdk/pkg/tools/clock"
"github.com/networkservicemesh/sdk/pkg/tools/grpcutils"
Expand All @@ -40,7 +39,7 @@ type connectClient struct {
dialOptions []grpc.DialOption
dialErr error

clientFactory client.Factory
clientFactory ClientFactory
client networkservice.NetworkServiceClient

initOnce sync.Once
Expand Down
11 changes: 7 additions & 4 deletions pkg/networkservice/common/connect/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,24 @@ import (
"time"

"github.com/golang/protobuf/ptypes/empty"
"github.com/networkservicemesh/api/pkg/api/networkservice"
"github.com/pkg/errors"
"google.golang.org/grpc"

"github.com/networkservicemesh/sdk/pkg/networkservice/chains/client"
"github.com/networkservicemesh/api/pkg/api/networkservice"

"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"
"github.com/networkservicemesh/sdk/pkg/tools/cancelctx"
"github.com/networkservicemesh/sdk/pkg/tools/clienturlctx"
"github.com/networkservicemesh/sdk/pkg/tools/log"
"github.com/networkservicemesh/sdk/pkg/tools/multiexecutor"
)

// ClientFactory is used to created new clients when new connection is created.
type ClientFactory = func(ctx context.Context, cc grpc.ClientConnInterface) networkservice.NetworkServiceClient

type connectServer struct {
ctx context.Context
clientFactory client.Factory
clientFactory ClientFactory
clientDialTimeout time.Duration
clientDialOptions []grpc.DialOption

Expand All @@ -63,7 +66,7 @@ type connectionInfo struct {
// clienturlctx.ClientURL(ctx)
func NewServer(
ctx context.Context,
clientFactory client.Factory,
clientFactory ClientFactory,
options ...Option,
) networkservice.NetworkServiceServer {
s := &connectServer{
Expand Down
24 changes: 13 additions & 11 deletions pkg/networkservice/common/connect/server_cancel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,16 @@ import (

"github.com/golang/protobuf/ptypes/empty"
"github.com/google/uuid"
"github.com/networkservicemesh/api/pkg/api/networkservice"
"github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/cls"
kernelmech "github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/kernel"
"github.com/networkservicemesh/api/pkg/api/registry"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"go.uber.org/goleak"
"google.golang.org/grpc"

"github.com/networkservicemesh/api/pkg/api/networkservice"
"github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/cls"
kernelmech "github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/kernel"
"github.com/networkservicemesh/api/pkg/api/registry"

"github.com/networkservicemesh/sdk/pkg/networkservice/chains/client"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/clienturl"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/connect"
Expand Down Expand Up @@ -82,16 +83,17 @@ func TestConnect_CancelDuringRequest(t *testing.T) {
require.NoError(t, err)

var counter atomic.Int32
ptClient := newPassTroughClient(service1Name)
kernelClient := kernel.NewClient()
clientName := fmt.Sprintf("connectClient-%v", uuid.New().String())
standardClientFactory := client.NewClientFactory(
client.WithName(clientName),
client.WithAdditionalFunctionality(
mechanismtranslation.NewClient(),
newPassTroughClient(service1Name),
kernel.NewClient()),
)
clientFactory := func(ctx context.Context, cc grpc.ClientConnInterface) networkservice.NetworkServiceClient {
counter.Add(1)
return chain.NewNetworkServiceClient(
mechanismtranslation.NewClient(),
client.NewClient(ctx, cc, client.WithName(clientName),
client.WithAdditionalFunctionality(ptClient, kernelClient)),
)
return standardClientFactory(ctx, cc)
}

nseReg2 := &registry.NetworkServiceEndpoint{
Expand Down
Loading

0 comments on commit 7b63239

Please sign in to comment.