Skip to content

Commit

Permalink
Revert "Use a context with exteded timeout on Requests in begin (#1656)"
Browse files Browse the repository at this point in the history
This reverts commit 6fad31a.
  • Loading branch information
denis-tingaikin committed Sep 12, 2024
1 parent 6c3706e commit f36cc47
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 107 deletions.
21 changes: 9 additions & 12 deletions pkg/networkservice/common/begin/event_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/peer"

"github.com/networkservicemesh/sdk/pkg/tools/clock"
"github.com/networkservicemesh/sdk/pkg/tools/extend"
"github.com/networkservicemesh/sdk/pkg/tools/postpone"

Expand Down Expand Up @@ -159,16 +160,16 @@ type eventFactoryServer struct {
ctxFunc func() (context.Context, context.CancelFunc)
request *networkservice.NetworkServiceRequest
returnedConnection *networkservice.Connection
contextTimeout time.Duration
closeTimeout time.Duration
afterCloseFunc func()
server networkservice.NetworkServiceServer
}

func newEventFactoryServer(ctx context.Context, contextTimeout time.Duration, afterClose func()) *eventFactoryServer {
func newEventFactoryServer(ctx context.Context, closeTimeout time.Duration, afterClose func()) *eventFactoryServer {
f := &eventFactoryServer{
server: next.Server(ctx),
initialCtxFunc: postpone.Context(ctx),
contextTimeout: contextTimeout,
closeTimeout: closeTimeout,
}
f.updateContext(ctx)

Expand Down Expand Up @@ -206,12 +207,7 @@ func (f *eventFactoryServer) Request(opts ...Option) <-chan error {
default:
ctx, cancel := f.ctxFunc()
defer cancel()

extendedCtx, cancel := context.WithTimeout(context.Background(), f.contextTimeout)
defer cancel()

extendedCtx = extend.WithValuesFromContext(extendedCtx, ctx)
conn, err := f.server.Request(extendedCtx, f.request)
conn, err := f.server.Request(ctx, f.request)
if err == nil && f.request != nil {
f.request.Connection = conn
}
Expand Down Expand Up @@ -240,11 +236,12 @@ func (f *eventFactoryServer) Close(opts ...Option) <-chan error {
ctx, cancel := f.ctxFunc()
defer cancel()

extendedCtx, cancel := context.WithTimeout(context.Background(), f.contextTimeout)
c := clock.FromContext(ctx)
closeCtx, cancel := c.WithTimeout(context.Background(), f.closeTimeout)
defer cancel()

extendedCtx = extend.WithValuesFromContext(extendedCtx, ctx)
_, err := f.server.Close(extendedCtx, f.request.GetConnection())
closeCtx = extend.WithValuesFromContext(closeCtx, ctx)
_, err := f.server.Close(closeCtx, f.request.GetConnection())
f.afterCloseFunc()
ch <- err
}
Expand Down
25 changes: 17 additions & 8 deletions pkg/networkservice/common/begin/event_factory_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/networkservicemesh/sdk/pkg/networkservice/core/chain"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"
"github.com/networkservicemesh/sdk/pkg/tools/clock"
"github.com/networkservicemesh/sdk/pkg/tools/clockmock"
)

// This test reproduces the situation when refresh changes the eventFactory context
Expand Down Expand Up @@ -137,12 +138,19 @@ func TestContextTimeout_Server(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

contextTimeout := time.Second * 2
// Add clockMock to the context
clockMock := clockmock.New(ctx)
ctx = clock.WithClock(ctx, clockMock)

ctx, cancel = clockMock.WithDeadline(ctx, clockMock.Now().Add(time.Second*3))
defer cancel()

closeTimeout := time.Minute
eventFactoryServ := &eventFactoryServer{}
server := chain.NewNetworkServiceServer(
begin.NewServer(begin.WithContextTimeout(contextTimeout)),
begin.NewServer(begin.WithCloseTimeout(closeTimeout)),
eventFactoryServ,
&delayedNSEServer{t: t, contextTimeout: contextTimeout},
&delayedNSEServer{t: t, closeTimeout: closeTimeout, clock: clockMock},
)

// Do Request
Expand Down Expand Up @@ -221,8 +229,9 @@ func (f *failedNSEServer) Close(ctx context.Context, conn *networkservice.Connec

type delayedNSEServer struct {
t *testing.T
clock *clockmock.Mock
initialTimeout time.Duration
contextTimeout time.Duration
closeTimeout time.Duration
}

func (d *delayedNSEServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) {
Expand All @@ -238,20 +247,20 @@ func (d *delayedNSEServer) Request(ctx context.Context, request *networkservice.
d.initialTimeout = timeout
}
// All requests timeout must be equal the first
require.Less(d.t, (d.initialTimeout - timeout).Abs(), time.Second)
require.Equal(d.t, d.initialTimeout, timeout)

// Add delay
time.Sleep(timeout / 2)
d.clock.Add(timeout / 2)
return next.Server(ctx).Request(ctx, request)
}

func (d *delayedNSEServer) Close(ctx context.Context, conn *networkservice.Connection) (*emptypb.Empty, error) {
require.Greater(d.t, d.initialTimeout, time.Duration(0))

deadline, _ := ctx.Deadline()
timeout := time.Until(deadline)
clockTime := clock.FromContext(ctx)

require.Less(d.t, (d.contextTimeout - timeout).Abs(), time.Second)
require.Equal(d.t, d.closeTimeout, clockTime.Until(deadline))

return next.Server(ctx).Close(ctx, conn)
}
12 changes: 6 additions & 6 deletions pkg/networkservice/common/begin/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ import (
)

type option struct {
cancelCtx context.Context
reselect bool
contextTimeout time.Duration
cancelCtx context.Context
reselect bool
closeTimeout time.Duration
}

// Option - event option
Expand All @@ -44,9 +44,9 @@ func WithReselect() Option {
}
}

// WithContextTimeout - set a custom timeout for a context in begin.Close
func WithContextTimeout(timeout time.Duration) Option {
// WithCloseTimeout - set a custom timeout for a context in begin.Close
func WithCloseTimeout(timeout time.Duration) Option {
return func(o *option) {
o.contextTimeout = timeout
o.closeTimeout = timeout
}
}
32 changes: 13 additions & 19 deletions pkg/networkservice/common/begin/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,23 +33,23 @@ import (

type beginServer struct {
genericsync.Map[string, *eventFactoryServer]
contextTimeout time.Duration
closeTimeout time.Duration
}

// NewServer - creates a new begin chain element
func NewServer(opts ...Option) networkservice.NetworkServiceServer {
o := &option{
cancelCtx: context.Background(),
reselect: false,
contextTimeout: time.Minute,
cancelCtx: context.Background(),
reselect: false,
closeTimeout: time.Minute,
}

for _, opt := range opts {
opt(o)
}

return &beginServer{
contextTimeout: o.contextTimeout,
closeTimeout: o.closeTimeout,
}
}

Expand All @@ -68,7 +68,7 @@ func (b *beginServer) Request(ctx context.Context, request *networkservice.Netwo
eventFactoryServer, _ := b.LoadOrStore(request.GetConnection().GetId(),
newEventFactoryServer(
ctx,
b.contextTimeout,
b.closeTimeout,
func() {
b.Delete(request.GetRequestConnection().GetId())
},
Expand All @@ -88,24 +88,17 @@ func (b *beginServer) Request(ctx context.Context, request *networkservice.Netwo
eventFactoryServer.request != nil && eventFactoryServer.request.Connection != nil {
log.FromContext(ctx).Info("Closing connection due to RESELECT_REQUESTED state")

closeCtx, cancel := context.WithTimeout(context.Background(), b.contextTimeout)
defer cancel()

eventFactoryCtx, eventFactoryCtxCancel := eventFactoryServer.ctxFunc()
closeCtx = extend.WithValuesFromContext(closeCtx, eventFactoryCtx)
_, closeErr := next.Server(closeCtx).Close(closeCtx, eventFactoryServer.request.Connection)
_, closeErr := next.Server(eventFactoryCtx).Close(eventFactoryCtx, eventFactoryServer.request.Connection)
if closeErr != nil {
log.FromContext(ctx).Errorf("Can't close old connection: %v", closeErr)
}
eventFactoryServer.state = closed
eventFactoryCtxCancel()
}

extendedCtx, cancel := context.WithTimeout(context.Background(), b.contextTimeout)
extendedCtx = extend.WithValuesFromContext(extendedCtx, withEventFactory(ctx, eventFactoryServer))
defer cancel()

conn, err = next.Server(extendedCtx).Request(extendedCtx, request)
withEventFactoryCtx := withEventFactory(ctx, eventFactoryServer)
conn, err = next.Server(withEventFactoryCtx).Request(withEventFactoryCtx, request)
if err != nil {
if eventFactoryServer.state != established {
eventFactoryServer.state = closed
Expand Down Expand Up @@ -150,13 +143,14 @@ func (b *beginServer) Close(ctx context.Context, conn *networkservice.Connection
if currentServerClient != eventFactoryServer {
return
}
extendedCtx, cancel := context.WithTimeout(context.Background(), b.contextTimeout)
extendedCtx = extend.WithValuesFromContext(extendedCtx, withEventFactory(ctx, eventFactoryServer))
closeCtx, cancel := context.WithTimeout(context.Background(), b.closeTimeout)
defer cancel()

// Always close with the last valid EventFactory we got
conn = eventFactoryServer.request.Connection
_, err = next.Server(extendedCtx).Close(extendedCtx, conn)
withEventFactoryCtx := withEventFactory(ctx, eventFactoryServer)
closeCtx = extend.WithValuesFromContext(closeCtx, withEventFactoryCtx)
_, err = next.Server(closeCtx).Close(closeCtx, conn)
eventFactoryServer.afterCloseFunc()
}):
return &emptypb.Empty{}, err
Expand Down
55 changes: 4 additions & 51 deletions pkg/networkservice/common/begin/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/networkservicemesh/api/pkg/api/networkservice"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
"google.golang.org/protobuf/types/known/emptypb"

"github.com/networkservicemesh/sdk/pkg/networkservice/common/begin"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"
Expand All @@ -42,24 +41,14 @@ type waitServer struct {
}

func (s *waitServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) {
afterCh := time.After(time.Second)
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-afterCh:
s.requestDone.Add(1)
}
time.Sleep(waitTime)
s.requestDone.Store(1)
return next.Server(ctx).Request(ctx, request)
}

func (s *waitServer) Close(ctx context.Context, connection *networkservice.Connection) (*empty.Empty, error) {
afterCh := time.After(time.Second)
select {
case <-ctx.Done():
return &emptypb.Empty{}, nil
case <-afterCh:
s.closeDone.Add(1)
}
time.Sleep(waitTime)
s.closeDone.Store(1)
return next.Server(ctx).Close(ctx, connection)
}

Expand Down Expand Up @@ -93,39 +82,3 @@ func TestBeginWorksWithSmallTimeout(t *testing.T) {
return waitSrv.closeDone.Load() == 1
}, waitTime*2, time.Millisecond*500)
}

func TestBeginHasExtendedTimeoutOnReselect(t *testing.T) {
t.Cleanup(func() {
goleak.VerifyNone(t)
})
requestCtx, cancel := context.WithTimeout(context.Background(), time.Millisecond*200)
defer cancel()

waitSrv := &waitServer{}
server := next.NewNetworkServiceServer(
begin.NewServer(),
waitSrv,
)

// Make a first request to create an event factory. Begin should make Request only
request := testRequest("id")
_, err := server.Request(requestCtx, request)
require.EqualError(t, err, context.DeadlineExceeded.Error())
require.Equal(t, int32(0), waitSrv.requestDone.Load())
require.Eventually(t, func() bool {
return waitSrv.requestDone.Load() == 1
}, waitTime*2, time.Millisecond*500)

// Make a second request with RESELECT_REQUESTED. Begin should make Close with extended context first and then Request
requestCtx, cancel = context.WithTimeout(context.Background(), time.Millisecond*200)
defer cancel()
newRequest := request.Clone()
newRequest.Connection.State = networkservice.State_RESELECT_REQUESTED

_, err = server.Request(requestCtx, newRequest)
require.EqualError(t, err, context.DeadlineExceeded.Error())
require.Equal(t, int32(0), waitSrv.closeDone.Load())
require.Eventually(t, func() bool {
return waitSrv.closeDone.Load() == 1 && waitSrv.requestDone.Load() == 2
}, waitTime*4, time.Millisecond*500)
}
8 changes: 2 additions & 6 deletions pkg/networkservice/common/dial/client.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2021-2024 Cisco and/or its affiliates.
// Copyright (c) 2021-2023 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -72,12 +72,8 @@ func (d *dialClient) Request(ctx context.Context, request *networkservice.Networ
return next.Client(ctx).Request(ctx, request, opts...)
}

di.mu.Lock()
dialClientURL := di.clientURL
di.mu.Unlock()

// If our existing dialer has a different URL close down the chain
if dialClientURL != nil && dialClientURL.String() != clientURL.String() {
if di.clientURL != nil && di.clientURL.String() != clientURL.String() {
closeCtx, closeCancel := closeContextFunc()
defer closeCancel()
err := di.Dial(closeCtx, di.clientURL)
Expand Down
6 changes: 1 addition & 5 deletions pkg/networkservice/common/dial/dialer.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2021-2024 Cisco and/or its affiliates.
// Copyright (c) 2021 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand All @@ -20,7 +20,6 @@ import (
"context"
"net/url"
"runtime"
"sync"
"time"

"github.com/pkg/errors"
Expand All @@ -38,7 +37,6 @@ type dialer struct {
*grpc.ClientConn
dialOptions []grpc.DialOption
dialTimeout time.Duration
mu sync.Mutex
}

func newDialer(ctx context.Context, dialTimeout time.Duration, dialOptions ...grpc.DialOption) *dialer {
Expand All @@ -58,10 +56,8 @@ func (di *dialer) Dial(ctx context.Context, clientURL *url.URL) error {
di.cleanupCancel()
}

di.mu.Lock()
// Set the clientURL
di.clientURL = clientURL
di.mu.Unlock()

// Setup dialTimeout if needed
dialCtx := ctx
Expand Down

0 comments on commit f36cc47

Please sign in to comment.