Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework etcd chain element to distinguish between begin.Unregister() and nse.Unregister() events #460

Merged
merged 3 commits into from
Nov 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/registry/chains/registryk8s/registry-k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,8 @@ func NewServer(config *Config, tokenGenerator token.GeneratorFunc, options ...Op
Condition: func(c context.Context, nse *registry.NetworkServiceEndpoint) bool { return true },
Action: chain.NewNetworkServiceEndpointRegistryServer(
setregistrationtime.NewNetworkServiceEndpointRegistryServer(),
expire.NewNetworkServiceEndpointRegistryServer(config.ChainCtx, expire.WithDefaultExpiration(config.ExpirePeriod)),
etcd.NewNetworkServiceEndpointRegistryServer(config.ChainCtx, config.Namespace, config.ClientSet),
expire.NewNetworkServiceEndpointRegistryServer(config.ChainCtx, expire.WithDefaultExpiration(config.ExpirePeriod)),
),
},
),
Expand Down
209 changes: 208 additions & 1 deletion pkg/registry/chains/registryk8s/registry-k8s_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright (c) 2020-2021 Doc.ai and/or its affiliates.
//
// Copyright (c) 2022 Cisco and/or its affiliates.
// Copyright (c) 2022-2023 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -28,12 +28,14 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/networkservicemesh/api/pkg/api/networkservice"
"github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/cls"
kernelmech "github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/kernel"
"github.com/networkservicemesh/api/pkg/api/registry"
registryserver "github.com/networkservicemesh/sdk/pkg/registry"
registryclient "github.com/networkservicemesh/sdk/pkg/registry/chains/client"
"github.com/networkservicemesh/sdk/pkg/registry/core/adapters"
"github.com/networkservicemesh/sdk/pkg/tools/sandbox"
"github.com/networkservicemesh/sdk/pkg/tools/token"
Expand Down Expand Up @@ -328,6 +330,191 @@ func TestNSMGR_FloatingInterdomainUseCase(t *testing.T) {
require.NoError(t, err)
}

func TestScaledRegistry_NSEUnregisterWithOldVersionUseCase(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t, ignoreKLogDaemon) })

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

clientSet := fake.NewSimpleClientset()

cluster1 := sandbox.NewBuilder(ctx, t).
SetNodesCount(1).
SetRegistrySupplier(supplyK8sRegistryWithClientSet(clientSet)).
SetNSMgrProxySupplier(nil).
SetRegistryProxySupplier(nil).
Build()

cluster2 := sandbox.NewBuilder(ctx, t).
SetNodesCount(1).
SetRegistrySupplier(supplyK8sRegistryWithClientSet(clientSet)).
SetNSMgrProxySupplier(nil).
SetRegistryProxySupplier(nil).
Build()

// 1. Register Network Service
nsRegistryClient := cluster1.NewNSRegistryClient(ctx, sandbox.GenerateTestToken)
nsReg := &registry.NetworkService{Name: "my-service"}
_, err := nsRegistryClient.Register(ctx, nsReg)
require.NoError(t, err)

nseReg := &registry.NetworkServiceEndpoint{
Name: "final-endpoint",
NetworkServiceNames: []string{"my-service"},
}

// 2. Create two registry clients for registry1 on cluster1 and registry2 on cluster2
registryClient1 := registryclient.NewNetworkServiceEndpointRegistryClient(ctx,
registryclient.WithClientURL(cluster1.Registry.URL),
registryclient.WithDialOptions(grpc.WithTransportCredentials(insecure.NewCredentials())))

registryClient2 := registryclient.NewNetworkServiceEndpointRegistryClient(ctx,
registryclient.WithClientURL(cluster2.Registry.URL),
registryclient.WithDialOptions(grpc.WithTransportCredentials(insecure.NewCredentials())))

// 3. NSE registers itself with version [1] through registry1
nseReg, err = registryClient1.Register(ctx, nseReg)
require.NoError(t, err)

// 4. NSE registers itself again with version [2] through registry2
nseReg, err = registryClient2.Register(ctx, nseReg)
require.NoError(t, err)

// 5. Check that we have one NSE in etcd
s, err := registryClient1.Find(ctx, &registry.NetworkServiceEndpointQuery{NetworkServiceEndpoint: &registry.NetworkServiceEndpoint{
Name: "final-endpoint",
}})
require.NoError(t, err)
list := registry.ReadNetworkServiceEndpointList(s)
require.Len(t, list, 1)

// 6. NSE unregisters itself through registy1 even though registry1 has NSE of the old version [1]
_, err = registryClient1.Unregister(ctx, nseReg)
require.NoError(t, err)

// 7. Check that we don't have NSEs in etcd after unregistration
s, err = registryClient1.Find(ctx, &registry.NetworkServiceEndpointQuery{NetworkServiceEndpoint: &registry.NetworkServiceEndpoint{
Name: "final-endpoint",
}})
require.NoError(t, err)
list = registry.ReadNetworkServiceEndpointList(s)
require.Len(t, list, 0)
}

func TestScaledRegistry_NSEUnregisterInAnotherRegistryUseCase(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t, ignoreKLogDaemon) })

ctx, cancel := context.WithTimeout(context.Background(), time.Second*50000)
defer cancel()

clientSet := fake.NewSimpleClientset()

cluster1 := sandbox.NewBuilder(ctx, t).
SetNodesCount(1).
SetRegistrySupplier(supplyK8sRegistryWithClientSet(clientSet)).
SetNSMgrProxySupplier(nil).
SetRegistryProxySupplier(nil).
Build()

cluster2 := sandbox.NewBuilder(ctx, t).
SetNodesCount(1).
SetRegistrySupplier(supplyK8sRegistryWithClientSet(clientSet)).
SetNSMgrProxySupplier(nil).
SetRegistryProxySupplier(nil).
Build()

// 1. Register Network Service
nsRegistryClient := cluster1.NewNSRegistryClient(ctx, sandbox.GenerateTestToken)
nsReg := &registry.NetworkService{Name: "my-service"}
_, err := nsRegistryClient.Register(ctx, nsReg)
require.NoError(t, err)

nseReg := &registry.NetworkServiceEndpoint{
Name: "final-endpoint",
NetworkServiceNames: []string{"my-service"},
}

// 2. Create two registry clients for registry1 on cluster1 and registry2 on cluster2
registryClient1 := registryclient.NewNetworkServiceEndpointRegistryClient(ctx,
registryclient.WithClientURL(cluster1.Registry.URL),
registryclient.WithDialOptions(grpc.WithTransportCredentials(insecure.NewCredentials())))

registryClient2 := registryclient.NewNetworkServiceEndpointRegistryClient(ctx,
registryclient.WithClientURL(cluster2.Registry.URL),
registryclient.WithDialOptions(grpc.WithTransportCredentials(insecure.NewCredentials())))

// 3. NSE registers itself with version [1] through registry1
nseReg, err = registryClient1.Register(ctx, nseReg)
require.NoError(t, err)

// 4. Check that we have one NSE in etcd
s, err := registryClient1.Find(ctx, &registry.NetworkServiceEndpointQuery{NetworkServiceEndpoint: &registry.NetworkServiceEndpoint{
Name: "final-endpoint",
}})
require.NoError(t, err)
list := registry.ReadNetworkServiceEndpointList(s)
require.Len(t, list, 1)

// 5. NSE unregisters itself through registry2
_, err = registryClient2.Unregister(ctx, nseReg)
require.NoError(t, err)

// 7. Check that we don't have NSEs in etcd after unregistration
s, err = registryClient1.Find(ctx, &registry.NetworkServiceEndpointQuery{NetworkServiceEndpoint: &registry.NetworkServiceEndpoint{
Name: "final-endpoint",
}})
require.NoError(t, err)
list = registry.ReadNetworkServiceEndpointList(s)
require.Len(t, list, 0)
}

func TestScaledRegistry_ExpireUseCase(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t, ignoreKLogDaemon) })

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

clientSet := fake.NewSimpleClientset()

cluster := sandbox.NewBuilder(ctx, t).
SetNodesCount(1).
SetRegistrySupplier(supplyK8sRegistryWithClientSet(clientSet)).
SetNSMgrProxySupplier(nil).
SetRegistryProxySupplier(nil).
Build()

// 1. Register Network Service
nsRegistryClient := cluster.NewNSRegistryClient(ctx, sandbox.GenerateTestToken)
nsReg := &registry.NetworkService{Name: "my-service"}
_, err := nsRegistryClient.Register(ctx, nsReg)
require.NoError(t, err)

nseReg := &registry.NetworkServiceEndpoint{
Name: "final-endpoint",
NetworkServiceNames: []string{"my-service"},
}

// 2. Create registry client for registry
dialOptions := sandbox.DialOptions(sandbox.WithTokenGenerator(sandbox.GenerateExpiringToken(time.Second * 2)))
registryClient := registryclient.NewNetworkServiceEndpointRegistryClient(ctx,
registryclient.WithClientURL(cluster.Registry.URL),
registryclient.WithDialOptions(dialOptions...))

// 3. NSE registers itself
_, err = registryClient.Register(ctx, nseReg)
require.NoError(t, err)

// 4. Wait until expire unregisters NSE
require.Eventually(t, func() bool {
s, err := registryClient.Find(ctx, &registry.NetworkServiceEndpointQuery{NetworkServiceEndpoint: &registry.NetworkServiceEndpoint{
Name: "final-endpoint",
}})
require.NoError(t, err)
list := registry.ReadNetworkServiceEndpointList(s)
return len(list) == 0
}, time.Second*3, time.Millisecond*500)
}

func supplyK8sRegistry(ctx context.Context, tokenGenerator token.GeneratorFunc, expireDuration time.Duration, proxyRegistryURL *url.URL, options ...grpc.DialOption) registryserver.Registry {
return registryk8s.NewServer(&registryk8s.Config{
ChainCtx: ctx,
Expand All @@ -337,3 +524,23 @@ func supplyK8sRegistry(ctx context.Context, tokenGenerator token.GeneratorFunc,
ProxyRegistryURL: proxyRegistryURL,
}, tokenGenerator, registryk8s.WithDialOptions(options...))
}

func supplyK8sRegistryWithClientSet(clientSet *fake.Clientset) func(ctx context.Context,
tokenGenerator token.GeneratorFunc,
expireDuration time.Duration,
proxyRegistryURL *url.URL,
options ...grpc.DialOption) registryserver.Registry {
return func(ctx context.Context,
tokenGenerator token.GeneratorFunc,
expireDuration time.Duration,
proxyRegistryURL *url.URL,
options ...grpc.DialOption) registryserver.Registry {
return registryk8s.NewServer(&registryk8s.Config{
ChainCtx: ctx,
Namespace: "default",
ClientSet: clientSet,
ExpirePeriod: expireDuration,
ProxyRegistryURL: proxyRegistryURL,
}, tokenGenerator, registryk8s.WithDialOptions(options...))
}
}
30 changes: 30 additions & 0 deletions pkg/registry/etcd/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright (c) 2023 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package etcd

import "context"

type versionKey struct{}

func withNSEVersion(ctx context.Context, version string) context.Context {
return context.WithValue(ctx, versionKey{}, version)
}

func nseVersionFromContext(ctx context.Context) (string, bool) {
version, ok := ctx.Value(versionKey{}).(string)
return version, ok
}
37 changes: 25 additions & 12 deletions pkg/registry/etcd/nse_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,6 @@ type etcdNSERegistryServer struct {
}

func (n *etcdNSERegistryServer) Register(ctx context.Context, request *registry.NetworkServiceEndpoint) (*registry.NetworkServiceEndpoint, error) {
resp, err := next.NetworkServiceEndpointRegistryServer(ctx).Register(ctx, request)
if err != nil {
return nil, err
}
meta := metav1.ObjectMeta{}
if request.Name == "" {
meta.GenerateName = "nse-"
Expand All @@ -61,16 +57,16 @@ func (n *etcdNSERegistryServer) Register(ctx context.Context, request *registry.
ctx,
&v1.NetworkServiceEndpoint{
ObjectMeta: meta,
Spec: *(*v1.NetworkServiceEndpointSpec)(resp),
Spec: *(*v1.NetworkServiceEndpointSpec)(request),
},
metav1.CreateOptions{},
)
err = errors.Wrapf(err, "failed to create a pod %s in a namespace %s", resp.Name, n.ns)
err = errors.Wrapf(err, "failed to create a pod %s in a namespace %s", request.Name, n.ns)
if apierrors.IsAlreadyExists(err) {
var nse *v1.NetworkServiceEndpoint
list, erro := n.client.NetworkservicemeshV1().NetworkServiceEndpoints("").List(ctx, metav1.ListOptions{})
if erro != nil {
return nil, errors.Wrap(erro, "failed to get a list of NetworkServiceEndpoints")
list, listErr := n.client.NetworkservicemeshV1().NetworkServiceEndpoints("").List(ctx, metav1.ListOptions{})
if listErr != nil {
return nil, errors.Wrap(listErr, "failed to get a list of NetworkServiceEndpoints")
}
for i := 0; i < len(list.Items); i++ {
item := (*registry.NetworkServiceEndpoint)(&list.Items[i].Spec)
Expand All @@ -85,16 +81,22 @@ func (n *etcdNSERegistryServer) Register(ctx context.Context, request *registry.

if nse != nil {
apiResp, err = n.client.NetworkservicemeshV1().NetworkServiceEndpoints(n.ns).Update(ctx, nse, metav1.UpdateOptions{})
err = errors.Wrapf(err, "failed to update a pod %s in a namespace %s", nse.Name, n.ns)
if err != nil {
return nil, errors.Wrapf(err, "failed to update a pod %s in a namespace %s", nse.Name, n.ns)
}

n.versions.Store(apiResp.Spec.Name, apiResp.ResourceVersion)
ctx = withNSEVersion(ctx, apiResp.ResourceVersion)
return next.NetworkServiceEndpointRegistryServer(ctx).Register(ctx, request)
}
}
if err != nil {
return nil, err
}

n.versions.Store(apiResp.Spec.Name, apiResp.ResourceVersion)

return (*registry.NetworkServiceEndpoint)(&apiResp.Spec), nil
ctx = withNSEVersion(ctx, apiResp.ResourceVersion)
return next.NetworkServiceEndpointRegistryServer(ctx).Register(ctx, request)
}

func (n *etcdNSERegistryServer) Find(query *registry.NetworkServiceEndpointQuery, s registry.NetworkServiceEndpointRegistry_FindServer) error {
Expand Down Expand Up @@ -136,6 +138,17 @@ func (n *etcdNSERegistryServer) Unregister(ctx context.Context, request *registr
return nil, errors.WithStack(err)
}

if _, ok := nseVersionFromContext(ctx); !ok {
err = n.client.NetworkservicemeshV1().NetworkServiceEndpoints(n.ns).Delete(
ctx,
request.Name,
metav1.DeleteOptions{})
if err != nil {
return nil, errors.Wrapf(err, "failed to delete a NetworkServiceEndpoints %s in a namespace %s", request.Name, n.ns)
}
return resp, nil
}
NikitaSkrynnik marked this conversation as resolved.
Show resolved Hide resolved

if v, ok := n.versions.Load(request.Name); ok {
version := v.(string)
err = n.client.NetworkservicemeshV1().NetworkServiceEndpoints(n.ns).Delete(
Expand Down
Loading