Skip to content

Commit

Permalink
Remove default expiration for registry
Browse files Browse the repository at this point in the history
Signed-off-by: Artem Glazychev <artem.glazychev@xored.com>
  • Loading branch information
glazychev-art committed Dec 28, 2022
1 parent e94ae98 commit 3d91c09
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 89 deletions.
2 changes: 1 addition & 1 deletion pkg/networkservice/chains/nsmgr/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ func NewServer(ctx context.Context, tokenGenerator token.GeneratorFunc, options
opts.authorizeNSERegistryServer,
begin.NewNetworkServiceEndpointRegistryServer(),
registryclientinfo.NewNetworkServiceEndpointRegistryServer(),
expire.NewNetworkServiceEndpointRegistryServer(ctx, time.Minute),
expire.NewNetworkServiceEndpointRegistryServer(ctx),
registryrecvfd.NewNetworkServiceEndpointRegistryServer(), // Allow to receive a passed files
registrysendfd.NewNetworkServiceEndpointRegistryServer(),
remoteOrLocalRegistry,
Expand Down
6 changes: 0 additions & 6 deletions pkg/networkservice/chains/nsmgr/single_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,6 @@ func Test_UsecasePoint2MultiPoint(t *testing.T) {
SetNodeSetup(func(ctx context.Context, node *sandbox.Node, _ int) {
node.NewNSMgr(ctx, "nsmgr", nil, sandbox.GenerateTestToken, nsmgr.NewServer)
}).
SetRegistryExpiryDuration(time.Second).
Build()

domain.Nodes[0].NewForwarder(ctx, &registryapi.NetworkServiceEndpoint{
Expand Down Expand Up @@ -386,7 +385,6 @@ func Test_RemoteUsecase_Point2MultiPoint(t *testing.T) {
SetNodeSetup(func(ctx context.Context, node *sandbox.Node, _ int) {
node.NewNSMgr(ctx, "nsmgr", nil, sandbox.GenerateTestToken, nsmgr.NewServer)
}).
SetRegistryExpiryDuration(time.Second).
Build()

for i := 0; i < nodeCount; i++ {
Expand Down Expand Up @@ -521,15 +519,13 @@ func Test_FailedRegistryAuthorization(t *testing.T) {
registrySupplier := func(
ctx context.Context,
tokenGenerator token.GeneratorFunc,
expiryDuration time.Duration,
proxyRegistryURL *url.URL,
options ...grpc.DialOption) registry.Registry {
registryName := sandbox.UniqueName("registry-memory")

return memory.NewServer(
ctx,
tokenGeneratorFunc("spiffe://test.com/"+registryName),
memory.WithExpireDuration(expiryDuration),
memory.WithProxyRegistryURL(proxyRegistryURL),
memory.WithDialOptions(options...),
memory.WithAuthorizeNSRegistryServer(
Expand Down Expand Up @@ -692,13 +688,11 @@ func Test_Expire(t *testing.T) {
registrySupplier := func(
ctx context.Context,
tokenGenerator token.GeneratorFunc,
expiryDuration time.Duration,
proxyRegistryURL *url.URL,
options ...grpc.DialOption) registry.Registry {
return memory.NewServer(
ctx,
tokenGenerator,
memory.WithExpireDuration(expiryDuration),
memory.WithProxyRegistryURL(proxyRegistryURL),
memory.WithDialOptions(options...),
memory.WithAuthorizeNSRegistryServer(
Expand Down
12 changes: 1 addition & 11 deletions pkg/registry/chains/memory/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package memory
import (
"context"
"net/url"
"time"

"google.golang.org/grpc"

Expand Down Expand Up @@ -51,7 +50,6 @@ type serverOptions struct {
authorizeNSERegistryServer registry.NetworkServiceEndpointRegistryServer
authorizeNSRegistryClient registry.NetworkServiceRegistryClient
authorizeNSERegistryClient registry.NetworkServiceEndpointRegistryClient
expireDuration time.Duration
proxyRegistryURL *url.URL
dialOptions []grpc.DialOption
}
Expand Down Expand Up @@ -99,13 +97,6 @@ func WithAuthorizeNSERegistryClient(authorizeNSERegistryClient registry.NetworkS
}
}

// WithExpireDuration sets expire duration for the server
func WithExpireDuration(expireDuration time.Duration) Option {
return func(o *serverOptions) {
o.expireDuration = expireDuration
}
}

// WithProxyRegistryURL sets URL to reach the proxy registry
func WithProxyRegistryURL(proxyRegistryURL *url.URL) Option {
return func(o *serverOptions) {
Expand All @@ -127,7 +118,6 @@ func NewServer(ctx context.Context, tokenGenerator token.GeneratorFunc, options
authorizeNSERegistryServer: registryauthorize.NewNetworkServiceEndpointRegistryServer(registryauthorize.Any()),
authorizeNSRegistryClient: registryauthorize.NewNetworkServiceRegistryClient(registryauthorize.Any()),
authorizeNSERegistryClient: registryauthorize.NewNetworkServiceEndpointRegistryClient(registryauthorize.Any()),
expireDuration: time.Minute,
proxyRegistryURL: nil,
}
for _, opt := range options {
Expand Down Expand Up @@ -171,7 +161,7 @@ func NewServer(ctx context.Context, tokenGenerator token.GeneratorFunc, options
Condition: func(c context.Context, nse *registry.NetworkServiceEndpoint) bool { return true },
Action: chain.NewNetworkServiceEndpointRegistryServer(
setregistrationtime.NewNetworkServiceEndpointRegistryServer(),
expire.NewNetworkServiceEndpointRegistryServer(ctx, opts.expireDuration),
expire.NewNetworkServiceEndpointRegistryServer(ctx),
memory.NewNetworkServiceEndpointRegistryServer(),
),
},
Expand Down
17 changes: 3 additions & 14 deletions pkg/registry/common/expire/nse_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,8 @@ package expire

import (
"context"
"time"

"github.com/golang/protobuf/ptypes/empty"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/networkservicemesh/api/pkg/api/registry"

"github.com/networkservicemesh/sdk/pkg/registry/common/begin"
Expand All @@ -32,17 +29,15 @@ import (
)

type expireNSEServer struct {
defaultNseExpiration time.Duration
ctx context.Context
ctx context.Context
cancelsMap
}

// NewNetworkServiceEndpointRegistryServer creates a new NetworkServiceServer chain element that implements unregister
// of expired connections for the subsequent chain elements.
func NewNetworkServiceEndpointRegistryServer(ctx context.Context, defaultNseExpiration time.Duration) registry.NetworkServiceEndpointRegistryServer {
func NewNetworkServiceEndpointRegistryServer(ctx context.Context) registry.NetworkServiceEndpointRegistryServer {
return &expireNSEServer{
defaultNseExpiration: defaultNseExpiration,
ctx: ctx,
ctx: ctx,
}
}

Expand All @@ -59,12 +54,6 @@ func (s *expireNSEServer) Register(ctx context.Context, nse *registry.NetworkSer
}

expirationTime := nse.GetExpirationTime().AsTime()
if nse.GetExpirationTime() == nil {
expirationTime = timeClock.Now().Add(s.defaultNseExpiration).Local()
nse.ExpirationTime = timestamppb.New(expirationTime)
logger.Infof("selected expiration time %v for %v", expirationTime, nse.GetName())
}

resp, err := next.NetworkServiceEndpointRegistryServer(ctx).Register(ctx, nse)
if err != nil {
return nil, err
Expand Down
56 changes: 33 additions & 23 deletions pkg/registry/common/expire/nse_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,20 @@ func find(ctx context.Context, c registry.NetworkServiceEndpointRegistryClient)
return nses, nil
}

func generateTestToken(ctx context.Context, duration time.Duration) token.GeneratorFunc {
return func(_ credentials.AuthInfo) (string, time.Time, error) {
expireTime := clock.FromContext(ctx).Now().Add(duration).Local()

claims := jwt.RegisteredClaims{
Subject: "spiffe://test.com/subject",
ExpiresAt: jwt.NewNumericDate(expireTime),
}

tok, err := jwt.NewWithClaims(jwt.SigningMethodHS256, claims).SignedString([]byte("supersecret"))
return tok, expireTime, err
}
}

func TestExpireNSEServer_ShouldCorrectlySetExpirationTime_InRemoteCase(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })

Expand All @@ -85,8 +99,10 @@ func TestExpireNSEServer_ShouldCorrectlySetExpirationTime_InRemoteCase(t *testin
ctx = clock.WithClock(ctx, clockMock)

s := next.NewNetworkServiceEndpointRegistryServer(
injectpeertoken.NewNetworkServiceEndpointRegistryServer(generateTestToken(ctx, expireTimeout)),
updatepath.NewNetworkServiceEndpointRegistryServer(generateTestToken(ctx, expireTimeout)),
begin.NewNetworkServiceEndpointRegistryServer(),
expire.NewNetworkServiceEndpointRegistryServer(ctx, expireTimeout),
expire.NewNetworkServiceEndpointRegistryServer(ctx),
new(remoteNSEServer),
)

Expand All @@ -110,8 +126,10 @@ func TestExpireNSEServer_ShouldUseLessExpirationTimeFromInput_AndWork(t *testing
mem := memory.NewNetworkServiceEndpointRegistryServer()

s := next.NewNetworkServiceEndpointRegistryServer(
injectpeertoken.NewNetworkServiceEndpointRegistryServer(generateTestToken(ctx, expireTimeout)),
updatepath.NewNetworkServiceEndpointRegistryServer(generateTestToken(ctx, expireTimeout)),
begin.NewNetworkServiceEndpointRegistryServer(),
expire.NewNetworkServiceEndpointRegistryServer(ctx, expireTimeout),
expire.NewNetworkServiceEndpointRegistryServer(ctx),
mem,
)

Expand All @@ -130,20 +148,6 @@ func TestExpireNSEServer_ShouldUseLessExpirationTimeFromInput_AndWork(t *testing
}, testWait, testTick)
}

func generateTestToken(ctx context.Context, duration time.Duration) token.GeneratorFunc {
return func(_ credentials.AuthInfo) (string, time.Time, error) {
expireTime := clock.FromContext(ctx).Now().Add(duration).Local()

claims := jwt.RegisteredClaims{
Subject: "spiffe://test.com/subject",
ExpiresAt: jwt.NewNumericDate(expireTime),
}

tok, err := jwt.NewWithClaims(jwt.SigningMethodHS256, claims).SignedString([]byte("supersecret"))
return tok, expireTime, err
}
}

func TestExpireNSEServer_ShouldUseLessExpirationTimeFromResponse(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })

Expand All @@ -157,7 +161,7 @@ func TestExpireNSEServer_ShouldUseLessExpirationTimeFromResponse(t *testing.T) {
begin.NewNetworkServiceEndpointRegistryServer(),
injectpeertoken.NewNetworkServiceEndpointRegistryServer(generateTestToken(ctx, expireTimeout)),
updatepath.NewNetworkServiceEndpointRegistryServer(generateTestToken(ctx, expireTimeout)),
expire.NewNetworkServiceEndpointRegistryServer(ctx, expireTimeout),
expire.NewNetworkServiceEndpointRegistryServer(ctx),
new(remoteNSEServer), // <-- GRPC invocation
begin.NewNetworkServiceEndpointRegistryServer(),
updatepath.NewNetworkServiceEndpointRegistryServer(generateTestToken(ctx, expireTimeout/2)),
Expand All @@ -184,7 +188,7 @@ func TestExpireNSEServer_ShouldRemoveNSEAfterExpirationTime(t *testing.T) {
begin.NewNetworkServiceEndpointRegistryServer(),
injectpeertoken.NewNetworkServiceEndpointRegistryServer(generateTestToken(ctx, expireTimeout)),
updatepath.NewNetworkServiceEndpointRegistryServer(generateTestToken(ctx, expireTimeout)),
expire.NewNetworkServiceEndpointRegistryServer(ctx, expireTimeout*2),
expire.NewNetworkServiceEndpointRegistryServer(ctx),
new(remoteNSEServer), // <-- GRPC invocation
mem,
)
Expand Down Expand Up @@ -218,7 +222,7 @@ func TestExpireNSEServer_DataRace(t *testing.T) {

s := next.NewNetworkServiceEndpointRegistryServer(
begin.NewNetworkServiceEndpointRegistryServer(),
expire.NewNetworkServiceEndpointRegistryServer(ctx, 0),
expire.NewNetworkServiceEndpointRegistryServer(ctx),
localbypass.NewNetworkServiceEndpointRegistryServer("tcp://0.0.0.0"),
mem,
)
Expand Down Expand Up @@ -251,8 +255,10 @@ func TestExpireNSEServer_RefreshFailure(t *testing.T) {
refresh.NewNetworkServiceEndpointRegistryClient(ctx),
adapters.NetworkServiceEndpointServerToClient(next.NewNetworkServiceEndpointRegistryServer(
new(remoteNSEServer), // <-- GRPC invocation
injectpeertoken.NewNetworkServiceEndpointRegistryServer(generateTestToken(ctx, expireTimeout)),
updatepath.NewNetworkServiceEndpointRegistryServer(generateTestToken(ctx, expireTimeout)),
begin.NewNetworkServiceEndpointRegistryServer(),
expire.NewNetworkServiceEndpointRegistryServer(ctx, expireTimeout),
expire.NewNetworkServiceEndpointRegistryServer(ctx),
injecterror.NewNetworkServiceEndpointRegistryServer(
injecterror.WithRegisterErrorTimes(1, -1),
injecterror.WithFindErrorTimes(),
Expand Down Expand Up @@ -284,14 +290,16 @@ func TestExpireNSEServer_UnregisterFailure(t *testing.T) {
mem := memory.NewNetworkServiceEndpointRegistryServer()

s := next.NewNetworkServiceEndpointRegistryServer(
injectpeertoken.NewNetworkServiceEndpointRegistryServer(generateTestToken(ctx, expireTimeout)),
updatepath.NewNetworkServiceEndpointRegistryServer(generateTestToken(ctx, expireTimeout)),
begin.NewNetworkServiceEndpointRegistryServer(),
expire.NewNetworkServiceEndpointRegistryServer(ctx, expireTimeout),
expire.NewNetworkServiceEndpointRegistryServer(ctx),
injecterror.NewNetworkServiceEndpointRegistryServer(
injecterror.WithRegisterErrorTimes(),
injecterror.WithFindErrorTimes(),
injecterror.WithUnregisterErrorTimes(0),
),
expire.NewNetworkServiceEndpointRegistryServer(ctx, expireTimeout),
expire.NewNetworkServiceEndpointRegistryServer(ctx),
mem,
)

Expand Down Expand Up @@ -335,7 +343,9 @@ func TestExpireNSEServer_RefreshKeepsNoUnregister(t *testing.T) {
next.NewNetworkServiceEndpointRegistryServer(
// NSMgr chain
new(remoteNSEServer), // <-- GRPC invocation
expire.NewNetworkServiceEndpointRegistryServer(ctx, expireTimeout),
injectpeertoken.NewNetworkServiceEndpointRegistryServer(generateTestToken(ctx, expireTimeout)),
updatepath.NewNetworkServiceEndpointRegistryServer(generateTestToken(ctx, expireTimeout)),
expire.NewNetworkServiceEndpointRegistryServer(ctx),
unregisterServer,
)),
)
Expand Down
14 changes: 7 additions & 7 deletions pkg/registry/common/updatepath/nse_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ func (s *updatePathNSEServer) Register(ctx context.Context, nse *registry.Networ
path := grpcmetadata.PathFromContext(ctx)

// Update path
peerTok, peerExpirationTime, tokenErr := token.FromContext(ctx)
if tokenErr != nil {
log.FromContext(ctx).Warnf("an error during getting peer token from the context: %+v", tokenErr)
peerTok, peerExpirationTime, peerTokenErr := token.FromContext(ctx)
if peerTokenErr != nil {
log.FromContext(ctx).Warnf("an error during getting peer token from the context: %+v", peerTokenErr)
}
tok, expirationTime, tokenErr := generateToken(ctx, s.tokenGenerator)
if tokenErr != nil {
Expand All @@ -71,12 +71,12 @@ func (s *updatePathNSEServer) Register(ctx context.Context, nse *registry.Networ
nse.PathIds = updatePathIds(nse.PathIds, int(path.Index-1), peerID.String())
nse.PathIds = updatePathIds(nse.PathIds, int(path.Index), id.String())

if nse.GetExpirationTime() == nil || peerExpirationTime.Before(nse.GetExpirationTime().AsTime().Local()) {
nse.ExpirationTime = timestamppb.New(peerExpirationTime)
}
if expirationTime.Before(nse.GetExpirationTime().AsTime().Local()) {
if nse.GetExpirationTime() == nil || expirationTime.Before(nse.GetExpirationTime().AsTime().Local()) {
nse.ExpirationTime = timestamppb.New(expirationTime)
}
if peerTokenErr == nil && peerExpirationTime.Before(nse.GetExpirationTime().AsTime().Local()) {
nse.ExpirationTime = timestamppb.New(peerExpirationTime)
}

nse, err = next.NetworkServiceEndpointRegistryServer(ctx).Register(ctx, nse)
if err != nil {
Expand Down
39 changes: 14 additions & 25 deletions pkg/tools/sandbox/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"os"
"runtime"
"testing"
"time"

"github.com/stretchr/testify/require"
"google.golang.org/grpc"
Expand Down Expand Up @@ -53,39 +52,36 @@ type Builder struct {
supplyRegistryProxy SupplyRegistryProxyFunc
setupNode SetupNodeFunc

name string
dnsResolver dnsresolve.Resolver
generateTokenFunc token.GeneratorFunc
registryExpiryDuration time.Duration
name string
dnsResolver dnsresolve.Resolver
generateTokenFunc token.GeneratorFunc

useUnixSockets bool

domain *Domain
}

func newRegistryMemoryServer(ctx context.Context, tokenGenerator token.GeneratorFunc, expiryDuration time.Duration, proxyRegistryURL *url.URL, options ...grpc.DialOption) registry.Registry {
func newRegistryMemoryServer(ctx context.Context, tokenGenerator token.GeneratorFunc, proxyRegistryURL *url.URL, options ...grpc.DialOption) registry.Registry {
return memory.NewServer(
ctx,
tokenGenerator,
memory.WithExpireDuration(expiryDuration),
memory.WithProxyRegistryURL(proxyRegistryURL),
memory.WithDialOptions(options...))
}

// NewBuilder creates new SandboxBuilder
func NewBuilder(ctx context.Context, t *testing.T) *Builder {
b := &Builder{
t: t,
ctx: ctx,
nodesCount: 1,
supplyNSMgr: nsmgr.NewServer,
supplyNSMgrProxy: nsmgrproxy.NewServer,
supplyRegistry: newRegistryMemoryServer,
supplyRegistryProxy: proxydns.NewServer,
name: "cluster.local",
dnsResolver: NewFakeResolver(),
generateTokenFunc: GenerateTestToken,
registryExpiryDuration: time.Minute,
t: t,
ctx: ctx,
nodesCount: 1,
supplyNSMgr: nsmgr.NewServer,
supplyNSMgrProxy: nsmgrproxy.NewServer,
supplyRegistry: newRegistryMemoryServer,
supplyRegistryProxy: proxydns.NewServer,
name: "cluster.local",
dnsResolver: NewFakeResolver(),
generateTokenFunc: GenerateTestToken,
}

b.setupNode = func(ctx context.Context, node *Node, _ int) {
Expand Down Expand Up @@ -151,12 +147,6 @@ func (b *Builder) SetTokenGenerateFunc(f token.GeneratorFunc) *Builder {
return b
}

// SetRegistryExpiryDuration replaces registry expiry duration to custom
func (b *Builder) SetRegistryExpiryDuration(registryExpiryDuration time.Duration) *Builder {
b.registryExpiryDuration = registryExpiryDuration
return b
}

// UseUnixSockets sets 1 node and mark it to use unix socket to listen on.
func (b *Builder) UseUnixSockets() *Builder {
require.NotEqual(b.t, "windows", runtime.GOOS, "Unix sockets are not available for windows")
Expand Down Expand Up @@ -275,7 +265,6 @@ func (b *Builder) newRegistry() *RegistryEntry {
entry.Registry = b.supplyRegistry(
ctx,
b.generateTokenFunc,
b.registryExpiryDuration,
nsmgrProxyURL,
DialOptions(WithTokenGenerator(b.generateTokenFunc))...,
)
Expand Down
Loading

0 comments on commit 3d91c09

Please sign in to comment.