Skip to content

Commit

Permalink
Fix timeout/expire chain elements
Browse files Browse the repository at this point in the history
Signed-off-by: Artem Glazychev <artem.glazychev@xored.com>
  • Loading branch information
glazychev-art committed Dec 26, 2022
1 parent beec79b commit dc57328
Show file tree
Hide file tree
Showing 20 changed files with 493 additions and 114 deletions.
18 changes: 7 additions & 11 deletions pkg/networkservice/chains/nsmgr/heal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,20 +538,20 @@ func testNSMGRCloseHeal(t *testing.T, withNSEExpiration bool) {
SetNSMgrProxySupplier(nil).
SetRegistryProxySupplier(nil)

if withNSEExpiration {
builder = builder.SetRegistryExpiryDuration(time.Second / 2)
}

domain := builder.Build()

nsRegistryClient := domain.NewNSRegistryClient(ctx, sandbox.GenerateTestToken)

nsReg, err := nsRegistryClient.Register(ctx, defaultRegistryService(t.Name()))
require.NoError(t, err)

nseCtx, nseCtxCancel := context.WithCancel(ctx)

domain.Nodes[0].NewEndpoint(nseCtx, defaultRegistryEndpoint(nsReg.Name), sandbox.GenerateTestToken)
nseCtx, nseCtxCancel := context.WithTimeout(ctx, time.Second/2)
if withNSEExpiration {
// NSE will be unregistered after (tokenTimeout - registerTimeout)
domain.Nodes[0].NewEndpoint(nseCtx, defaultRegistryEndpoint(nsReg.Name), sandbox.GenerateExpiringToken(time.Second))
} else {
domain.Nodes[0].NewEndpoint(nseCtx, defaultRegistryEndpoint(nsReg.Name), sandbox.GenerateTestToken)
}

request := defaultRequest(nsReg.Name)

Expand Down Expand Up @@ -601,10 +601,6 @@ func testNSMGRCloseHeal(t *testing.T, withNSEExpiration bool) {

nscCtxCancel()

for _, fwd := range domain.Nodes[0].Forwarders {
fwd.Cancel()
}

require.Eventually(t, func() bool {
logrus.Error(goleak.Find())
return goleak.Find(ignoreCurrent) == nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/networkservice/chains/nsmgr/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,8 @@ func NewServer(ctx context.Context, tokenGenerator token.GeneratorFunc, options
}

var nseRegistry = chain.NewNetworkServiceEndpointRegistryServer(
grpcmetadata.NewNetworkServiceEndpointRegistryServer(),
begin.NewNetworkServiceEndpointRegistryServer(),
grpcmetadata.NewNetworkServiceEndpointRegistryServer(),
updatepath.NewNetworkServiceEndpointRegistryServer(tokenGenerator),
opts.authorizeNSERegistryServer,
registryclientinfo.NewNetworkServiceEndpointRegistryServer(),
Expand Down
210 changes: 197 additions & 13 deletions pkg/networkservice/chains/nsmgr/single_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,25 +27,32 @@ import (

"github.com/golang-jwt/jwt/v4"
"github.com/google/uuid"
"github.com/networkservicemesh/api/pkg/api/networkservice"
"github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/cls"
kernelmech "github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/kernel"
registryapi "github.com/networkservicemesh/api/pkg/api/registry"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"

registryclient "github.com/networkservicemesh/sdk/pkg/registry/chains/client"
"github.com/networkservicemesh/api/pkg/api/networkservice"
"github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/cls"
kernelmech "github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/kernel"
registryapi "github.com/networkservicemesh/api/pkg/api/registry"

"github.com/networkservicemesh/sdk/pkg/networkservice/chains/client"
"github.com/networkservicemesh/sdk/pkg/networkservice/chains/endpoint"
"github.com/networkservicemesh/sdk/pkg/networkservice/chains/nsmgr"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/authorize"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/excludedprefixes"
"github.com/networkservicemesh/sdk/pkg/networkservice/ipam/point2pointipam"
countutils "github.com/networkservicemesh/sdk/pkg/networkservice/utils/count"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/inject/injecterror"
"github.com/networkservicemesh/sdk/pkg/registry"
registryclient "github.com/networkservicemesh/sdk/pkg/registry/chains/client"
"github.com/networkservicemesh/sdk/pkg/registry/chains/memory"
"github.com/networkservicemesh/sdk/pkg/registry/common/authorize"
authorizeregistry "github.com/networkservicemesh/sdk/pkg/registry/common/authorize"
"github.com/networkservicemesh/sdk/pkg/registry/common/sendfd"
injecterrorregistry "github.com/networkservicemesh/sdk/pkg/registry/utils/inject/injecterror"
"github.com/networkservicemesh/sdk/pkg/tools/clientinfo"
"github.com/networkservicemesh/sdk/pkg/tools/grpcutils"
"github.com/networkservicemesh/sdk/pkg/tools/sandbox"
"github.com/networkservicemesh/sdk/pkg/tools/token"
)
Expand Down Expand Up @@ -500,13 +507,13 @@ func Test_FailedRegistryAuthorization(t *testing.T) {
nsmgrSuppier := func(ctx context.Context, tokenGenerator token.GeneratorFunc, options ...nsmgr.Option) nsmgr.Nsmgr {
options = append(options,
nsmgr.WithAuthorizeNSERegistryServer(
authorize.NewNetworkServiceEndpointRegistryServer(authorize.WithPolicies("etc/nsm/opa/registry/client_allowed.rego"))),
authorizeregistry.NewNetworkServiceEndpointRegistryServer(authorizeregistry.WithPolicies("etc/nsm/opa/registry/client_allowed.rego"))),
nsmgr.WithAuthorizeNSRegistryServer(
authorize.NewNetworkServiceRegistryServer(authorize.WithPolicies("etc/nsm/opa/registry/client_allowed.rego"))),
authorizeregistry.NewNetworkServiceRegistryServer(authorizeregistry.WithPolicies("etc/nsm/opa/registry/client_allowed.rego"))),
nsmgr.WithAuthorizeNSERegistryClient(
authorize.NewNetworkServiceEndpointRegistryClient(authorize.WithPolicies("etc/nsm/opa/registry/client_allowed.rego"))),
authorizeregistry.NewNetworkServiceEndpointRegistryClient(authorizeregistry.WithPolicies("etc/nsm/opa/registry/client_allowed.rego"))),
nsmgr.WithAuthorizeNSRegistryClient(
authorize.NewNetworkServiceRegistryClient(authorize.WithPolicies("etc/nsm/opa/registry/client_allowed.rego"))),
authorizeregistry.NewNetworkServiceRegistryClient(authorizeregistry.WithPolicies("etc/nsm/opa/registry/client_allowed.rego"))),
)
return nsmgr.NewServer(ctx, tokenGenerator, options...)
}
Expand All @@ -526,7 +533,7 @@ func Test_FailedRegistryAuthorization(t *testing.T) {
memory.WithProxyRegistryURL(proxyRegistryURL),
memory.WithDialOptions(options...),
memory.WithAuthorizeNSRegistryServer(
authorize.NewNetworkServiceRegistryServer(authorize.WithPolicies("etc/nsm/opa/registry/client_allowed.rego"))),
authorizeregistry.NewNetworkServiceRegistryServer(authorizeregistry.WithPolicies("etc/nsm/opa/registry/client_allowed.rego"))),
)
}
domain := sandbox.NewBuilder(ctx, t).
Expand All @@ -553,17 +560,194 @@ func Test_FailedRegistryAuthorization(t *testing.T) {

nsRegistryClient1 := domain.NewNSRegistryClient(ctx, tokenGeneratorFunc("spiffe://test.com/ns-1"),
registryclient.WithAuthorizeNSRegistryClient(
authorize.NewNetworkServiceRegistryClient(authorize.WithPolicies("etc/nsm/opa/registry/client_allowed.rego"))))
authorizeregistry.NewNetworkServiceRegistryClient(authorizeregistry.WithPolicies("etc/nsm/opa/registry/client_allowed.rego"))))

ns1 := defaultRegistryService("ns-1")
_, err := nsRegistryClient1.Register(ctx, ns1)
require.NoError(t, err)

nsRegistryClient2 := domain.NewNSRegistryClient(ctx, tokenGeneratorFunc("spiffe://test.com/ns-2"),
registryclient.WithAuthorizeNSRegistryClient(
authorize.NewNetworkServiceRegistryClient(authorize.WithPolicies("etc/nsm/opa/registry/client_allowed.rego"))))
authorizeregistry.NewNetworkServiceRegistryClient(authorizeregistry.WithPolicies("etc/nsm/opa/registry/client_allowed.rego"))))

ns2 := defaultRegistryService("ns-1")
_, err = nsRegistryClient2.Register(ctx, ns2)
require.Error(t, err)
}

func createAuthorizedEndpoint(ctx context.Context, t *testing.T, ns string, nsmgrURL *url.URL, counter networkservice.NetworkServiceServer) {
nseReg := defaultRegistryEndpoint(ns)

nse := endpoint.NewServer(ctx, sandbox.GenerateTestToken,
endpoint.WithName("final-endpoint"),
endpoint.WithAuthorizeServer(authorize.NewServer(authorize.WithPolicies("etc/nsm/opa/common/tokens_expired.rego"))),
endpoint.WithAdditionalFunctionality(counter),
)

nseServer := grpc.NewServer()
nse.Register(nseServer)
nseURL := &url.URL{Scheme: "tcp", Host: "127.0.0.1:0"}
errCh := grpcutils.ListenAndServe(ctx, nseURL, nseServer)
select {
case err := <-errCh:
require.NoError(t, err)
default:
}

nseRegistryClient := registryclient.NewNetworkServiceEndpointRegistryClient(
ctx,
registryclient.WithClientURL(nsmgrURL),
registryclient.WithDialOptions(sandbox.DialOptions(sandbox.WithTokenGenerator(sandbox.GenerateTestToken))...),
registryclient.WithNSEAdditionalFunctionality(sendfd.NewNetworkServiceEndpointRegistryClient()),
)

nseReg.Url = nseURL.String()
_, err := nseRegistryClient.Register(ctx, nseReg.Clone())
require.NoError(t, err)
}

// This test checks timeout on sandbox
// We run nsmgr and NSE with networkservice authorize chain element (tokens_expired.rego)
func Test_Timeout(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })

// timeout chain element will call Close() after (tokenTimeout - requestTimeout)
// to be sure that token is not expired
tokenTimeout := time.Second * 2
requestTimeout := time.Second + time.Millisecond*500

chainCtx, chainCtxCancel := context.WithTimeout(context.Background(), time.Second*5)
defer chainCtxCancel()

// Set tokens_expired policy
nsmgrSuppier := func(ctx context.Context, tokenGenerator token.GeneratorFunc, options ...nsmgr.Option) nsmgr.Nsmgr {
options = append(options,
nsmgr.WithAuthorizeServer(authorize.NewServer(authorize.WithPolicies("etc/nsm/opa/common/tokens_expired.rego"))),
)
return nsmgr.NewServer(ctx, tokenGenerator, options...)
}

domain := sandbox.NewBuilder(chainCtx, t).
SetNodesCount(1).
SetNSMgrSupplier(nsmgrSuppier).
Build()

nsRegistryClient := domain.NewNSRegistryClient(chainCtx, sandbox.GenerateTestToken)
ns := defaultRegistryService("ns")

nsReg, err := nsRegistryClient.Register(chainCtx, ns)
require.NoError(t, err)

counter := new(countutils.Server)

createAuthorizedEndpoint(chainCtx, t, ns.Name, domain.Nodes[0].NSMgr.URL, counter)

// Set an expiring token.
// Add injecterror to allow only the first Request. All subsequent ones will fall.
// This emulates the death of the client.
nsc := domain.Nodes[0].NewClient(chainCtx,
sandbox.GenerateExpiringToken(tokenTimeout),
client.WithAdditionalFunctionality(
injecterror.NewClient(injecterror.WithRequestErrorTimes(1, -1)),
),
)

request := defaultRequest(nsReg.Name)
requestCtx, requestCtxCancel := context.WithTimeout(context.Background(), requestTimeout)
defer requestCtxCancel()

conn, err := nsc.Request(requestCtx, request)
require.NoError(t, err)
require.NotNil(t, conn)
// Closes equal to 0 for now
require.Equal(t, 1, counter.Requests())
require.Equal(t, 0, counter.Closes())

// Waiting for the timeout
require.Eventually(t, func() bool { return counter.Closes() == 1 }, tokenTimeout, time.Millisecond*100)
}

// This test checks registry expire on sandbox
// We run nsmgr and registry with registry authorize chain element (tokens_expired.rego)
func Test_Expire(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })

// expire chain element will call Unregister() after (tokenTimeout - registerTimeout)
// to be sure that token is not expired
tokenTimeout := time.Second * 2
registerTimeout := time.Second + time.Millisecond*500

chainCtx, chainCtxCancel := context.WithTimeout(context.Background(), time.Second*5)
defer chainCtxCancel()

// Set tokens_expired policy for nsmgr and registry
nsmgrSuppier := func(ctx context.Context, tokenGenerator token.GeneratorFunc, options ...nsmgr.Option) nsmgr.Nsmgr {
options = append(options,
nsmgr.WithAuthorizeNSERegistryServer(
authorizeregistry.NewNetworkServiceEndpointRegistryServer(authorizeregistry.WithPolicies("etc/nsm/opa/common/tokens_expired.rego"))),
)
return nsmgr.NewServer(ctx, tokenGenerator, options...)
}

registrySupplier := func(
ctx context.Context,
tokenGenerator token.GeneratorFunc,
expiryDuration time.Duration,
proxyRegistryURL *url.URL,
options ...grpc.DialOption) registry.Registry {
return memory.NewServer(
ctx,
tokenGenerator,
memory.WithExpireDuration(expiryDuration),
memory.WithProxyRegistryURL(proxyRegistryURL),
memory.WithDialOptions(options...),
memory.WithAuthorizeNSRegistryServer(
authorizeregistry.NewNetworkServiceRegistryServer(authorizeregistry.WithPolicies("etc/nsm/opa/common/tokens_expired.rego"))),
)
}

domain := sandbox.NewBuilder(chainCtx, t).
SetNodesCount(1).
SetNSMgrSupplier(nsmgrSuppier).
SetRegistrySupplier(registrySupplier).
Build()

nsRegistryClient := domain.NewNSRegistryClient(chainCtx, sandbox.GenerateTestToken)
ns := defaultRegistryService("ns")

nsReg, err := nsRegistryClient.Register(chainCtx, ns)
require.NoError(t, err)

// Set an expiring token.
// Add injecterrorregistry to allow only the first Register. All subsequent ones will fall.
// This emulates the death of the NSE.
nseRegistryClient := registryclient.NewNetworkServiceEndpointRegistryClient(chainCtx,
registryclient.WithClientURL(domain.Nodes[0].NSMgr.URL),
registryclient.WithDialOptions(sandbox.DialOptions(sandbox.WithTokenGenerator(sandbox.GenerateExpiringToken(tokenTimeout)))...),
registryclient.WithNSEAdditionalFunctionality(
injecterrorregistry.NewNetworkServiceEndpointRegistryClient(
injecterrorregistry.WithRegisterErrorTimes(1, -1),
injecterrorregistry.WithFindErrorTimes())),
)

registerCtx, registerCtxCancel := context.WithTimeout(context.Background(), registerTimeout)
defer registerCtxCancel()
_, err = nseRegistryClient.Register(registerCtx, &registryapi.NetworkServiceEndpoint{
Name: "final-endpoint",
Url: "nseURL",
NetworkServiceNames: []string{nsReg.Name},
})
require.NoError(t, err)

// Wait for the endpoint expiration
time.Sleep(tokenTimeout)
stream, err := nseRegistryClient.Find(chainCtx, &registryapi.NetworkServiceEndpointQuery{
NetworkServiceEndpoint: &registryapi.NetworkServiceEndpoint{
Name: "final-endpoint",
},
})
require.NoError(t, err)

// Eventually expire will call Unregister
require.Len(t, registryapi.ReadNetworkServiceEndpointList(stream), 0)
}
2 changes: 1 addition & 1 deletion pkg/networkservice/chains/nsmgrproxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,8 +291,8 @@ func NewServer(ctx context.Context, regURL, proxyURL *url.URL, tokenGenerator to
)

var nseServerChain = chain.NewNetworkServiceEndpointRegistryServer(
grpcmetadata.NewNetworkServiceEndpointRegistryServer(),
begin.NewNetworkServiceEndpointRegistryServer(),
grpcmetadata.NewNetworkServiceEndpointRegistryServer(),
updatepath.NewNetworkServiceEndpointRegistryServer(tokenGenerator),
opts.authorizeNSERegistryServer,
clienturl.NewNetworkServiceEndpointRegistryServer(proxyURL),
Expand Down
12 changes: 10 additions & 2 deletions pkg/networkservice/common/timeout/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@ func NewServer(ctx context.Context) networkservice.NetworkServiceServer {
}

func (s *timeoutServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (conn *networkservice.Connection, err error) {
timeClock := clock.FromContext(ctx)

deadline, ok := ctx.Deadline()
requestTimeout := timeClock.Until(deadline)
if !ok {
requestTimeout = 0
}

conn, err = next.Server(ctx).Request(ctx, request)
if err != nil {
return nil, err
Expand All @@ -67,8 +75,8 @@ func (s *timeoutServer) Request(ctx context.Context, request *networkservice.Net
}
store(ctx, metadata.IsClient(s), cancel)
eventFactory := begin.FromContext(ctx)
timeClock := clock.FromContext(ctx)
afterCh := timeClock.After(timeClock.Until(expirationTime))
afterCh := timeClock.After(timeClock.Until(expirationTime) - requestTimeout)

go func(cancelCtx context.Context, afterCh <-chan time.Time) {
select {
case <-cancelCtx.Done():
Expand Down
2 changes: 1 addition & 1 deletion pkg/registry/chains/memory/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ func NewServer(ctx context.Context, tokenGenerator token.GeneratorFunc, options
}

nseChain := chain.NewNetworkServiceEndpointRegistryServer(
grpcmetadata.NewNetworkServiceEndpointRegistryServer(),
begin.NewNetworkServiceEndpointRegistryServer(),
grpcmetadata.NewNetworkServiceEndpointRegistryServer(),
updatepath.NewNetworkServiceEndpointRegistryServer(tokenGenerator),
opts.authorizeNSERegistryServer,
switchcase.NewNetworkServiceEndpointRegistryServer(switchcase.NSEServerCase{
Expand Down
2 changes: 1 addition & 1 deletion pkg/registry/chains/proxydns/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ func NewServer(ctx context.Context, tokenGenerator token.GeneratorFunc, dnsResol
}

nseChain := chain.NewNetworkServiceEndpointRegistryServer(
grpcmetadata.NewNetworkServiceEndpointRegistryServer(),
begin.NewNetworkServiceEndpointRegistryServer(),
grpcmetadata.NewNetworkServiceEndpointRegistryServer(),
updatepath.NewNetworkServiceEndpointRegistryServer(tokenGenerator),
opts.authorizeNSERegistryServer,
dnsresolve.NewNetworkServiceEndpointRegistryServer(dnsresolve.WithResolver(dnsResolver)),
Expand Down
Loading

0 comments on commit dc57328

Please sign in to comment.