diff --git a/pkg/registry/common/connect/ns_server_test.go b/pkg/registry/common/connect/ns_server_test.go index d0968fd462..b19a090a2b 100644 --- a/pkg/registry/common/connect/ns_server_test.go +++ b/pkg/registry/common/connect/ns_server_test.go @@ -1,5 +1,7 @@ // Copyright (c) 2020-2022 Doc.ai and/or its affiliates. // +// Copyright (c) 2023 Cisco and/or its affiliates. +// // SPDX-License-Identifier: Apache-2.0 // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -107,6 +109,86 @@ func startTestNSServers(ctx context.Context, t *testing.T) (url1, url2 *url.URL, return url1, url2, cancel1, cancel2 } +func Test_NSConenctChain_Find(t *testing.T) { + for depth := 2; depth < 11; depth++ { + for killIndex := 1; killIndex < depth; killIndex++ { + var ctx, cancel = context.WithTimeout(context.Background(), time.Second) + defer cancel() + + var urls = make([]*url.URL, depth) + + var servers = make([]*struct { + registry.NetworkServiceRegistryServer + kill func() + }, depth) + + for i := 0; i < depth; i++ { + var serverCtx, serverCancel = context.WithCancel(ctx) + + servers[i] = &struct { + registry.NetworkServiceRegistryServer + kill func() + }{ + kill: serverCancel, + } + + urls[i] = new(url.URL) + + require.NoError(t, + startNSServer( + serverCtx, + urls[i], + servers[i], + ), + ) + } + + for i := 0; i < depth-1; i++ { + servers[i].NetworkServiceRegistryServer = chain.NewNetworkServiceRegistryServer( + clienturl.NewNetworkServiceRegistryServer(urls[i+1]), + connect.NewNetworkServiceRegistryServer( + chain.NewNetworkServiceRegistryClient( + begin.NewNetworkServiceRegistryClient(), + clientconn.NewNetworkServiceRegistryClient(), + dial.NewNetworkServiceRegistryClient(ctx, + dial.WithDialOptions(grpc.WithTransportCredentials(insecure.NewCredentials())), + dial.WithDialTimeout(time.Second), + ), + connect.NewNetworkServiceRegistryClient(), + ), + ), + ) + } + + servers[len(servers)-1].NetworkServiceRegistryServer = memory.NewNetworkServiceRegistryServer() + + c := adapters.NetworkServiceServerToClient(servers[0].NetworkServiceRegistryServer) + + _, err := c.Register(ctx, ®istry.NetworkService{ + Name: "testing", + }) + + require.NoError(t, err) + + stream, err := c.Find(ctx, ®istry.NetworkServiceQuery{ + Watch: true, + NetworkService: ®istry.NetworkService{ + Name: "testing", + }, + }) + require.NoError(t, err) + + _, err = stream.Recv() + require.NoError(t, err) + + servers[killIndex].kill() + + _, err = stream.Recv() + require.Error(t, err) + } + } +} + func TestConnectNSServer_AllUnregister(t *testing.T) { t.Cleanup(func() { goleak.VerifyNone(t) }) @@ -241,83 +323,3 @@ func TestConnectNSServer_AllDead_WatchingFind(t *testing.T) { for err, i := goleak.Find(), 0; err != nil && i < 3; err, i = goleak.Find(), i+1 { } } - -func Test_NSConenctChain_Find(t *testing.T) { - for depth := 2; depth < 11; depth++ { - for killIndex := 1; killIndex < depth; killIndex++ { - var ctx, cancel = context.WithTimeout(context.Background(), time.Second) - defer cancel() - - var urls = make([]*url.URL, depth) - - var servers = make([]*struct { - registry.NetworkServiceRegistryServer - kill func() - }, depth) - - for i := 0; i < depth; i++ { - var serverCtx, serverCancel = context.WithCancel(ctx) - - servers[i] = &struct { - registry.NetworkServiceRegistryServer - kill func() - }{ - kill: serverCancel, - } - - urls[i] = new(url.URL) - - require.NoError(t, - startNSServer( - serverCtx, - urls[i], - servers[i], - ), - ) - } - - for i := 0; i < depth-1; i++ { - servers[i].NetworkServiceRegistryServer = chain.NewNetworkServiceRegistryServer( - clienturl.NewNetworkServiceRegistryServer(urls[i+1]), - connect.NewNetworkServiceRegistryServer( - chain.NewNetworkServiceRegistryClient( - begin.NewNetworkServiceRegistryClient(), - clientconn.NewNetworkServiceRegistryClient(), - dial.NewNetworkServiceRegistryClient(ctx, - dial.WithDialOptions(grpc.WithTransportCredentials(insecure.NewCredentials())), - dial.WithDialTimeout(time.Second), - ), - connect.NewNetworkServiceRegistryClient(), - ), - ), - ) - } - - servers[len(servers)-1].NetworkServiceRegistryServer = memory.NewNetworkServiceRegistryServer() - - c := adapters.NetworkServiceServerToClient(servers[0].NetworkServiceRegistryServer) - - _, err := c.Register(ctx, ®istry.NetworkService{ - Name: "testing", - }) - - require.NoError(t, err) - - stream, err := c.Find(ctx, ®istry.NetworkServiceQuery{ - Watch: true, - NetworkService: ®istry.NetworkService{ - Name: "testing", - }, - }) - require.NoError(t, err) - - _, err = stream.Recv() - require.NoError(t, err) - - servers[killIndex].kill() - - _, err = stream.Recv() - require.Error(t, err) - } - } -} diff --git a/pkg/registry/common/connect/nse_server_test.go b/pkg/registry/common/connect/nse_server_test.go index 6a3ce5a992..8b1794540b 100644 --- a/pkg/registry/common/connect/nse_server_test.go +++ b/pkg/registry/common/connect/nse_server_test.go @@ -1,5 +1,7 @@ // Copyright (c) 2020-2022 Doc.ai and/or its affiliates. // +// Copyright (c) 2023 Cisco and/or its affiliates. +// // SPDX-License-Identifier: Apache-2.0 // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -107,6 +109,86 @@ func startTestNSEServers(ctx context.Context, t *testing.T) (url1, url2 *url.URL return url1, url2, cancel1, cancel2 } +func Test_ConenctNSEChain_Find(t *testing.T) { + for depth := 2; depth < 11; depth++ { + for killIndex := 1; killIndex < depth; killIndex++ { + var ctx, cancel = context.WithTimeout(context.Background(), time.Second) + defer cancel() + + var urls = make([]*url.URL, depth) + + var servers = make([]*struct { + registry.NetworkServiceEndpointRegistryServer + kill func() + }, depth) + + for i := 0; i < depth; i++ { + var serverCtx, serverCancel = context.WithCancel(ctx) + + servers[i] = &struct { + registry.NetworkServiceEndpointRegistryServer + kill func() + }{ + kill: serverCancel, + } + + urls[i] = new(url.URL) + + require.NoError(t, + startNSEServer( + serverCtx, + urls[i], + servers[i], + ), + ) + } + + for i := 0; i < depth-1; i++ { + servers[i].NetworkServiceEndpointRegistryServer = chain.NewNetworkServiceEndpointRegistryServer( + clienturl.NewNetworkServiceEndpointRegistryServer(urls[i+1]), + connect.NewNetworkServiceEndpointRegistryServer( + chain.NewNetworkServiceEndpointRegistryClient( + begin.NewNetworkServiceEndpointRegistryClient(), + clientconn.NewNetworkServiceEndpointRegistryClient(), + dial.NewNetworkServiceEndpointRegistryClient(ctx, + dial.WithDialOptions(grpc.WithTransportCredentials(insecure.NewCredentials())), + dial.WithDialTimeout(time.Second), + ), + connect.NewNetworkServiceEndpointRegistryClient(), + ), + ), + ) + } + + servers[len(servers)-1].NetworkServiceEndpointRegistryServer = memory.NewNetworkServiceEndpointRegistryServer() + + c := adapters.NetworkServiceEndpointServerToClient(servers[0].NetworkServiceEndpointRegistryServer) + + _, err := c.Register(ctx, ®istry.NetworkServiceEndpoint{ + Name: "testing", + }) + + require.NoError(t, err) + + stream, err := c.Find(ctx, ®istry.NetworkServiceEndpointQuery{ + Watch: true, + NetworkServiceEndpoint: ®istry.NetworkServiceEndpoint{ + Name: "testing", + }, + }) + require.NoError(t, err) + + _, err = stream.Recv() + require.NoError(t, err) + + servers[killIndex].kill() + + _, err = stream.Recv() + require.Error(t, err) + } + } +} + func TestConnectNSEServer_AllUnregister(t *testing.T) { t.Cleanup(func() { goleak.VerifyNone(t) }) @@ -241,83 +323,3 @@ func TestConnectNSEServer_AllDead_WatchingFind(t *testing.T) { for err, i := goleak.Find(), 0; err != nil && i < 3; err, i = goleak.Find(), i+1 { } } - -func Test_ConenctNSEChain_Find(t *testing.T) { - for depth := 2; depth < 11; depth++ { - for killIndex := 1; killIndex < depth; killIndex++ { - var ctx, cancel = context.WithTimeout(context.Background(), time.Second) - defer cancel() - - var urls = make([]*url.URL, depth) - - var servers = make([]*struct { - registry.NetworkServiceEndpointRegistryServer - kill func() - }, depth) - - for i := 0; i < depth; i++ { - var serverCtx, serverCancel = context.WithCancel(ctx) - - servers[i] = &struct { - registry.NetworkServiceEndpointRegistryServer - kill func() - }{ - kill: serverCancel, - } - - urls[i] = new(url.URL) - - require.NoError(t, - startNSEServer( - serverCtx, - urls[i], - servers[i], - ), - ) - } - - for i := 0; i < depth-1; i++ { - servers[i].NetworkServiceEndpointRegistryServer = chain.NewNetworkServiceEndpointRegistryServer( - clienturl.NewNetworkServiceEndpointRegistryServer(urls[i+1]), - connect.NewNetworkServiceEndpointRegistryServer( - chain.NewNetworkServiceEndpointRegistryClient( - begin.NewNetworkServiceEndpointRegistryClient(), - clientconn.NewNetworkServiceEndpointRegistryClient(), - dial.NewNetworkServiceEndpointRegistryClient(ctx, - dial.WithDialOptions(grpc.WithTransportCredentials(insecure.NewCredentials())), - dial.WithDialTimeout(time.Second), - ), - connect.NewNetworkServiceEndpointRegistryClient(), - ), - ), - ) - } - - servers[len(servers)-1].NetworkServiceEndpointRegistryServer = memory.NewNetworkServiceEndpointRegistryServer() - - c := adapters.NetworkServiceEndpointServerToClient(servers[0].NetworkServiceEndpointRegistryServer) - - _, err := c.Register(ctx, ®istry.NetworkServiceEndpoint{ - Name: "testing", - }) - - require.NoError(t, err) - - stream, err := c.Find(ctx, ®istry.NetworkServiceEndpointQuery{ - Watch: true, - NetworkServiceEndpoint: ®istry.NetworkServiceEndpoint{ - Name: "testing", - }, - }) - require.NoError(t, err) - - _, err = stream.Recv() - require.NoError(t, err) - - servers[killIndex].kill() - - _, err = stream.Recv() - require.Error(t, err) - } - } -}