diff --git a/pkg/networkservice/chains/client/client.go b/pkg/networkservice/chains/client/client.go index 8c1f434e8..0da5989b9 100644 --- a/pkg/networkservice/chains/client/client.go +++ b/pkg/networkservice/chains/client/client.go @@ -21,18 +21,22 @@ package client import ( "context" + "net/url" + "time" "github.com/google/uuid" "google.golang.org/grpc" "github.com/networkservicemesh/api/pkg/api/networkservice" + "github.com/networkservicemesh/sdk/pkg/networkservice/common/clienturl" + "github.com/networkservicemesh/sdk/pkg/networkservice/common/connect" "github.com/networkservicemesh/sdk/pkg/networkservice/common/heal" - "github.com/networkservicemesh/sdk/pkg/networkservice/common/mechanismtranslation" "github.com/networkservicemesh/sdk/pkg/networkservice/common/null" "github.com/networkservicemesh/sdk/pkg/networkservice/common/refresh" "github.com/networkservicemesh/sdk/pkg/networkservice/common/serialize" "github.com/networkservicemesh/sdk/pkg/networkservice/common/updatepath" + "github.com/networkservicemesh/sdk/pkg/networkservice/core/adapters" "github.com/networkservicemesh/sdk/pkg/networkservice/core/chain" "github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata" ) @@ -41,6 +45,8 @@ type clientOptions struct { name string additionalFunctionality []networkservice.NetworkServiceClient authorizeClient networkservice.NetworkServiceClient + dialOptions []grpc.DialOption + dialTimeout time.Duration } // Option modifies default client chain values. @@ -70,51 +76,84 @@ func WithAuthorizeClient(authorizeClient networkservice.NetworkServiceClient) Op }) } +// WithDialOptions sets dial options +func WithDialOptions(dialOptions ...grpc.DialOption) Option { + return Option(func(c *clientOptions) { + c.dialOptions = dialOptions + }) +} + +// WithDialTimeout sets dial timeout +func WithDialTimeout(dialTimeout time.Duration) Option { + return func(c *clientOptions) { + c.dialTimeout = dialTimeout + } +} + // NewClient - returns a (1.) case NSM client. // - ctx - context for the lifecycle of the *Client* itself. Cancel when discarding the client. // - cc - grpc.ClientConnInterface for the endpoint to which this client should connect -func NewClient(ctx context.Context, cc grpc.ClientConnInterface, clientOpts ...Option) networkservice.NetworkServiceClient { - var rv networkservice.NetworkServiceClient +func NewClient(ctx context.Context, connectTo *url.URL, clientOpts ...Option) networkservice.NetworkServiceClient { + rv := new(networkservice.NetworkServiceClient) var opts = &clientOptions{ name: "client-" + uuid.New().String(), authorizeClient: null.NewClient(), + dialTimeout: 100 * time.Millisecond, } for _, opt := range clientOpts { opt(opts) } - rv = chain.NewNetworkServiceClient( - append( - append([]networkservice.NetworkServiceClient{ - updatepath.NewClient(opts.name), - serialize.NewClient(), - heal.NewClient(ctx, networkservice.NewMonitorConnectionClient(cc)), - refresh.NewClient(ctx), - metadata.NewClient(), - }, opts.additionalFunctionality...), - opts.authorizeClient, - networkservice.NewNetworkServiceClient(cc), - )...) - return rv -} - -// Factory creates a networkservice.NetworkServiceClient by passed context.Cotnext and grpc.ClientConnInterface -type Factory = func(ctx context.Context, cc grpc.ClientConnInterface) networkservice.NetworkServiceClient -// NewCrossConnectClientFactory - returns a (2.) case func(cc grpc.ClientConnInterface) NSM client factory. -func NewCrossConnectClientFactory(clientOpts ...Option) Factory { - return func(ctx context.Context, cc grpc.ClientConnInterface) networkservice.NetworkServiceClient { - return chain.NewNetworkServiceClient( - mechanismtranslation.NewClient(), - NewClient(ctx, cc, clientOpts...), - ) - } + *rv = chain.NewNetworkServiceClient( + updatepath.NewClient(opts.name), + serialize.NewClient(), + refresh.NewClient(ctx), + metadata.NewClient(), + adapters.NewServerToClient( + chain.NewNetworkServiceServer( + heal.NewServer(ctx, rv), + clienturl.NewServer(connectTo), + connect.NewServer(ctx, func(ctx context.Context, cc grpc.ClientConnInterface) networkservice.NetworkServiceClient { + return chain.NewNetworkServiceClient( + append( + opts.additionalFunctionality, + heal.NewClient(ctx, networkservice.NewMonitorConnectionClient(cc)), + opts.authorizeClient, + networkservice.NewNetworkServiceClient(cc), + )..., + ) + }, + connect.WithDialOptions(opts.dialOptions...), + connect.WithDialTimeout(opts.dialTimeout)), + ), + ), + ) + return *rv } // NewClientFactory - returns a (3.) case func(cc grpc.ClientConnInterface) NSM client factory. -func NewClientFactory(clientOpts ...Option) Factory { +func NewClientFactory(clientOpts ...Option) connect.ClientFactory { return func(ctx context.Context, cc grpc.ClientConnInterface) networkservice.NetworkServiceClient { - return chain.NewNetworkServiceClient( - NewClient(ctx, cc, clientOpts...), - ) + var rv networkservice.NetworkServiceClient + var opts = &clientOptions{ + name: "client-" + uuid.New().String(), + authorizeClient: null.NewClient(), + } + for _, opt := range clientOpts { + opt(opts) + } + rv = chain.NewNetworkServiceClient( + append( + append([]networkservice.NetworkServiceClient{ + updatepath.NewClient(opts.name), + serialize.NewClient(), + refresh.NewClient(ctx), + metadata.NewClient(), + heal.NewClient(ctx, networkservice.NewMonitorConnectionClient(cc)), + }, opts.additionalFunctionality...), + opts.authorizeClient, + networkservice.NewNetworkServiceClient(cc), + )...) + return rv } } diff --git a/pkg/networkservice/chains/client/client_heal_test.go b/pkg/networkservice/chains/client/client_heal_test.go new file mode 100644 index 000000000..246a75470 --- /dev/null +++ b/pkg/networkservice/chains/client/client_heal_test.go @@ -0,0 +1,80 @@ +// Copyright (c) 2021 Doc.ai and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package client_test + +import ( + "context" + "net/url" + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.uber.org/goleak" + + "github.com/networkservicemesh/api/pkg/api/networkservice" + + "github.com/networkservicemesh/sdk/pkg/networkservice/chains/client" + "github.com/networkservicemesh/sdk/pkg/networkservice/chains/endpoint" + "github.com/networkservicemesh/sdk/pkg/tools/grpcutils" + "github.com/networkservicemesh/sdk/pkg/tools/sandbox" +) + +func TestClientHeal(t *testing.T) { + t.Cleanup(func() { goleak.VerifyNone(t) }) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + + serverURL := &url.URL{Scheme: "tcp", Host: "127.0.0.1:0"} + serverCancel := startEmptyServer(ctx, t, serverURL) + defer serverCancel() + + nsc := client.NewClient(ctx, + serverURL, + client.WithDialOptions(sandbox.DefaultDialOptions(sandbox.GenerateTestToken)...), + client.WithDialTimeout(time.Second), + ) + _, err := nsc.Request(ctx, &networkservice.NetworkServiceRequest{}) + require.NoError(t, err) + + serverCancel() + require.Eventually(t, func() bool { + return grpcutils.CheckURLFree(serverURL) + }, time.Second, time.Millisecond*10) + require.NoError(t, ctx.Err()) + + serverCancel = startEmptyServer(ctx, t, serverURL) + defer serverCancel() + + require.Eventually(t, func() bool { + _, err = nsc.Request(ctx, &networkservice.NetworkServiceRequest{}) + return err == nil + }, time.Second*2, time.Millisecond*50) +} + +func startEmptyServer(ctx context.Context, t *testing.T, serverURL *url.URL) context.CancelFunc { + serverCtx, serverCancel := context.WithCancel(ctx) + + nse := endpoint.NewServer(serverCtx, sandbox.GenerateTestToken) + + select { + case err := <-endpoint.Serve(serverCtx, serverURL, nse): + require.NoError(t, err) + default: + } + + return serverCancel +} diff --git a/pkg/networkservice/chains/nsmgr/heal_test.go b/pkg/networkservice/chains/nsmgr/heal_test.go index 66f24dcf3..48c2aefe7 100644 --- a/pkg/networkservice/chains/nsmgr/heal_test.go +++ b/pkg/networkservice/chains/nsmgr/heal_test.go @@ -258,6 +258,21 @@ func testNSMGRHealForwarder(t *testing.T, nodeNum int, restored bool, customConf } } +func TestNSMGR_HealLocalNSMgrRestored(t *testing.T) { + nsmgrCtx, nsmgrCtxCancel := context.WithCancel(context.Background()) + defer nsmgrCtxCancel() + + customConfig := []*sandbox.NodeConfig{ + nil, + { + NsmgrCtx: nsmgrCtx, + NsmgrGenerateTokenFunc: sandbox.GenerateTestToken, + }, + } + + testNSMGRHealNSMgr(t, 1, customConfig, nsmgrCtxCancel) +} + func TestNSMGR_HealRemoteNSMgrRestored(t *testing.T) { nsmgrCtx, nsmgrCtxCancel := context.WithCancel(context.Background()) defer nsmgrCtxCancel() diff --git a/pkg/networkservice/common/authorize/client.go b/pkg/networkservice/common/authorize/client.go index b68378b05..139524d64 100644 --- a/pkg/networkservice/common/authorize/client.go +++ b/pkg/networkservice/common/authorize/client.go @@ -66,8 +66,8 @@ func (a *authorizeClient) Request(ctx context.Context, request *networkservice.N } func (a *authorizeClient) Close(ctx context.Context, conn *networkservice.Connection, opts ...grpc.CallOption) (*empty.Empty, error) { - p := a.serverPeer.Load().(*peer.Peer) - if p != nil { + p, ok := a.serverPeer.Load().(*peer.Peer) + if ok && p != nil { ctx = peer.NewContext(ctx, p) } if err := a.policies.check(ctx, conn); err != nil { diff --git a/pkg/networkservice/common/connect/client.go b/pkg/networkservice/common/connect/client.go index 789024505..b4e44b275 100644 --- a/pkg/networkservice/common/connect/client.go +++ b/pkg/networkservice/common/connect/client.go @@ -28,7 +28,6 @@ import ( "github.com/pkg/errors" "google.golang.org/grpc" - "github.com/networkservicemesh/sdk/pkg/networkservice/chains/client" "github.com/networkservicemesh/sdk/pkg/tools/clienturlctx" "github.com/networkservicemesh/sdk/pkg/tools/clock" "github.com/networkservicemesh/sdk/pkg/tools/grpcutils" @@ -40,7 +39,7 @@ type connectClient struct { dialOptions []grpc.DialOption dialErr error - clientFactory client.Factory + clientFactory ClientFactory client networkservice.NetworkServiceClient initOnce sync.Once diff --git a/pkg/networkservice/common/connect/server.go b/pkg/networkservice/common/connect/server.go index fce8141c6..61b2939bd 100644 --- a/pkg/networkservice/common/connect/server.go +++ b/pkg/networkservice/common/connect/server.go @@ -25,11 +25,11 @@ import ( "time" "github.com/golang/protobuf/ptypes/empty" - "github.com/networkservicemesh/api/pkg/api/networkservice" "github.com/pkg/errors" "google.golang.org/grpc" - "github.com/networkservicemesh/sdk/pkg/networkservice/chains/client" + "github.com/networkservicemesh/api/pkg/api/networkservice" + "github.com/networkservicemesh/sdk/pkg/networkservice/core/next" "github.com/networkservicemesh/sdk/pkg/tools/cancelctx" "github.com/networkservicemesh/sdk/pkg/tools/clienturlctx" @@ -37,9 +37,12 @@ import ( "github.com/networkservicemesh/sdk/pkg/tools/multiexecutor" ) +// ClientFactory is used to created new clients when new connection is created. +type ClientFactory = func(ctx context.Context, cc grpc.ClientConnInterface) networkservice.NetworkServiceClient + type connectServer struct { ctx context.Context - clientFactory client.Factory + clientFactory ClientFactory clientDialTimeout time.Duration clientDialOptions []grpc.DialOption @@ -63,7 +66,7 @@ type connectionInfo struct { // clienturlctx.ClientURL(ctx) func NewServer( ctx context.Context, - clientFactory client.Factory, + clientFactory ClientFactory, options ...Option, ) networkservice.NetworkServiceServer { s := &connectServer{ diff --git a/pkg/networkservice/common/connect/server_cancel_test.go b/pkg/networkservice/common/connect/server_cancel_test.go index 56708b50d..85c1202f8 100644 --- a/pkg/networkservice/common/connect/server_cancel_test.go +++ b/pkg/networkservice/common/connect/server_cancel_test.go @@ -24,15 +24,16 @@ import ( "github.com/golang/protobuf/ptypes/empty" "github.com/google/uuid" - "github.com/networkservicemesh/api/pkg/api/networkservice" - "github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/cls" - kernelmech "github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/kernel" - "github.com/networkservicemesh/api/pkg/api/registry" "github.com/stretchr/testify/require" "go.uber.org/atomic" "go.uber.org/goleak" "google.golang.org/grpc" + "github.com/networkservicemesh/api/pkg/api/networkservice" + "github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/cls" + kernelmech "github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/kernel" + "github.com/networkservicemesh/api/pkg/api/registry" + "github.com/networkservicemesh/sdk/pkg/networkservice/chains/client" "github.com/networkservicemesh/sdk/pkg/networkservice/common/clienturl" "github.com/networkservicemesh/sdk/pkg/networkservice/common/connect" @@ -82,16 +83,17 @@ func TestConnect_CancelDuringRequest(t *testing.T) { require.NoError(t, err) var counter atomic.Int32 - ptClient := newPassTroughClient(service1Name) - kernelClient := kernel.NewClient() clientName := fmt.Sprintf("connectClient-%v", uuid.New().String()) + standardClientFactory := client.NewClientFactory( + client.WithName(clientName), + client.WithAdditionalFunctionality( + mechanismtranslation.NewClient(), + newPassTroughClient(service1Name), + kernel.NewClient()), + ) clientFactory := func(ctx context.Context, cc grpc.ClientConnInterface) networkservice.NetworkServiceClient { counter.Add(1) - return chain.NewNetworkServiceClient( - mechanismtranslation.NewClient(), - client.NewClient(ctx, cc, client.WithName(clientName), - client.WithAdditionalFunctionality(ptClient, kernelClient)), - ) + return standardClientFactory(ctx, cc) } nseReg2 := ®istry.NetworkServiceEndpoint{ diff --git a/pkg/tools/sandbox/node.go b/pkg/tools/sandbox/node.go index 9e641f00b..797f03f9c 100644 --- a/pkg/tools/sandbox/node.go +++ b/pkg/tools/sandbox/node.go @@ -20,8 +20,6 @@ import ( "context" "net/url" - "google.golang.org/grpc" - "github.com/networkservicemesh/api/pkg/api/networkservice" registryapi "github.com/networkservicemesh/api/pkg/api/registry" @@ -31,9 +29,9 @@ import ( "github.com/networkservicemesh/sdk/pkg/networkservice/common/clienturl" "github.com/networkservicemesh/sdk/pkg/networkservice/common/connect" "github.com/networkservicemesh/sdk/pkg/networkservice/common/heal" + "github.com/networkservicemesh/sdk/pkg/networkservice/common/mechanismtranslation" "github.com/networkservicemesh/sdk/pkg/networkservice/core/adapters" "github.com/networkservicemesh/sdk/pkg/tools/addressof" - "github.com/networkservicemesh/sdk/pkg/tools/grpcutils" "github.com/networkservicemesh/sdk/pkg/tools/log" "github.com/networkservicemesh/sdk/pkg/tools/token" ) @@ -60,8 +58,11 @@ func (n *Node) NewForwarder( clienturl.NewServer(n.NSMgr.URL), heal.NewServer(ctx, addressof.NetworkServiceClient(adapters.NewServerToClient(ep))), connect.NewServer(ctx, - client.NewCrossConnectClientFactory( + client.NewClientFactory( client.WithName(nse.Name), + client.WithAdditionalFunctionality( + mechanismtranslation.NewClient(), + ), ), connect.WithDialTimeout(DialTimeout), connect.WithDialOptions(DefaultDialOptions(generatorFunc)...), @@ -136,19 +137,10 @@ func (n *Node) NewClient( additionalFunctionality ...networkservice.NetworkServiceClient, ) networkservice.NetworkServiceClient { ctx = log.Join(ctx, log.Empty()) - cc, err := grpc.DialContext(ctx, grpcutils.URLToTarget(n.NSMgr.URL), DefaultDialOptions(generatorFunc)...) - if err != nil { - log.FromContext(ctx).Fatalf("Failed to dial node NSMgr: %s", err.Error()) - } - - go func() { - defer func() { _ = cc.Close() }() - <-ctx.Done() - }() - return client.NewClient( ctx, - cc, + n.NSMgr.URL, + client.WithDialOptions(DefaultDialOptions(generatorFunc)...), client.WithAuthorizeClient(authorize.NewClient(authorize.Any())), client.WithAdditionalFunctionality(additionalFunctionality...), )