diff --git a/.golangci.yml b/.golangci.yml index 362a1d63f..36c20e259 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -253,3 +253,7 @@ issues: linters: - gocyclo text: "processEvent" + - path: pkg/networkservice/common/connect/server_test.go + linters: + - funlen + text: "Function 'TestConnectServer_Request'" diff --git a/pkg/networkservice/chains/client/README.md b/pkg/networkservice/chains/client/README.md new file mode 100644 index 000000000..5f53327ba --- /dev/null +++ b/pkg/networkservice/chains/client/README.md @@ -0,0 +1,51 @@ +# Functional requirements + +There are some common chain elements that we expect to have in every client chain to make NSM working. Instead of that, +there are few different scenarios when we need to create a client chain to initiate NSM request: +1. Client to NSMgr - simple case when there is an application requesting some L2/L3 connection from the NSMgr. + * no incoming L2/L3 request - client itself is a request generator + * complete chain + ``` + Client --Request--> NSMgr + | | + |---L2/L3 connection---| + | | + ``` +2. Server to endpoint client - we already have application running as a NSM endpoint receiving request to L2/L3 +connection, but it also needs to request some other L2/L3 connection from some other endpoint. + * there is an incoming L2/L3 request - we need to generate an outgoing L2/L3 request, but the connection we return + is an incoming connection + * part of some server chain - we need to add `clientConnection` and request next elements + ``` + ... Endpoint --Request--> Endpoint + | | | + |---L2/L3 connection---|---L2/L3 connection---| + | | | + ``` +3. Proxy to endpoint client - we already have application running as a NSM server, but it doesn't provide L2/L3 +connection, it simply passes the request to some other endpoint. + * there is an incoming L2/L3 request but we simply forward it + * part of some server chain - we need to add `clientConnection` and request next elements + ``` + ... Proxy --Request--> Endpoint + | | | + |---------------L2/L3 connection--------------| + | | | + ``` + +# Implementation + +## client.NewClient(..., grpcCC, ...additionalFunctionality) + +It is a solution for the (1.) case. Client appends `additionalFunctionality` to the default client chain and passes +incoming request to the NSMgr over the `grpcCC`. + +## client.NewCrossConnectClientFactory(..., ...additionalFunctionality) + +It is a solution for the (2.) case. We create a new GRPC client on each new client URL received from the incoming request. +It can be used in `connect.NewServer` so `clientConnection` will be processed correctly. + +## client.NewClientFactory(..., ...additionalFunctionality) + +It is a solution for the (3.) case. We create a new GRPC client on each new client URL received, but process like (1.). +It can be used in `connect.NewServer` so `clientConnection` will be processed correctly. \ No newline at end of file diff --git a/pkg/networkservice/chains/client/client.go b/pkg/networkservice/chains/client/client.go index d25a3b929..cfd1a74f7 100644 --- a/pkg/networkservice/chains/client/client.go +++ b/pkg/networkservice/chains/client/client.go @@ -20,23 +20,22 @@ package client import ( "context" - "github.com/networkservicemesh/sdk/pkg/networkservice/common/updatetoken" - "github.com/networkservicemesh/sdk/pkg/networkservice/utils/inject/injectpeer" - "github.com/networkservicemesh/api/pkg/api/networkservice" "google.golang.org/grpc" "github.com/networkservicemesh/sdk/pkg/networkservice/common/authorize" "github.com/networkservicemesh/sdk/pkg/networkservice/common/heal" + "github.com/networkservicemesh/sdk/pkg/networkservice/common/mechanismtranslation" "github.com/networkservicemesh/sdk/pkg/networkservice/common/refresh" "github.com/networkservicemesh/sdk/pkg/networkservice/common/updatepath" + "github.com/networkservicemesh/sdk/pkg/networkservice/common/updatetoken" "github.com/networkservicemesh/sdk/pkg/networkservice/core/chain" + "github.com/networkservicemesh/sdk/pkg/networkservice/utils/inject/injectpeer" "github.com/networkservicemesh/sdk/pkg/tools/token" ) -// NewClient - returns a NetworkServiceMesh client as a chain of the standard Client pieces plus whatever -// additional functionality is specified +// NewClient - returns a (1.) case NSM client. // - ctx - context for the lifecycle of the *Client* itself. Cancel when discarding the client. // - name - name of the NetworkServiceMeshClient // - onHeal - *networkservice.NetworkServiceClient. Since networkservice.NetworkServiceClient is an interface @@ -71,20 +70,17 @@ func NewClient(ctx context.Context, name string, onHeal *networkservice.NetworkS return rv } -// NewClientFactory - returns a func(cc grpc.ClientConnInterface)that returns a standard Client pieces plus whatever -// additional functionality is specified -// - name - name of the NetworkServiceMeshClient -// - onHeal - *networkservice.NetworkServiceClient. Since networkservice.NetworkServiceClient is an interface -// (and thus a pointer) *networkservice.NetworkServiceClient is a double pointer. Meaning it -// points to a place that points to a place that implements networkservice.NetworkServiceClient -// This is done because when we use heal.NewClient as part of a chain, we may not *have* -// a pointer to this -// client used 'onHeal'. If we detect we need to heal, onHeal.Request is used to heal. -// If onHeal is nil, then we simply set onHeal to this client chain element -// If we are part of a larger chain or a server, we should pass the resulting chain into -// this constructor before we actually have a pointer to it. -// If onHeal nil, onHeal will be pointed to the returned networkservice.NetworkServiceClient -// - additionalFunctionality - any additional NetworkServiceClient chain elements to be included in the chain +// NewCrossConnectClientFactory - returns a (2.) case func(cc grpc.ClientConnInterface) NSM client factory. +func NewCrossConnectClientFactory(name string, onHeal *networkservice.NetworkServiceClient, tokenGenerator token.GeneratorFunc, additionalFunctionality ...networkservice.NetworkServiceClient) func(ctx context.Context, cc grpc.ClientConnInterface) networkservice.NetworkServiceClient { + return func(ctx context.Context, cc grpc.ClientConnInterface) networkservice.NetworkServiceClient { + return chain.NewNetworkServiceClient( + mechanismtranslation.NewClient(), + NewClient(ctx, name, onHeal, tokenGenerator, cc, additionalFunctionality...), + ) + } +} + +// NewClientFactory - returns a (3.) case func(cc grpc.ClientConnInterface) NSM client factory. func NewClientFactory(name string, onHeal *networkservice.NetworkServiceClient, tokenGenerator token.GeneratorFunc, additionalFunctionality ...networkservice.NetworkServiceClient) func(ctx context.Context, cc grpc.ClientConnInterface) networkservice.NetworkServiceClient { return func(ctx context.Context, cc grpc.ClientConnInterface) networkservice.NetworkServiceClient { return NewClient(ctx, name, onHeal, tokenGenerator, cc, additionalFunctionality...) diff --git a/pkg/networkservice/chains/nsmgr/server.go b/pkg/networkservice/chains/nsmgr/server.go index bfe6a218f..0ecadcdd1 100644 --- a/pkg/networkservice/chains/nsmgr/server.go +++ b/pkg/networkservice/chains/nsmgr/server.go @@ -22,6 +22,7 @@ package nsmgr import ( "context" + "github.com/networkservicemesh/sdk/pkg/networkservice/chains/client" "github.com/networkservicemesh/sdk/pkg/registry/common/querycache" "github.com/networkservicemesh/sdk/pkg/registry/core/next" @@ -43,7 +44,6 @@ import ( "github.com/networkservicemesh/sdk/pkg/registry/core/nextwrap" "github.com/networkservicemesh/sdk/pkg/registry/memory" - "github.com/networkservicemesh/sdk/pkg/networkservice/chains/client" "github.com/networkservicemesh/sdk/pkg/networkservice/chains/endpoint" "github.com/networkservicemesh/sdk/pkg/networkservice/common/connect" "github.com/networkservicemesh/sdk/pkg/networkservice/common/discover" @@ -114,11 +114,10 @@ func NewServer(ctx context.Context, nsmRegistration *registryapi.NetworkServiceE newRecvFD(), // Receive any files passed interpose.NewServer(&interposeRegistry), filtermechanisms.NewServer(&urlsRegistryServer), - connect.NewServer( - ctx, - client.NewClientFactory(nsmRegistration.Name, - addressof.NetworkServiceClient( - adapters.NewServerToClient(rv)), + connect.NewServer(ctx, + client.NewClientFactory( + nsmRegistration.Name, + addressof.NetworkServiceClient(adapters.NewServerToClient(rv)), tokenGenerator, newSendFDClient(), // Send passed files. ), diff --git a/pkg/networkservice/chains/nsmgr/server_test.go b/pkg/networkservice/chains/nsmgr/server_test.go index 73a5aaaec..fcd7969c2 100644 --- a/pkg/networkservice/chains/nsmgr/server_test.go +++ b/pkg/networkservice/chains/nsmgr/server_test.go @@ -21,33 +21,31 @@ import ( "context" "fmt" "io/ioutil" - "net/url" "strconv" "sync" "sync/atomic" "testing" "time" - "github.com/pkg/errors" - "github.com/golang/protobuf/ptypes/empty" - "google.golang.org/grpc" - - "github.com/networkservicemesh/sdk/pkg/networkservice/core/adapters" - "github.com/networkservicemesh/sdk/pkg/networkservice/core/next" - + "github.com/pkg/errors" "github.com/sirupsen/logrus" - - "go.uber.org/goleak" - "github.com/stretchr/testify/require" + "go.uber.org/goleak" + "google.golang.org/grpc" "github.com/networkservicemesh/api/pkg/api/networkservice" "github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/cls" - "github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/kernel" + kernelmech "github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/kernel" "github.com/networkservicemesh/api/pkg/api/registry" + "github.com/networkservicemesh/sdk/pkg/networkservice/common/clienturl" + "github.com/networkservicemesh/sdk/pkg/networkservice/common/connect" + "github.com/networkservicemesh/sdk/pkg/networkservice/common/mechanisms/kernel" + "github.com/networkservicemesh/sdk/pkg/networkservice/core/chain" + "github.com/networkservicemesh/sdk/pkg/networkservice/core/next" "github.com/networkservicemesh/sdk/pkg/tools/sandbox" + "github.com/networkservicemesh/sdk/pkg/tools/spanhelper" ) func TestNSMGR_RemoteUsecase_Parallel(t *testing.T) { @@ -66,7 +64,7 @@ func TestNSMGR_RemoteUsecase_Parallel(t *testing.T) { request := &networkservice.NetworkServiceRequest{ MechanismPreferences: []*networkservice.Mechanism{ - {Cls: cls.LOCAL, Type: kernel.MECHANISM}, + {Cls: cls.LOCAL, Type: kernelmech.MECHANISM}, }, Connection: &networkservice.Connection{ Id: "1", @@ -83,8 +81,7 @@ func TestNSMGR_RemoteUsecase_Parallel(t *testing.T) { _, err := sandbox.NewEndpoint(ctx, nseReg, sandbox.GenerateTestToken, domain.Nodes[0].NSMgr, counter) require.NoError(t, err) }() - nsc, err := sandbox.NewClient(ctx, sandbox.GenerateTestToken, domain.Nodes[1].NSMgr.URL) - require.NoError(t, err) + nsc := sandbox.NewClient(ctx, sandbox.GenerateTestToken, domain.Nodes[1].NSMgr.URL) conn, err := nsc.Request(ctx, request.Clone()) require.NoError(t, err) @@ -102,7 +99,9 @@ func TestNSMGR_RemoteUsecase_Parallel(t *testing.T) { require.NotNil(t, conn) require.Equal(t, 8, len(conn.Path.PathSegments)) require.Equal(t, int32(2), atomic.LoadInt32(&counter.Requests)) + // Close. + e, err := nsc.Close(ctx, conn) require.NoError(t, err) require.NotNil(t, e) @@ -125,7 +124,7 @@ func TestNSMGR_RemoteUsecase_BusyEndpoints(t *testing.T) { request := &networkservice.NetworkServiceRequest{ MechanismPreferences: []*networkservice.Mechanism{ - {Cls: cls.LOCAL, Type: kernel.MECHANISM}, + {Cls: cls.LOCAL, Type: kernelmech.MECHANISM}, }, Connection: &networkservice.Connection{ Id: "1", @@ -156,8 +155,7 @@ func TestNSMGR_RemoteUsecase_BusyEndpoints(t *testing.T) { _, err := sandbox.NewEndpoint(ctx, nseReg, sandbox.GenerateTestToken, domain.Nodes[1].NSMgr, counter) require.NoError(t, err) }() - nsc, err := sandbox.NewClient(ctx, sandbox.GenerateTestToken, domain.Nodes[0].NSMgr.URL) - require.NoError(t, err) + nsc := sandbox.NewClient(ctx, sandbox.GenerateTestToken, domain.Nodes[0].NSMgr.URL) conn, err := nsc.Request(ctx, request.Clone()) require.NoError(t, err) @@ -200,7 +198,7 @@ func TestNSMGR_RemoteUsecase(t *testing.T) { request := &networkservice.NetworkServiceRequest{ MechanismPreferences: []*networkservice.Mechanism{ - {Cls: cls.LOCAL, Type: kernel.MECHANISM}, + {Cls: cls.LOCAL, Type: kernelmech.MECHANISM}, }, Connection: &networkservice.Connection{ Id: "1", @@ -209,8 +207,7 @@ func TestNSMGR_RemoteUsecase(t *testing.T) { }, } - nsc, err := sandbox.NewClient(ctx, sandbox.GenerateTestToken, domain.Nodes[1].NSMgr.URL) - require.NoError(t, err) + nsc := sandbox.NewClient(ctx, sandbox.GenerateTestToken, domain.Nodes[1].NSMgr.URL) conn, err := nsc.Request(ctx, request.Clone()) require.NoError(t, err) @@ -230,6 +227,7 @@ func TestNSMGR_RemoteUsecase(t *testing.T) { require.Equal(t, 8, len(conn.Path.PathSegments)) // Close. + e, err := nsc.Close(ctx, conn) require.NoError(t, err) require.NotNil(t, e) @@ -252,16 +250,16 @@ func TestNSMGR_LocalUsecase(t *testing.T) { Name: "final-endpoint", NetworkServiceNames: []string{"my-service-remote"}, } + counter := &counterServer{} _, err := sandbox.NewEndpoint(ctx, nseReg, sandbox.GenerateTestToken, domain.Nodes[0].NSMgr, counter) require.NoError(t, err) - nsc, err := sandbox.NewClient(ctx, sandbox.GenerateTestToken, domain.Nodes[0].NSMgr.URL) - require.NoError(t, err) + nsc := sandbox.NewClient(ctx, sandbox.GenerateTestToken, domain.Nodes[0].NSMgr.URL) request := &networkservice.NetworkServiceRequest{ MechanismPreferences: []*networkservice.Mechanism{ - {Cls: cls.LOCAL, Type: kernel.MECHANISM}, + {Cls: cls.LOCAL, Type: kernelmech.MECHANISM}, }, Connection: &networkservice.Connection{ Id: "1", @@ -269,6 +267,7 @@ func TestNSMGR_LocalUsecase(t *testing.T) { Context: &networkservice.ConnectionContext{}, }, } + conn, err := nsc.Request(ctx, request.Clone()) require.NoError(t, err) require.NotNil(t, conn) @@ -286,6 +285,7 @@ func TestNSMGR_LocalUsecase(t *testing.T) { require.Equal(t, 5, len(conn2.Path.PathSegments)) require.Equal(t, int32(2), atomic.LoadInt32(&counter.Requests)) // Close. + e, err := nsc.Close(ctx, conn) require.NoError(t, err) require.NotNil(t, e) @@ -307,18 +307,22 @@ func TestNSMGR_PassThroughRemote(t *testing.T) { defer domain.Cleanup() for i := 0; i < nodesCount; i++ { - additionalFunctionality := []networkservice.NetworkServiceServer{} + var additionalFunctionality []networkservice.NetworkServiceServer if i != 0 { + k := i // Passtrough to the node i-1 additionalFunctionality = []networkservice.NetworkServiceServer{ - adapters.NewClientToServer( - newPassTroughClient( - []*networkservice.Mechanism{ - {Cls: cls.LOCAL, Type: kernel.MECHANISM}, - }, - fmt.Sprintf("my-service-remote-%v", i-1), - fmt.Sprintf("endpoint-%v", i-1), - domain.Nodes[i].NSMgr.URL)), + chain.NewNetworkServiceServer( + clienturl.NewServer(domain.Nodes[i].NSMgr.URL), + connect.NewServer(ctx, + sandbox.NewCrossConnectClientFactory(sandbox.GenerateTestToken, + newPassTroughClient( + fmt.Sprintf("my-service-remote-%v", k-1), + fmt.Sprintf("endpoint-%v", k-1)), + kernel.NewClient()), + append(spanhelper.WithTracingDial(), grpc.WithBlock(), grpc.WithInsecure())..., + ), + ), } } nseReg := ®istry.NetworkServiceEndpoint{ @@ -329,12 +333,11 @@ func TestNSMGR_PassThroughRemote(t *testing.T) { require.NoError(t, err) } - nsc, err := sandbox.NewClient(ctx, sandbox.GenerateTestToken, domain.Nodes[nodesCount-1].NSMgr.URL) - require.NoError(t, err) + nsc := sandbox.NewClient(ctx, sandbox.GenerateTestToken, domain.Nodes[nodesCount-1].NSMgr.URL) request := &networkservice.NetworkServiceRequest{ MechanismPreferences: []*networkservice.Mechanism{ - {Cls: cls.LOCAL, Type: kernel.MECHANISM}, + {Cls: cls.LOCAL, Type: kernelmech.MECHANISM}, }, Connection: &networkservice.Connection{ Id: "1", @@ -367,17 +370,21 @@ func TestNSMGR_PassThroughLocal(t *testing.T) { defer domain.Cleanup() for i := 0; i < nsesCount; i++ { - additionalFunctionality := []networkservice.NetworkServiceServer{} + var additionalFunctionality []networkservice.NetworkServiceServer if i != 0 { + k := i additionalFunctionality = []networkservice.NetworkServiceServer{ - adapters.NewClientToServer( - newPassTroughClient( - []*networkservice.Mechanism{ - {Cls: cls.LOCAL, Type: kernel.MECHANISM}, - }, - fmt.Sprintf("my-service-remote-%v", i-1), - fmt.Sprintf("endpoint-%v", i-1), - domain.Nodes[0].NSMgr.URL)), + chain.NewNetworkServiceServer( + clienturl.NewServer(domain.Nodes[0].NSMgr.URL), + connect.NewServer(ctx, + sandbox.NewCrossConnectClientFactory(sandbox.GenerateTestToken, + newPassTroughClient( + fmt.Sprintf("my-service-remote-%v", k-1), + fmt.Sprintf("endpoint-%v", k-1)), + kernel.NewClient()), + append(spanhelper.WithTracingDial(), grpc.WithBlock(), grpc.WithInsecure())..., + ), + ), } } nseReg := ®istry.NetworkServiceEndpoint{ @@ -388,12 +395,11 @@ func TestNSMGR_PassThroughLocal(t *testing.T) { require.NoError(t, err) } - nsc, err := sandbox.NewClient(ctx, sandbox.GenerateTestToken, domain.Nodes[0].NSMgr.URL) - require.NoError(t, err) + nsc := sandbox.NewClient(ctx, sandbox.GenerateTestToken, domain.Nodes[0].NSMgr.URL) request := &networkservice.NetworkServiceRequest{ MechanismPreferences: []*networkservice.Mechanism{ - {Cls: cls.LOCAL, Type: kernel.MECHANISM}, + {Cls: cls.LOCAL, Type: kernelmech.MECHANISM}, }, Connection: &networkservice.Connection{ Id: "1", @@ -412,54 +418,26 @@ func TestNSMGR_PassThroughLocal(t *testing.T) { } type passThroughClient struct { - mechanismPreferences []*networkservice.Mechanism networkService string networkServiceEndpointName string - connectTo *url.URL } -func newPassTroughClient(mechanismPreferences []*networkservice.Mechanism, networkService, networkServiceEndpointName string, connectTo *url.URL) *passThroughClient { +func newPassTroughClient(networkService, networkServiceEndpointName string) *passThroughClient { return &passThroughClient{ - mechanismPreferences: mechanismPreferences, networkService: networkService, networkServiceEndpointName: networkServiceEndpointName, - connectTo: connectTo, } } -func (p *passThroughClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (*networkservice.Connection, error) { - newCtx, cancel := context.WithTimeout(context.Background(), time.Second*5) - defer cancel() - - nsc, err := sandbox.NewClient( - newCtx, sandbox.GenerateTestToken, p.connectTo, - ) - if err != nil { - return nil, err - } - - newRequest := &networkservice.NetworkServiceRequest{ - MechanismPreferences: p.mechanismPreferences, - Connection: &networkservice.Connection{ - Id: request.Connection.Id, - NetworkService: p.networkService, - NetworkServiceEndpointName: p.networkServiceEndpointName, - Path: request.Connection.Path.Clone(), - Context: &networkservice.ConnectionContext{}, - }, - } - conn, err := nsc.Request(newCtx, newRequest) - if err != nil { - return nil, err - } - - request.Connection.Path.PathSegments = conn.Path.PathSegments - +func (c *passThroughClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (*networkservice.Connection, error) { + request.Connection.NetworkService = c.networkService + request.Connection.NetworkServiceEndpointName = c.networkServiceEndpointName return next.Client(ctx).Request(ctx, request, opts...) } -func (p *passThroughClient) Close(ctx context.Context, conn *networkservice.Connection, opts ...grpc.CallOption) (*empty.Empty, error) { - conn = conn.Clone() +func (c *passThroughClient) Close(ctx context.Context, conn *networkservice.Connection, opts ...grpc.CallOption) (*empty.Empty, error) { + conn.NetworkService = c.networkService + conn.NetworkServiceEndpointName = c.networkServiceEndpointName return next.Client(ctx).Close(ctx, conn, opts...) } diff --git a/pkg/networkservice/chains/nsmgrproxy/server_test.go b/pkg/networkservice/chains/nsmgrproxy/server_test.go index 1802d6edb..f41490444 100644 --- a/pkg/networkservice/chains/nsmgrproxy/server_test.go +++ b/pkg/networkservice/chains/nsmgrproxy/server_test.go @@ -63,8 +63,7 @@ func TestNSMGR_InterdomainUseCase(t *testing.T) { _, err := sandbox.NewEndpoint(ctx, nseReg, sandbox.GenerateTestToken, domain2.Nodes[0].NSMgr) require.NoError(t, err) - nsc, err := sandbox.NewClient(ctx, sandbox.GenerateTestToken, domain1.Nodes[0].NSMgr.URL) - require.NoError(t, err) + nsc := sandbox.NewClient(ctx, sandbox.GenerateTestToken, domain1.Nodes[0].NSMgr.URL) request := &networkservice.NetworkServiceRequest{ MechanismPreferences: []*networkservice.Mechanism{ diff --git a/pkg/networkservice/common/connect/DESIGN.md b/pkg/networkservice/common/connect/DESIGN.md deleted file mode 100644 index c36e27bdd..000000000 --- a/pkg/networkservice/common/connect/DESIGN.md +++ /dev/null @@ -1,69 +0,0 @@ -# Functional requirements - -Upon receipt of a Request, the connect Server chain element must send, as a client, a corresponding request to -another Server. For clarity, we will refer to the incoming Request to the server as the 'server Request'. - -If the server request.GetConnection.GetId() is not associated to an existing client Connection, a new Connection -must be established by sending a Request to the Server indicated by the clienturl.ClientURL(ctx). - -If the server request.GetConnection.GetId() is associated to an existing client Connection, that client Connection needs -to be sent as part of the client request to the server the client connection was received from. - -If the server is asked to Close an existing server connection, it should also Close the corresponding client Connection -with the server that client Connection was received from. Even if the attempt to Close the client connection fails, it should -continue to call down the next.Server(ctx) chain. - -If the server is asked to Close a server connection from which it has no corresponding client Connection, it should quietly -return without error. - -# Implementation - -## connectClient -Each incoming server Connection must be translated to its corresponding client Connection for subsequent server Request and Server Close -calls. For this reason, connectClient is implemented, to manage that translation. This results in one 'client' per server Connection. -Each connectClient processes maximally one Request/Close at a time, enforced with an executor. The connectClient performs -the proper translation (or storage) of the client Connection and then calls the next.Client(ctx). Upon receiving a Close, connectClient, -calls a 'cancel' function provided to it at its construction. - -## connectServer - -connectServer keeps clientsByID [clientmap.Map](https://github.com/networkservicemesh/sdk/blob/master/pkg/tools/clientmap/gen.go#L27) mapping incoming server Connection.ID -to a chain consisting of the corresponding connectClient and a -[clienturl.NewClient(...)](https://github.com/networkservicemesh/sdk/blob/master/pkg/networkservice/common/clienturl/client.go#L44) -which handles the instantiation and management of the client connection from the clienturl.ClientURL(ctx) of the server -Request. Notably, -[clienturl.NewClient(...)](https://github.com/networkservicemesh/sdk/blob/master/pkg/networkservice/common/clienturl/client.go#L44) -uses [setFinalizer](https://github.com/networkservicemesh/sdk/blob/master/pkg/networkservice/common/clienturl/client.go#L80) to ensure -that once it is garbage collected its context is cancelled and its corresponding grpc.ClientConn is Closed. - -connectServer also keeps a clientsByURL -[clientmap.RefcountMap](https://github.com/networkservicemesh/sdk/blob/master/pkg/tools/clientmap/refcount.go#L43) of -[clienturl.NewClient(...)](https://github.com/networkservicemesh/sdk/blob/master/pkg/networkservice/common/clienturl/client.go#L44). - -Care is taken to make sure that each connectClient results in one increment of the refcount on its creation, and one decrement when -it receives a Close. In this way, we can be sure that: - -1. clienturl.NewClient(...) is not referenced in clientsByURL after the last connectClient using it has received its Close. -2. If for any reason the clienturl.NewClient(...) is deleted prematurely from the clientsByURL map, [which can happen](https://github.com/networkservicemesh/sdk/blob/master/pkg/tools/clientmap/refcount.go#L78), -any connectClients actively using it retain their pointer to it, and can continue to utilize it throughout their lifetime. - -The overall result is that usually, connectServer will have no more than one clienturl.NewClient(...) per clientURL. -It may occasionally have more than one in a transient fashion for the lifetime of one or more Connections. In all events -it will have zero clienturl.NewClient(...)s for a clientURL if it has no server Connections for that clientURL. - -## Comments on concurrency characteristics. - -Concurrency is primarily managed through type-specific wrappers of [sync.Map](https://golang.org/pkg/sync/#Map): -> The Map type is optimized for two common use cases: (1) when the entry for a given key is only ever written once -> but read many times, as in caches that only grow, or (2) when multiple goroutines read, write, and overwrite entries -> for disjoint sets of keys. In these two cases, use of a Map may significantly reduce lock contention compared to a -> Go map paired with a separate Mutex or RWMutex. - -This is precisely our case. - -[clientmap.RefcountMap](https://github.com/networkservicemesh/sdk/blob/master/pkg/tools/clientmap/refcount.go#L43) utilizes -a simple atomic refcount, which is also highly performant. - -connectClient itself is fully serialized by design. It should almost never happen that we receive more than one Request/Close -before the one before it can be processed, so this will almost never have appreciable performance impact. In the unusual -circumstance we do, it will retain correctness via its internal serialize.Executor. diff --git a/pkg/networkservice/common/connect/README.md b/pkg/networkservice/common/connect/README.md new file mode 100644 index 000000000..65d0e03fb --- /dev/null +++ b/pkg/networkservice/common/connect/README.md @@ -0,0 +1,51 @@ +# Functional requirements + +Upon receipt of a Request, the connect Server chain element must send, as a client, a corresponding request to +another Server. For clarity, we will refer to the incoming Request to the server as the 'server Request'. + +If the server request.GetConnection.GetId() is not associated to an existing client Connection, a new Connection +must be established by sending a Request to the Server indicated by the clienturl.ClientURL(ctx). + +If the server request.GetConnection.GetId() is associated to an existing client Connection, that client Connection needs +to be sent as part of the client request to the server the client connection was received from. + +If the server is asked to Close an existing server connection, it should also Close the corresponding client Connection +with the server that client Connection was received from. Even if the attempt to Close the client connection fails, it should +continue to call down the next.Server(ctx) chain. + +If the server is asked to Close a server connection from which it has no corresponding client Connection, it should quietly +return without error. + +# Implementation + +## connectServer + +`connectServer` keeps `connInfos` [connectionInfoMap](https://github.com/networkservicemesh/pkg/networkservice/common/connect/gen.go#25) +mapping incoming server `Connection.ID` to the remote server URL and to the client chain assigned to this URL, and `clients` [clientmap.RefcountMap](https://github.com/networkservicemesh/sdk/blob/master/pkg/tools/clientmap/refcount.go) +mapping remote server URL to a [clienturl.NewClient(...)](https://github.com/networkservicemesh/sdk/blob/master/pkg/networkservice/common/clienturl/client.go) +which handles the instantiation and management of the client connection from the `clienturl.ClientURL(ctx)` of the server +Request. Notably, on every [clienturl.NewClient(...)](https://github.com/networkservicemesh/sdk/blob/master/pkg/networkservice/common/clienturl/client.go) +Close it is deleted from the `clients` map, so eventually its context will be canceled and the corresponding `grpc.ClientConn` +will be closed. + +Care is taken to make sure that each client chain results in one increment of the refcount on its creation, and one +decrement when it receives a Close. In this way, we can be sure that `clienturl.NewClient(...)` context is closed after +the last client chain using it has received its Close. + +The overall result is that usually, `connectServer` will have no more than one `clienturl.NewClient(...)` per `clientURL`. +It may occasionally have more than one in a transient fashion for the lifetime of one or more Connections. +In all events it will have zero `clienturl.NewClient(...)` for a `clientURL` if it has no server Connections for that +`clientURL`. + +## Comments on concurrency characteristics. + +Concurrency is primarily managed through type-specific wrappers of [sync.Map](https://golang.org/pkg/sync/#Map): +> The Map type is optimized for two common use cases: (1) when the entry for a given key is only ever written once +> but read many times, as in caches that only grow, or (2) when multiple goroutines read, write, and overwrite entries +> for disjoint sets of keys. In these two cases, use of a Map may significantly reduce lock contention compared to a +> Go map paired with a separate Mutex or RWMutex. + +This is precisely our case. + +[clientmap.RefcountMap](https://github.com/networkservicemesh/sdk/blob/master/pkg/tools/clientmap/refcount.go#L43) utilizes +a simple atomic refcount, which is also highly performant. diff --git a/pkg/networkservice/common/connect/client.go b/pkg/networkservice/common/connect/client.go index 06f80ba2d..5a012bb12 100644 --- a/pkg/networkservice/common/connect/client.go +++ b/pkg/networkservice/common/connect/client.go @@ -1,3 +1,5 @@ +// Copyright (c) 2020 Doc.ai and/or its affiliates. +// // Copyright (c) 2020 Cisco and/or its affiliates. // // SPDX-License-Identifier: Apache-2.0 @@ -18,47 +20,25 @@ package connect import ( "context" - "sync" "github.com/golang/protobuf/ptypes/empty" - "github.com/networkservicemesh/api/pkg/api/networkservice" "google.golang.org/grpc" + "github.com/networkservicemesh/api/pkg/api/networkservice" + "github.com/networkservicemesh/sdk/pkg/networkservice/core/next" ) type connectClient struct { - mechanism *networkservice.Mechanism - cancel context.CancelFunc - mu sync.Mutex -} - -// NewClient - client chain element for use with single incoming connection, translates from incoming server connection to -// outgoing client connection -func NewClient(cancel context.CancelFunc) networkservice.NetworkServiceClient { - return &connectClient{ - cancel: cancel, - } + onClose func() } func (c *connectClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (*networkservice.Connection, error) { - c.mu.Lock() - defer c.mu.Unlock() - clientRequest := request.Clone() - clientRequest.MechanismPreferences = nil - clientRequest.Connection.Mechanism = c.mechanism - conn, err := next.Client(ctx).Request(ctx, clientRequest) - c.mechanism = conn.GetMechanism() - return conn, err + return next.Client(ctx).Request(ctx, request, opts...) } func (c *connectClient) Close(ctx context.Context, conn *networkservice.Connection, opts ...grpc.CallOption) (*empty.Empty, error) { - c.mu.Lock() - defer c.mu.Unlock() - conn = conn.Clone() - conn.Mechanism = c.mechanism - e, err := next.Client(ctx).Close(ctx, conn) - c.mechanism = nil - c.cancel() - return e, err + _, err := next.Client(ctx).Close(ctx, conn, opts...) + c.onClose() + return &empty.Empty{}, err } diff --git a/pkg/networkservice/common/connect/connection_info_map.gen.go b/pkg/networkservice/common/connect/connection_info_map.gen.go new file mode 100644 index 000000000..c54fb8786 --- /dev/null +++ b/pkg/networkservice/common/connect/connection_info_map.gen.go @@ -0,0 +1,73 @@ +// Code generated by "-output connection_info_map.gen.go -type connectionInfoMap -output connection_info_map.gen.go -type connectionInfoMap"; DO NOT EDIT. +package connect + +import ( + "sync" // Used by sync.Map. +) + +// Generate code that will fail if the constants change value. +func _() { + // An "cannot convert connectionInfoMap literal (type connectionInfoMap) to type sync.Map" compiler error signifies that the base type have changed. + // Re-run the go-syncmap command to generate them again. + _ = (sync.Map)(connectionInfoMap{}) +} + +var _nil_connectionInfoMap_connectionInfo_value = func() (val connectionInfo) { return }() + +// Load returns the value stored in the map for a key, or nil if no +// value is present. +// The ok result indicates whether value was found in the map. +func (m *connectionInfoMap) Load(key string) (connectionInfo, bool) { + value, ok := (*sync.Map)(m).Load(key) + if value == nil { + return _nil_connectionInfoMap_connectionInfo_value, ok + } + return value.(connectionInfo), ok +} + +// Store sets the value for a key. +func (m *connectionInfoMap) Store(key string, value connectionInfo) { + (*sync.Map)(m).Store(key, value) +} + +// LoadOrStore returns the existing value for the key if present. +// Otherwise, it stores and returns the given value. +// The loaded result is true if the value was loaded, false if stored. +func (m *connectionInfoMap) LoadOrStore(key string, value connectionInfo) (connectionInfo, bool) { + actual, loaded := (*sync.Map)(m).LoadOrStore(key, value) + if actual == nil { + return _nil_connectionInfoMap_connectionInfo_value, loaded + } + return actual.(connectionInfo), loaded +} + +// LoadAndDelete deletes the value for a key, returning the previous value if any. +// The loaded result reports whether the key was present. +func (m *connectionInfoMap) LoadAndDelete(key string) (value connectionInfo, loaded bool) { + actual, loaded := (*sync.Map)(m).LoadAndDelete(key) + if actual == nil { + return _nil_connectionInfoMap_connectionInfo_value, loaded + } + return actual.(connectionInfo), loaded +} + +// Delete deletes the value for a key. +func (m *connectionInfoMap) Delete(key string) { + (*sync.Map)(m).Delete(key) +} + +// Range calls f sequentially for each key and value present in the map. +// If f returns false, range stops the iteration. +// +// Range does not necessarily correspond to any consistent snapshot of the Map's +// contents: no key will be visited more than once, but if the value for any key +// is stored or deleted concurrently, Range may reflect any mapping for that key +// from any point during the Range call. +// +// Range may be O(N) with the number of elements in the map even if f returns +// false after a constant number of calls. +func (m *connectionInfoMap) Range(f func(key string, value connectionInfo) bool) { + (*sync.Map)(m).Range(func(key, value interface{}) bool { + return f(key.(string), value.(connectionInfo)) + }) +} diff --git a/pkg/networkservice/common/connect/gen.go b/pkg/networkservice/common/connect/gen.go new file mode 100644 index 000000000..9da34e9ee --- /dev/null +++ b/pkg/networkservice/common/connect/gen.go @@ -0,0 +1,25 @@ +// Copyright (c) 2020 Doc.ai and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package connect + +import ( + "sync" +) + +//go:generate go-syncmap -output connection_info_map.gen.go -type connectionInfoMap + +type connectionInfoMap sync.Map diff --git a/pkg/networkservice/common/connect/server.go b/pkg/networkservice/common/connect/server.go index b41e0ef6f..9dada9cb5 100644 --- a/pkg/networkservice/common/connect/server.go +++ b/pkg/networkservice/common/connect/server.go @@ -1,4 +1,5 @@ // Copyright (c) 2020 Doc.ai and/or its affiliates. +// // Copyright (c) 2020 Cisco and/or its affiliates. // // SPDX-License-Identifier: Apache-2.0 @@ -20,31 +21,42 @@ package connect import ( "context" - - "github.com/networkservicemesh/sdk/pkg/tools/clienturlctx" + "net/url" "github.com/golang/protobuf/ptypes/empty" - "github.com/networkservicemesh/api/pkg/api/networkservice" "github.com/pkg/errors" "google.golang.org/grpc" + "github.com/networkservicemesh/api/pkg/api/networkservice" + "github.com/networkservicemesh/sdk/pkg/networkservice/common/clienturl" "github.com/networkservicemesh/sdk/pkg/networkservice/core/chain" "github.com/networkservicemesh/sdk/pkg/networkservice/core/next" "github.com/networkservicemesh/sdk/pkg/networkservice/utils/inject/injecterror" "github.com/networkservicemesh/sdk/pkg/tools/clientmap" + "github.com/networkservicemesh/sdk/pkg/tools/clienturlctx" + "github.com/networkservicemesh/sdk/pkg/tools/log" ) type connectServer struct { ctx context.Context clientFactory func(ctx context.Context, cc grpc.ClientConnInterface) networkservice.NetworkServiceClient clientDialOptions []grpc.DialOption - clientsByURL clientmap.RefcountMap // key == clientURL.String() - clientsByID clientmap.Map // key == client connection ID + connInfos connectionInfoMap + clients clientmap.RefcountMap +} + +type connectionInfo struct { + clientURL *url.URL + client networkservice.NetworkServiceClient } // NewServer - chain element that -func NewServer(ctx context.Context, clientFactory func(ctx context.Context, cc grpc.ClientConnInterface) networkservice.NetworkServiceClient, clientDialOptions ...grpc.DialOption) networkservice.NetworkServiceServer { +func NewServer( + ctx context.Context, + clientFactory func(ctx context.Context, cc grpc.ClientConnInterface) networkservice.NetworkServiceClient, + clientDialOptions ...grpc.DialOption, +) networkservice.NetworkServiceServer { return &connectServer{ ctx: ctx, clientFactory: clientFactory, @@ -52,90 +64,90 @@ func NewServer(ctx context.Context, clientFactory func(ctx context.Context, cc g } } -func (c *connectServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) { - clientConn, clientErr := c.client(ctx, request.GetConnection()).Request(ctx, request) - - if clientErr != nil { - return nil, clientErr - } - // Copy Context from client to response from server - request.GetConnection().Context = clientConn.Context - request.GetConnection().Path = clientConn.Path - - // Carry on with next.Server - conn, err := next.Server(ctx).Request(ctx, request) +func (s *connectServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) { + conn, err := s.client(ctx, request.GetConnection()).Request(ctx, request.Clone()) if err != nil { return nil, err } - // Return result - return conn, err + // Update request.Connection + request.Connection = conn + + // Carry on with next.Server + return next.Server(ctx).Request(ctx, request) } -func (c *connectServer) Close(ctx context.Context, conn *networkservice.Connection) (*empty.Empty, error) { +func (s *connectServer) Close(ctx context.Context, conn *networkservice.Connection) (*empty.Empty, error) { var clientErr error - client, _ := c.clientsByID.Load(conn.GetId()) - if client == nil { - return &empty.Empty{}, nil + if connInfo, ok := s.connInfos.LoadAndDelete(conn.GetId()); ok { + _, clientErr = connInfo.client.Close(ctx, conn) } - _, clientErr = client.Close(ctx, conn) - rv, err := next.Server(ctx).Close(ctx, conn) + + _, err := next.Server(ctx).Close(ctx, conn) + if clientErr != nil && err != nil { - return rv, errors.Wrapf(err, "errors during client close: %v", clientErr) + return nil, errors.Wrapf(err, "errors during client close: %v", clientErr) } if clientErr != nil { - return rv, errors.Wrap(clientErr, "errors during client close") + return nil, errors.Wrap(clientErr, "errors during client close") } - return rv, err + return &empty.Empty{}, err } -func (c *connectServer) client(ctx context.Context, conn *networkservice.Connection) networkservice.NetworkServiceClient { - // Fast path, if we have a client for this conn.GetId(), use it - client, _ := c.clientsByID.Load(conn.GetId()) - - // If we didn't find a client, we fall back to clientURL - if client == nil { - clientURL := clienturlctx.ClientURL(ctx) - // If we don't have a clientURL, all we can do is return errors - if clientURL == nil { - clientErr := errors.Errorf("clientURL not found for incoming connection: %+v", conn) - return injecterror.NewClient(clientErr) +func (s *connectServer) client(ctx context.Context, conn *networkservice.Connection) networkservice.NetworkServiceClient { + logEntry := log.Entry(ctx).WithField("connectServer", "client") + + clientURL := clienturlctx.ClientURL(ctx) + if clientURL == nil { + err := errors.Errorf("clientURL not found for incoming connection: %+v", conn) + return injecterror.NewClient(err) + } + + // First check if we have already requested some clientURL with this conn.GetID(). + if connInfo, ok := s.connInfos.Load(conn.GetId()); ok { + if *connInfo.clientURL == *clientURL { + return connInfo.client } - // Fast path: load the client by URL. In the unfortunate event someone has poorly chosen - // a clientFactory or dialOptions that take time, this will be faster than - // creating a new one and doing a LoadOrStore - client, _ = c.clientsByURL.Load(clientURL.String()) - // If we still don't have a client, create one, and LoadOrStore it - // Note: It is possible for multiple nearly simultaneous initial Requests to race this client == nil check - // and both enter into the body of the if block. However, if that occurs, when they call call clientByID.LoadAndStore - // only one of them will Store, the rest will Load. - // The return of load == true from the clientById.LoadAndStore(conn.GetId()) indicates that there has been - // a race, and those receiving it must then call clientByURL.Delete(clientURL) to clean up the Refcount, so - // we only get one refcount increment per conn.GetId(). In this way correctness is preserved, even in the - // unlikely case of multiple nearly simultaneous initial Requests racing this client == nil - if client == nil { - // Note: clienturl.NewClient(...) will get properly cleaned up when dereferences - client = clienturl.NewClient(clienturlctx.WithClientURL(c.ctx, clientURL), c.clientFactory, c.clientDialOptions...) - client, _ = c.clientsByURL.LoadOrStore(clientURL.String(), client) - // Wrap the client in a per-connection connect.NewClient(...) - // when this client receive a 'Close' it will call the cancelFunc provided deleting it from the various - // maps. - client = chain.NewNetworkServiceClient( - NewClient(func() { - c.clientsByID.Delete(conn.GetId()) - c.clientsByURL.Delete(clientURL.String()) - }), - client, - ) - var loaded bool - client, loaded = c.clientsByID.LoadOrStore(conn.GetId(), client) - if loaded { - // If loaded == true, then another Request for the same conn.GetId() was being processed in parallel - // since both of those called c.clientsByURL.LoadOrStore, the refcount for this one conn.GetId() - // got incremented *twice*. Correct for that here by decrementing - c.clientsByURL.Delete(conn.GetId()) - } + // For some reason we have changed the clientURL, so we need to close the existing client. + if _, clientErr := connInfo.client.Close(ctx, conn); clientErr != nil { + logEntry.Warnf("failed to close client: %+v", clientErr) } } + + // Fast path if we already have client for the clientURL, use it. + client, loaded := s.clients.Load(clientURL.String()) + if !loaded { + // If not, create and LoadOrStore a new one. + newClient, cancel := s.newClient(clientURL) + client, loaded = s.clients.LoadOrStore(clientURL.String(), newClient) + if loaded { + // No one will use `newClient`, it should be canceled. + cancel() + } + } + + s.connInfos.Store(conn.GetId(), connectionInfo{ + clientURL: clientURL, + client: client, + }) + return client } + +func (s *connectServer) newClient(clientURL *url.URL) (networkservice.NetworkServiceClient, context.CancelFunc) { + clientPtr := new(networkservice.NetworkServiceClient) + + ctx, cancel := context.WithCancel(s.ctx) + onClose := func() { + if deleted := s.clients.Delete(clientURL.String()); deleted { + cancel() + } + } + + *clientPtr = chain.NewNetworkServiceClient( + &connectClient{onClose: onClose}, + clienturl.NewClient(clienturlctx.WithClientURL(ctx, clientURL), s.clientFactory, s.clientDialOptions...), + ) + + return *clientPtr, cancel +} diff --git a/pkg/networkservice/common/connect/server_test.go b/pkg/networkservice/common/connect/server_test.go index f0d2dc9a2..3b277a919 100644 --- a/pkg/networkservice/common/connect/server_test.go +++ b/pkg/networkservice/common/connect/server_test.go @@ -18,216 +18,301 @@ package connect_test import ( "context" - "fmt" "net/url" + "strconv" "sync" + "sync/atomic" "testing" - "time" - "github.com/google/uuid" - "github.com/networkservicemesh/api/pkg/api/networkservice" + "github.com/golang/protobuf/ptypes/empty" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/goleak" "google.golang.org/grpc" - "github.com/networkservicemesh/sdk/pkg/networkservice/chains/endpoint" - "github.com/networkservicemesh/sdk/pkg/networkservice/common/authorize" + "github.com/networkservicemesh/api/pkg/api/networkservice" + "github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/cls" + "github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/kernel" + "github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/memif" + "github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/vfio" + "github.com/networkservicemesh/sdk/pkg/networkservice/common/connect" - "github.com/networkservicemesh/sdk/pkg/networkservice/common/refresh" - "github.com/networkservicemesh/sdk/pkg/networkservice/common/setextracontext" - "github.com/networkservicemesh/sdk/pkg/networkservice/common/updatepath" - "github.com/networkservicemesh/sdk/pkg/networkservice/common/updatetoken" "github.com/networkservicemesh/sdk/pkg/networkservice/core/adapters" - "github.com/networkservicemesh/sdk/pkg/networkservice/core/chain" - "github.com/networkservicemesh/sdk/pkg/networkservice/utils/inject/injectpeer" + "github.com/networkservicemesh/sdk/pkg/networkservice/core/next" "github.com/networkservicemesh/sdk/pkg/tools/clienturlctx" - "github.com/networkservicemesh/sdk/pkg/tools/sandbox" - "github.com/networkservicemesh/sdk/pkg/tools/token" - - "github.com/stretchr/testify/require" - "go.uber.org/goleak" + "github.com/networkservicemesh/sdk/pkg/tools/grpcutils" ) const ( - timeout = 10 * time.Second + parallelCount = 1000 ) -type nseTest struct { - ctx context.Context - cancel context.CancelFunc - listenOn *url.URL - t *testing.T - nse endpoint.Endpoint - errCh <-chan error -} +func startServer(ctx context.Context, listenOn *url.URL, server networkservice.NetworkServiceServer) error { + grpcServer := grpc.NewServer() + networkservice.RegisterNetworkServiceServer(grpcServer, server) -func (nseT *nseTest) Stop() { - nseT.cancel() - // try read value from err channel, with this we will wait for cancel to be processed and all go routines will exit - <-nseT.errCh + errCh := grpcutils.ListenAndServe(ctx, listenOn, grpcServer) + select { + case err := <-errCh: + return err + default: + return nil + } } -func (nseT *nseTest) Setup() { - nseT.ctx, nseT.cancel = context.WithTimeout(context.Background(), 50*time.Second) - nseT.listenOn = &url.URL{Scheme: "tcp", Host: "127.0.0.1:0"} - nseT.nse = endpoint.NewServer(nseT.ctx, "testServer", authorize.NewServer(), - sandbox.GenerateTestToken, - setextracontext.NewServer(map[string]string{"ok": "all is ok"})) +func TestConnectServer_Request(t *testing.T) { + defer goleak.VerifyNone(t, goleak.IgnoreCurrent()) - nseT.errCh = endpoint.Serve(nseT.ctx, nseT.listenOn, nseT.nse) -} + // 1. Create connectServer -func (nseT *nseTest) newNSEContext(ctx context.Context) context.Context { - return clienturlctx.WithClientURL(ctx, &url.URL{Scheme: "tcp", Host: nseT.listenOn.Host}) -} + serverNext := new(captureServer) + serverClient := new(captureServer) -func newClient(ctx context.Context, generatorFunc token.GeneratorFunc) networkservice.NetworkServiceClient { - return chain.NewNetworkServiceClient( - append([]networkservice.NetworkServiceClient{ - authorize.NewClient(), - updatepath.NewClient("nsc-" + uuid.New().String()), - refresh.NewClient(ctx), - }, - injectpeer.NewClient(), - updatetoken.NewClient(generatorFunc), - adapters.NewServerToClient(connect.NewServer(ctx, func(ctx context.Context, cc grpc.ClientConnInterface) networkservice.NetworkServiceClient { - return networkservice.NewNetworkServiceClient(cc) - }, grpc.WithBlock(), grpc.WithInsecure())), - )...) -} + s := next.NewNetworkServiceServer( + connect.NewServer(context.TODO(), + func(_ context.Context, cc grpc.ClientConnInterface) networkservice.NetworkServiceClient { + return next.NewNetworkServiceClient( + adapters.NewServerToClient(serverClient), + networkservice.NewNetworkServiceClient(cc), + ) + }, + grpc.WithInsecure(), + ), + serverNext, + ) -func TestConnectServerShouldNotPanicOnRequest(t *testing.T) { - defer goleak.VerifyNone(t, goleak.IgnoreCurrent()) + func() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() - nseT := &nseTest{ - t: t, - } - nseT.Setup() - defer nseT.Stop() - - t.Run("Check Request", func(t *testing.T) { - require.NotPanics(t, func() { - clientURLCtx, clientCancel := context.WithTimeout(context.Background(), timeout) - defer clientCancel() - clientURLCtx = nseT.newNSEContext(clientURLCtx) - - c := newClient(clientURLCtx, sandbox.GenerateTestToken) - conn, err := c.Request(clientURLCtx, &networkservice.NetworkServiceRequest{ - Connection: &networkservice.Connection{ - Id: "1", - }, - }) - require.Nil(t, err) - require.NotNil(t, conn) - require.Equal(t, "all is ok", conn.GetContext().GetExtraContext()["ok"]) - _, err = c.Close(clientURLCtx, &networkservice.Connection{ - Id: conn.Id, - }) - require.Nil(t, err) - }) - }) - t.Run("Close Id", func(t *testing.T) { - require.NotPanics(t, func() { - clientURLCtx, clientCancel := context.WithTimeout(context.Background(), timeout) - defer clientCancel() - clientURLCtx = nseT.newNSEContext(clientURLCtx) - - c := newClient(clientURLCtx, sandbox.GenerateTestToken) - conn, err := c.Request(clientURLCtx, &networkservice.NetworkServiceRequest{ - Connection: &networkservice.Connection{ - Id: "1", - }, - }) - require.Nil(t, err) - require.NotNil(t, conn) - require.Equal(t, "all is ok", conn.GetContext().GetExtraContext()["ok"]) - - // Do not pass clientURL - _, err = c.Close(context.Background(), &networkservice.Connection{ - Id: "1", - }) - require.Nil(t, err) - }) - }) - t.Run("Check no clientURL", func(t *testing.T) { - require.NotPanics(t, func() { - clientURLCtx, clientCancel := context.WithTimeout(context.Background(), timeout) - defer clientCancel() - - c := newClient(clientURLCtx, sandbox.GenerateTestToken) - conn, err := c.Request(context.Background(), &networkservice.NetworkServiceRequest{ - Connection: &networkservice.Connection{ - Id: "1", - }, - }) - require.NotNil(t, err) - require.Nil(t, conn) - }) - }) - t.Run("Request without client URL", func(t *testing.T) { - require.NotPanics(t, func() { - clientURLCtx, clientCancel := context.WithTimeout(context.Background(), timeout) - defer clientCancel() - clientURLCtx = nseT.newNSEContext(clientURLCtx) - - c := newClient(clientURLCtx, sandbox.GenerateTestToken) - conn, err := c.Request(clientURLCtx, &networkservice.NetworkServiceRequest{ - Connection: &networkservice.Connection{ - Id: "1", + // 3. Setup servers + + urlA := &url.URL{Scheme: "tcp", Host: "127.0.0.1:10000"} + serverA := new(captureServer) + + err := startServer(ctx, urlA, next.NewNetworkServiceServer( + serverA, + newEditServer("a", "A", &networkservice.Mechanism{ + Cls: cls.LOCAL, + Type: kernel.MECHANISM, + }), + )) + require.NoError(t, err) + + urlB := &url.URL{Scheme: "tcp", Host: "127.0.0.1:10001"} + serverB := new(captureServer) + + err = startServer(ctx, urlB, next.NewNetworkServiceServer( + serverB, + newEditServer("b", "B", &networkservice.Mechanism{ + Cls: cls.LOCAL, + Type: memif.MECHANISM, + }), + )) + require.NoError(t, err) + + // 4. Create request + + request := &networkservice.NetworkServiceRequest{ + Connection: &networkservice.Connection{ + Id: "id", + NetworkService: "network-service", + Mechanism: &networkservice.Mechanism{ + Cls: cls.LOCAL, + Type: vfio.MECHANISM, }, - }) - require.Nil(t, err) - require.NotNil(t, conn) - require.Equal(t, "all is ok", conn.GetContext().GetExtraContext()["ok"]) - - // Request again - conn, err = c.Request(context.Background(), &networkservice.NetworkServiceRequest{ - Connection: &networkservice.Connection{ - Id: "1", + Context: &networkservice.ConnectionContext{ + ExtraContext: map[string]string{ + "not": "empty", + }, }, - }) - require.Nil(t, err) - require.NotNil(t, conn) - require.Equal(t, "all is ok", conn.GetContext().GetExtraContext()["ok"]) - - // Do not pass clientURL - _, err = c.Close(context.Background(), &networkservice.Connection{ - Id: "1", - }) - require.Nil(t, err) - }) - }) + }, + } + + // 5. Request A + + conn, err := s.Request(clienturlctx.WithClientURL(ctx, urlA), request.Clone()) + require.NoError(t, err) + + requestClient := request.Clone() + require.Equal(t, requestClient.String(), serverClient.capturedRequest.String()) + + requestA := request.Clone() + require.Equal(t, requestA.String(), serverA.capturedRequest.String()) + + requestNext := request.Clone() + requestNext.Connection.Mechanism.Type = kernel.MECHANISM + requestNext.Connection.Context.ExtraContext["a"] = "A" + require.Equal(t, requestNext.String(), serverNext.capturedRequest.String()) + + require.Equal(t, requestNext.Connection.String(), conn.String()) + + // 6. Request B + + request.Connection = conn + + conn, err = s.Request(clienturlctx.WithClientURL(ctx, urlB), request.Clone()) + require.NoError(t, err) + + requestClient = request.Clone() + require.Equal(t, requestClient.String(), serverClient.capturedRequest.String()) + + require.Nil(t, serverA.capturedRequest) + + requestB := request.Clone() + require.Equal(t, requestB.String(), serverB.capturedRequest.String()) + + requestNext = request.Clone() + requestNext.Connection.Mechanism.Type = memif.MECHANISM + requestNext.Connection.Context.ExtraContext["b"] = "B" + require.Equal(t, requestNext.String(), serverNext.capturedRequest.String()) + + require.Equal(t, requestNext.Connection.String(), conn.String()) + + // 8. Close B + + _, err = s.Close(ctx, conn) + require.NoError(t, err) + + require.Nil(t, serverClient.capturedRequest) + require.Nil(t, serverB.capturedRequest) + require.Nil(t, serverNext.capturedRequest) + }() } -func TestParallelDial(t *testing.T) { +func TestConnectServer_RequestParallel(t *testing.T) { defer goleak.VerifyNone(t, goleak.IgnoreCurrent()) - nseT := &nseTest{} - nseT.Setup() - defer nseT.Stop() - - var wg sync.WaitGroup - for i := 0; i < 10; i++ { - wg.Add(1) - j := i - clientURLCtx, clientCancel := context.WithTimeout(context.Background(), timeout) - defer clientCancel() - clientURLCtx = nseT.newNSEContext(clientURLCtx) - go func() { - defer wg.Done() - for k := 0; k < 10; k++ { - c := newClient(clientURLCtx, sandbox.GenerateTestToken) - conn, err := c.Request(clientURLCtx, &networkservice.NetworkServiceRequest{ + // 1. Create connectServer + + serverNext := new(countServer) + serverClient := new(countServer) + + s := next.NewNetworkServiceServer( + connect.NewServer(context.TODO(), + func(_ context.Context, cc grpc.ClientConnInterface) networkservice.NetworkServiceClient { + return next.NewNetworkServiceClient( + adapters.NewServerToClient(serverClient), + networkservice.NewNetworkServiceClient(cc), + ) + }, + grpc.WithInsecure(), + ), + serverNext, + ) + + func() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // 3. Setup servers + + urlA := &url.URL{Scheme: "tcp", Host: "127.0.0.1:10000"} + serverA := new(countServer) + + err := startServer(ctx, urlA, serverA) + require.NoError(t, err) + + // 4. Request A + + wg := new(sync.WaitGroup) + wg.Add(parallelCount) + + barrier := new(sync.WaitGroup) + barrier.Add(1) + + for i := 0; i < parallelCount; i++ { + go func(k int) { + // 4.1. Create request + request := &networkservice.NetworkServiceRequest{ Connection: &networkservice.Connection{ - Id: fmt.Sprintf("%d", j), + Id: strconv.Itoa(k), }, - }) - require.Nil(t, err) - require.NotNil(t, conn) + } + + // 4.2. Request A + _, err := s.Request(clienturlctx.WithClientURL(ctx, urlA), request) + assert.NoError(t, err) + wg.Done() + + barrier.Wait() + + // 4.3. Re request A + conn, err := s.Request(clienturlctx.WithClientURL(ctx, urlA), request) + assert.NoError(t, err) + + // 4.4. Close A + _, err = s.Close(ctx, conn) + assert.NoError(t, err) + wg.Done() + }(i) + } + + wg.Wait() + wg.Add(parallelCount) + + require.Equal(t, int32(parallelCount), serverClient.count) + require.Equal(t, int32(parallelCount), serverA.count) + require.Equal(t, int32(parallelCount), serverNext.count) + + barrier.Done() + wg.Wait() - _, err = c.Close(clientURLCtx, conn) + require.Equal(t, int32(parallelCount), serverClient.count) + require.Equal(t, int32(parallelCount), serverA.count) + require.Equal(t, int32(parallelCount), serverNext.count) + }() +} + +type editServer struct { + key string + value string + mechanism *networkservice.Mechanism +} - require.Nil(t, err) - } - }() +func newEditServer(key, value string, mechanism *networkservice.Mechanism) *editServer { + return &editServer{ + key: key, + value: value, + mechanism: mechanism, } - wg.Wait() +} + +func (s *editServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) { + request.Connection.Context.ExtraContext[s.key] = s.value + request.Connection.Mechanism = s.mechanism + + return next.Server(ctx).Request(ctx, request) +} + +func (s *editServer) Close(ctx context.Context, conn *networkservice.Connection) (*empty.Empty, error) { + return next.Server(ctx).Close(ctx, conn) +} + +type captureServer struct { + capturedRequest *networkservice.NetworkServiceRequest +} + +func (s *captureServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) { + s.capturedRequest = request.Clone() + return next.Server(ctx).Request(ctx, request) +} + +func (s *captureServer) Close(ctx context.Context, conn *networkservice.Connection) (*empty.Empty, error) { + s.capturedRequest = nil + return next.Server(ctx).Close(ctx, conn) +} + +type countServer struct { + count int32 +} + +func (s *countServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) { + atomic.AddInt32(&s.count, 1) + return next.Server(ctx).Request(ctx, request) +} + +func (s *countServer) Close(ctx context.Context, conn *networkservice.Connection) (*empty.Empty, error) { + atomic.AddInt32(&s.count, -1) + return next.Server(ctx).Close(ctx, conn) } diff --git a/pkg/networkservice/common/mechanismtranslation/client.go b/pkg/networkservice/common/mechanismtranslation/client.go new file mode 100644 index 000000000..66752ab19 --- /dev/null +++ b/pkg/networkservice/common/mechanismtranslation/client.go @@ -0,0 +1,72 @@ +// Copyright (c) 2020 Doc.ai and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package mechanismtranslation provides client chain element to perform serverRequest -> clientRequest and +// clientConn -> serverConn mechanism translations +package mechanismtranslation + +import ( + "context" + + "github.com/golang/protobuf/ptypes/empty" + "google.golang.org/grpc" + + "github.com/networkservicemesh/api/pkg/api/networkservice" + + "github.com/networkservicemesh/sdk/pkg/networkservice/core/next" +) + +type mechanismTranslationClient struct { + mechs mechanismMap +} + +// NewClient returns a new translation client chain element +func NewClient() networkservice.NetworkServiceClient { + return new(mechanismTranslationClient) +} + +func (c *mechanismTranslationClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (conn *networkservice.Connection, err error) { + connID := request.GetConnection().GetId() + + // 1. Translate request mechanisms + clientRequest := request.Clone() + clientRequest.MechanismPreferences = nil + + mech, _ := c.mechs.Load(connID) + clientRequest.Connection.Mechanism = mech + + // 2. Request client chain + clientConn, err := next.Client(ctx).Request(ctx, clientRequest, opts...) + if err != nil { + return nil, err + } + c.mechs.Store(connID, clientConn.Mechanism) + + // 3. Translate connection mechanism + conn = clientConn.Clone() + conn.Mechanism = request.Connection.GetMechanism() + + return conn, nil +} + +func (c *mechanismTranslationClient) Close(ctx context.Context, conn *networkservice.Connection, opts ...grpc.CallOption) (*empty.Empty, error) { + // 1. Translate connection mechanism + mech, _ := c.mechs.LoadAndDelete(conn.GetId()) + conn.Mechanism = mech + + // 2. Close client chain + return next.Client(ctx).Close(ctx, conn, opts...) +} diff --git a/pkg/networkservice/common/mechanismtranslation/client_test.go b/pkg/networkservice/common/mechanismtranslation/client_test.go new file mode 100644 index 000000000..f5ace3f4a --- /dev/null +++ b/pkg/networkservice/common/mechanismtranslation/client_test.go @@ -0,0 +1,121 @@ +// Copyright (c) 2020 Doc.ai and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mechanismtranslation_test + +import ( + "context" + "testing" + + "github.com/golang/protobuf/ptypes/empty" + "github.com/networkservicemesh/api/pkg/api/networkservice" + "github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/cls" + kernelmech "github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/kernel" + "github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/vfio" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + + "github.com/networkservicemesh/sdk/pkg/networkservice/common/mechanisms" + "github.com/networkservicemesh/sdk/pkg/networkservice/common/mechanisms/kernel" + "github.com/networkservicemesh/sdk/pkg/networkservice/common/mechanismtranslation" + "github.com/networkservicemesh/sdk/pkg/networkservice/common/null" + "github.com/networkservicemesh/sdk/pkg/networkservice/core/adapters" + "github.com/networkservicemesh/sdk/pkg/networkservice/core/next" +) + +func kernelMechanism() *networkservice.Mechanism { + request := new(networkservice.NetworkServiceRequest) + _, _ = kernel.NewClient().Request(context.TODO(), request) + return request.MechanismPreferences[0] +} + +func TestMechanismTranslationClient(t *testing.T) { + capture := new(captureClient) + + client := next.NewNetworkServiceClient( + mechanismtranslation.NewClient(), + capture, + kernel.NewClient(), + adapters.NewServerToClient( + mechanisms.NewServer(map[string]networkservice.NetworkServiceServer{ + kernelmech.MECHANISM: null.NewServer(), + }), + ), + ) + + request := &networkservice.NetworkServiceRequest{ + Connection: &networkservice.Connection{ + Id: "id", + Mechanism: &networkservice.Mechanism{ + Cls: cls.LOCAL, + Type: vfio.MECHANISM, + }, + Context: &networkservice.ConnectionContext{ + ExtraContext: map[string]string{ + "a": "A", + }, + }, + }, + MechanismPreferences: []*networkservice.Mechanism{{ + Cls: cls.LOCAL, + Type: vfio.MECHANISM, + }}, + } + + // 1. Request + + conn, err := client.Request(context.TODO(), request.Clone()) + require.NoError(t, err) + require.Equal(t, request.Connection.String(), conn.String()) + + captureRequest := request.Clone() + captureRequest.MechanismPreferences = nil + captureRequest.Connection.Mechanism = nil + require.Equal(t, captureRequest.String(), capture.request.String()) + + // 2. Refresh + + conn, err = client.Request(context.TODO(), request.Clone()) + require.NoError(t, err) + require.Equal(t, request.Connection.String(), conn.String()) + + captureRequest = request.Clone() + captureRequest.MechanismPreferences = nil + captureRequest.Connection.Mechanism = kernelMechanism() + require.Equal(t, captureRequest.String(), capture.request.String()) + + // 3. Close + + _, err = client.Close(context.TODO(), conn.Clone()) + require.NoError(t, err) + + require.Equal(t, captureRequest.Connection.String(), capture.conn.String()) +} + +type captureClient struct { + request *networkservice.NetworkServiceRequest + conn *networkservice.Connection +} + +func (c *captureClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (*networkservice.Connection, error) { + c.request = request.Clone() + return next.Client(ctx).Request(ctx, request, opts...) +} + +func (c *captureClient) Close(ctx context.Context, conn *networkservice.Connection, opts ...grpc.CallOption) (*empty.Empty, error) { + c.conn = conn.Clone() + return next.Client(ctx).Close(ctx, conn, opts...) +} diff --git a/pkg/networkservice/common/mechanismtranslation/gen.go b/pkg/networkservice/common/mechanismtranslation/gen.go new file mode 100644 index 000000000..aa6be83cf --- /dev/null +++ b/pkg/networkservice/common/mechanismtranslation/gen.go @@ -0,0 +1,25 @@ +// Copyright (c) 2020 Doc.ai and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mechanismtranslation + +import ( + "sync" +) + +//go:generate go-syncmap -output mechanism_map.gen.go -type mechanismMap + +type mechanismMap sync.Map diff --git a/pkg/networkservice/common/mechanismtranslation/mechanism_map.gen.go b/pkg/networkservice/common/mechanismtranslation/mechanism_map.gen.go new file mode 100644 index 000000000..7ddde90c0 --- /dev/null +++ b/pkg/networkservice/common/mechanismtranslation/mechanism_map.gen.go @@ -0,0 +1,75 @@ +// Code generated by "-output mechanism_map.gen.go -type mechanismMap -output mechanism_map.gen.go -type mechanismMap"; DO NOT EDIT. +package mechanismtranslation + +import ( + "sync" // Used by sync.Map. + + "github.com/networkservicemesh/api/pkg/api/networkservice" +) + +// Generate code that will fail if the constants change value. +func _() { + // An "cannot convert mechanismMap literal (type mechanismMap) to type sync.Map" compiler error signifies that the base type have changed. + // Re-run the go-syncmap command to generate them again. + _ = (sync.Map)(mechanismMap{}) +} + +var _nil_mechanismMap_networkservice_Mechanism_value = func() (val *networkservice.Mechanism) { return }() + +// Load returns the value stored in the map for a key, or nil if no +// value is present. +// The ok result indicates whether value was found in the map. +func (m *mechanismMap) Load(key string) (*networkservice.Mechanism, bool) { + value, ok := (*sync.Map)(m).Load(key) + if value == nil { + return _nil_mechanismMap_networkservice_Mechanism_value, ok + } + return value.(*networkservice.Mechanism), ok +} + +// Store sets the value for a key. +func (m *mechanismMap) Store(key string, value *networkservice.Mechanism) { + (*sync.Map)(m).Store(key, value) +} + +// LoadOrStore returns the existing value for the key if present. +// Otherwise, it stores and returns the given value. +// The loaded result is true if the value was loaded, false if stored. +func (m *mechanismMap) LoadOrStore(key string, value *networkservice.Mechanism) (*networkservice.Mechanism, bool) { + actual, loaded := (*sync.Map)(m).LoadOrStore(key, value) + if actual == nil { + return _nil_mechanismMap_networkservice_Mechanism_value, loaded + } + return actual.(*networkservice.Mechanism), loaded +} + +// LoadAndDelete deletes the value for a key, returning the previous value if any. +// The loaded result reports whether the key was present. +func (m *mechanismMap) LoadAndDelete(key string) (value *networkservice.Mechanism, loaded bool) { + actual, loaded := (*sync.Map)(m).LoadAndDelete(key) + if actual == nil { + return _nil_mechanismMap_networkservice_Mechanism_value, loaded + } + return actual.(*networkservice.Mechanism), loaded +} + +// Delete deletes the value for a key. +func (m *mechanismMap) Delete(key string) { + (*sync.Map)(m).Delete(key) +} + +// Range calls f sequentially for each key and value present in the map. +// If f returns false, range stops the iteration. +// +// Range does not necessarily correspond to any consistent snapshot of the Map's +// contents: no key will be visited more than once, but if the value for any key +// is stored or deleted concurrently, Range may reflect any mapping for that key +// from any point during the Range call. +// +// Range may be O(N) with the number of elements in the map even if f returns +// false after a constant number of calls. +func (m *mechanismMap) Range(f func(key string, value *networkservice.Mechanism) bool) { + (*sync.Map)(m).Range(func(key, value interface{}) bool { + return f(key.(string), value.(*networkservice.Mechanism)) + }) +} diff --git a/pkg/tools/clientmap/refcount.go b/pkg/tools/clientmap/refcount.go index e4452c4b7..acb6f087b 100644 --- a/pkg/tools/clientmap/refcount.go +++ b/pkg/tools/clientmap/refcount.go @@ -75,10 +75,15 @@ func (r *RefcountMap) Load(key string) (networkservice.NetworkServiceClient, boo return rv, loaded } -// Delete decrements the refcount and deletes the value for a key if refcount is zero. -func (r *RefcountMap) Delete(key string) { - rv, _ := r.Map.Load(key) +// Delete decrements the refcount and deletes the value for a key if refcount is zero. Returns true if value is no (more) present. +func (r *RefcountMap) Delete(key string) bool { + rv, loaded := r.Map.Load(key) + if !loaded { + return true + } if client, ok := rv.(*refcountClient); ok && atomic.AddInt32(&client.count, -1) == 0 { r.Map.Delete(key) + return true } + return false } diff --git a/pkg/tools/sandbox/builder.go b/pkg/tools/sandbox/builder.go index fcbf64134..4eff9c97a 100644 --- a/pkg/tools/sandbox/builder.go +++ b/pkg/tools/sandbox/builder.go @@ -23,8 +23,6 @@ import ( "testing" "time" - "github.com/networkservicemesh/sdk/pkg/tools/spanhelper" - "github.com/google/uuid" "github.com/stretchr/testify/require" "google.golang.org/grpc" @@ -39,16 +37,16 @@ import ( "github.com/networkservicemesh/sdk/pkg/networkservice/common/clienturl" "github.com/networkservicemesh/sdk/pkg/networkservice/common/connect" "github.com/networkservicemesh/sdk/pkg/networkservice/core/adapters" - "github.com/networkservicemesh/sdk/pkg/registry/common/dnsresolve" - "github.com/networkservicemesh/sdk/pkg/registry/chains/memory" - proxydns "github.com/networkservicemesh/sdk/pkg/registry/chains/proxydns" + "github.com/networkservicemesh/sdk/pkg/registry/chains/proxydns" + "github.com/networkservicemesh/sdk/pkg/registry/common/dnsresolve" interpose_reg "github.com/networkservicemesh/sdk/pkg/registry/common/interpose" adapter_registry "github.com/networkservicemesh/sdk/pkg/registry/core/adapters" "github.com/networkservicemesh/sdk/pkg/registry/core/chain" "github.com/networkservicemesh/sdk/pkg/tools/addressof" "github.com/networkservicemesh/sdk/pkg/tools/grpcutils" "github.com/networkservicemesh/sdk/pkg/tools/log" + "github.com/networkservicemesh/sdk/pkg/tools/spanhelper" "github.com/networkservicemesh/sdk/pkg/tools/token" ) @@ -313,16 +311,14 @@ func supplyDummyForwarder(ctx context.Context, name string, generateToken token. generateToken, // Statically set the url we use to the unix file socket for the NSMgr clienturl.NewServer(connectTo), - connect.NewServer( - ctx, - client.NewClientFactory( + connect.NewServer(ctx, + client.NewCrossConnectClientFactory( name, // What to call onHeal addressof.NetworkServiceClient(adapters.NewServerToClient(result)), - generateToken, - ), - grpc.WithInsecure(), grpc.WithDefaultCallOptions(grpc.WaitForReady(true)), - )) - + generateToken), + dialOptions..., + ), + ) return result } diff --git a/pkg/tools/sandbox/utils.go b/pkg/tools/sandbox/utils.go index d2ba2e7e5..a19d226ca 100644 --- a/pkg/tools/sandbox/utils.go +++ b/pkg/tools/sandbox/utils.go @@ -22,8 +22,6 @@ import ( "net/url" "time" - "github.com/networkservicemesh/sdk/pkg/tools/spanhelper" - "github.com/golang/protobuf/ptypes" "github.com/google/uuid" "google.golang.org/grpc" @@ -36,8 +34,10 @@ import ( "github.com/networkservicemesh/sdk/pkg/networkservice/chains/endpoint" "github.com/networkservicemesh/sdk/pkg/networkservice/chains/nsmgr" "github.com/networkservicemesh/sdk/pkg/networkservice/common/authorize" - "github.com/networkservicemesh/sdk/pkg/tools/grpcutils" + "github.com/networkservicemesh/sdk/pkg/networkservice/common/clienturl" + "github.com/networkservicemesh/sdk/pkg/tools/clienturlctx" "github.com/networkservicemesh/sdk/pkg/tools/log" + "github.com/networkservicemesh/sdk/pkg/tools/spanhelper" "github.com/networkservicemesh/sdk/pkg/tools/token" ) @@ -81,16 +81,23 @@ func NewEndpoint(ctx context.Context, nse *registry.NetworkServiceEndpoint, gene return &EndpointEntry{Endpoint: ep, URL: u}, nil } -// NewClient creates new networkservice.NetworkServiceClient configured to connect to the passed URL. -func NewClient(ctx context.Context, generatorFunc token.GeneratorFunc, connectTo *url.URL, additionalFunctionality ...networkservice.NetworkServiceClient) (networkservice.NetworkServiceClient, error) { - cc, err := grpc.DialContext(ctx, grpcutils.URLToTarget(connectTo), +// NewClient is a client.NewClient over *url.URL with some fields preset for testing +func NewClient(ctx context.Context, generatorFunc token.GeneratorFunc, connectTo *url.URL, additionalFunctionality ...networkservice.NetworkServiceClient) networkservice.NetworkServiceClient { + return clienturl.NewClient( + clienturlctx.WithClientURL(ctx, connectTo), + client.NewClientFactory( + fmt.Sprintf("nsc-%v", uuid.New().String()), + nil, + generatorFunc, + additionalFunctionality...), append(spanhelper.WithTracingDial(), grpc.WithBlock(), grpc.WithInsecure())...) - if err != nil { - return nil, err - } - go func() { - <-ctx.Done() - _ = cc.Close() - }() - return client.NewClient(ctx, fmt.Sprintf("nsc-%v", uuid.New().String()), nil, generatorFunc, cc, additionalFunctionality...), nil +} + +// NewCrossConnectClientFactory is a client.NewCrossConnectClientFactory with some fields preset for testing +func NewCrossConnectClientFactory(generatorFunc token.GeneratorFunc, additionalFunctionality ...networkservice.NetworkServiceClient) func(ctx context.Context, cc grpc.ClientConnInterface) networkservice.NetworkServiceClient { + return client.NewCrossConnectClientFactory( + fmt.Sprintf("nsc-%v", uuid.New().String()), + nil, + generatorFunc, + additionalFunctionality...) }