Skip to content

Commit

Permalink
Revert changes in begin and dial (#1682)
Browse files Browse the repository at this point in the history
* Revert "Use a context with exteded timeout on Requests in begin (#1656)"

This reverts commit 6fad31a.

Signed-off-by: NikitaSkrynnik <nikita.skrynnik@xored.com>

* Revert "Add a timeout for Closes in begin.Server (#1650)"

This reverts commit 3016313.

Signed-off-by: NikitaSkrynnik <nikita.skrynnik@xored.com>

* Revert "Add more mutexes in dial chain element to fix race conditions (#1670)"

This reverts commit b66e1bf.

Signed-off-by: NikitaSkrynnik <nikita.skrynnik@xored.com>

* fix linter issues

Signed-off-by: NikitaSkrynnik <nikita.skrynnik@xored.com>

---------

Signed-off-by: NikitaSkrynnik <nikita.skrynnik@xored.com>
  • Loading branch information
NikitaSkrynnik authored Oct 15, 2024
1 parent 76b397f commit 083d4e2
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 251 deletions.
19 changes: 3 additions & 16 deletions pkg/networkservice/common/begin/event_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package begin

import (
"context"
"time"

"github.com/edwarnicke/serialize"
"github.com/networkservicemesh/api/pkg/api/networkservice"
Expand Down Expand Up @@ -159,16 +158,14 @@ type eventFactoryServer struct {
ctxFunc func() (context.Context, context.CancelFunc)
request *networkservice.NetworkServiceRequest
returnedConnection *networkservice.Connection
contextTimeout time.Duration
afterCloseFunc func()
server networkservice.NetworkServiceServer
}

func newEventFactoryServer(ctx context.Context, contextTimeout time.Duration, afterClose func()) *eventFactoryServer {
func newEventFactoryServer(ctx context.Context, afterClose func()) *eventFactoryServer {
f := &eventFactoryServer{
server: next.Server(ctx),
initialCtxFunc: postpone.Context(ctx),
contextTimeout: contextTimeout,
}
f.updateContext(ctx)

Expand Down Expand Up @@ -206,12 +203,7 @@ func (f *eventFactoryServer) Request(opts ...Option) <-chan error {
default:
ctx, cancel := f.ctxFunc()
defer cancel()

extendedCtx, cancel := context.WithTimeout(context.Background(), f.contextTimeout)
defer cancel()

extendedCtx = extend.WithValuesFromContext(extendedCtx, ctx)
conn, err := f.server.Request(extendedCtx, f.request)
conn, err := f.server.Request(ctx, f.request)
if err == nil && f.request != nil {
f.request.Connection = conn
}
Expand Down Expand Up @@ -239,12 +231,7 @@ func (f *eventFactoryServer) Close(opts ...Option) <-chan error {
default:
ctx, cancel := f.ctxFunc()
defer cancel()

extendedCtx, cancel := context.WithTimeout(context.Background(), f.contextTimeout)
defer cancel()

extendedCtx = extend.WithValuesFromContext(extendedCtx, ctx)
_, err := f.server.Close(extendedCtx, f.request.GetConnection())
_, err := f.server.Close(ctx, f.request.GetConnection())
f.afterCloseFunc()
ch <- err
}
Expand Down
23 changes: 15 additions & 8 deletions pkg/networkservice/common/begin/event_factory_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/networkservicemesh/sdk/pkg/networkservice/core/chain"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"
"github.com/networkservicemesh/sdk/pkg/tools/clock"
"github.com/networkservicemesh/sdk/pkg/tools/clockmock"
)

// This test reproduces the situation when refresh changes the eventFactory context
Expand Down Expand Up @@ -137,12 +138,18 @@ func TestContextTimeout_Server(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

contextTimeout := time.Second * 2
// Add clockMock to the context
clockMock := clockmock.New(ctx)
ctx = clock.WithClock(ctx, clockMock)

ctx, cancel = context.WithDeadline(ctx, clockMock.Now().Add(time.Second*3))
defer cancel()

eventFactoryServ := &eventFactoryServer{}
server := chain.NewNetworkServiceServer(
begin.NewServer(begin.WithContextTimeout(contextTimeout)),
begin.NewServer(),
eventFactoryServ,
&delayedNSEServer{t: t, contextTimeout: contextTimeout},
&delayedNSEServer{t: t, clock: clockMock},
)

// Do Request
Expand Down Expand Up @@ -221,8 +228,8 @@ func (f *failedNSEServer) Close(ctx context.Context, conn *networkservice.Connec

type delayedNSEServer struct {
t *testing.T
clock *clockmock.Mock
initialTimeout time.Duration
contextTimeout time.Duration
}

func (d *delayedNSEServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) {
Expand All @@ -238,20 +245,20 @@ func (d *delayedNSEServer) Request(ctx context.Context, request *networkservice.
d.initialTimeout = timeout
}
// All requests timeout must be equal the first
require.Less(d.t, (d.initialTimeout - timeout).Abs(), time.Second)
require.Equal(d.t, d.initialTimeout, timeout)

// Add delay
time.Sleep(timeout / 2)
d.clock.Add(timeout / 2)
return next.Server(ctx).Request(ctx, request)
}

func (d *delayedNSEServer) Close(ctx context.Context, conn *networkservice.Connection) (*emptypb.Empty, error) {
require.Greater(d.t, d.initialTimeout, time.Duration(0))

deadline, _ := ctx.Deadline()
timeout := time.Until(deadline)
clockTime := clock.FromContext(ctx)

require.Less(d.t, (d.contextTimeout - timeout).Abs(), time.Second)
require.Equal(d.t, d.initialTimeout, clockTime.Until(deadline))

return next.Server(ctx).Close(ctx, conn)
}
13 changes: 2 additions & 11 deletions pkg/networkservice/common/begin/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,11 @@ package begin

import (
"context"
"time"
)

type option struct {
cancelCtx context.Context
reselect bool
contextTimeout time.Duration
cancelCtx context.Context
reselect bool
}

// Option - event option
Expand All @@ -43,10 +41,3 @@ func WithReselect() Option {
o.reselect = true
}
}

// WithContextTimeout - set a custom timeout for a context in begin.Close
func WithContextTimeout(timeout time.Duration) Option {
return func(o *option) {
o.contextTimeout = timeout
}
}
74 changes: 16 additions & 58 deletions pkg/networkservice/common/begin/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,45 +18,27 @@ package begin

import (
"context"
"time"

"github.com/edwarnicke/genericsync"
"github.com/networkservicemesh/api/pkg/api/networkservice"
"github.com/pkg/errors"
"google.golang.org/protobuf/types/known/emptypb"

"github.com/networkservicemesh/sdk/pkg/tools/extend"
"github.com/networkservicemesh/sdk/pkg/tools/log"

"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"
)

type beginServer struct {
genericsync.Map[string, *eventFactoryServer]
contextTimeout time.Duration
}

// NewServer - creates a new begin chain element
func NewServer(opts ...Option) networkservice.NetworkServiceServer {
o := &option{
cancelCtx: context.Background(),
reselect: false,
contextTimeout: time.Minute,
}

for _, opt := range opts {
opt(o)
}

return &beginServer{
contextTimeout: o.contextTimeout,
}
func NewServer() networkservice.NetworkServiceServer {
return &beginServer{}
}

func (b *beginServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) {
var conn *networkservice.Connection
var err error

func (b *beginServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (conn *networkservice.Connection, err error) {
// No connection.ID, no service
if request.GetConnection().GetId() == "" {
return nil, errors.New("request.EventFactory.Id must not be zero valued")
Expand All @@ -68,14 +50,12 @@ func (b *beginServer) Request(ctx context.Context, request *networkservice.Netwo
eventFactoryServer, _ := b.LoadOrStore(request.GetConnection().GetId(),
newEventFactoryServer(
ctx,
b.contextTimeout,
func() {
b.Delete(request.GetRequestConnection().GetId())
},
),
)
select {
case <-eventFactoryServer.executor.AsyncExec(func() {
<-eventFactoryServer.executor.AsyncExec(func() {
currentEventFactoryServer, _ := b.Load(request.GetConnection().GetId())
if currentEventFactoryServer != eventFactoryServer {
log.FromContext(ctx).Debug("recalling begin.Request because currentEventFactoryServer != eventFactoryServer")
Expand All @@ -88,24 +68,17 @@ func (b *beginServer) Request(ctx context.Context, request *networkservice.Netwo
eventFactoryServer.request != nil && eventFactoryServer.request.Connection != nil {
log.FromContext(ctx).Info("Closing connection due to RESELECT_REQUESTED state")

closeCtx, cancel := context.WithTimeout(context.Background(), b.contextTimeout)
defer cancel()

eventFactoryCtx, eventFactoryCtxCancel := eventFactoryServer.ctxFunc()
closeCtx = extend.WithValuesFromContext(closeCtx, eventFactoryCtx)
_, closeErr := next.Server(closeCtx).Close(closeCtx, eventFactoryServer.request.Connection)
_, closeErr := next.Server(eventFactoryCtx).Close(eventFactoryCtx, eventFactoryServer.request.Connection)
if closeErr != nil {
log.FromContext(ctx).Errorf("Can't close old connection: %v", closeErr)
}
eventFactoryServer.state = closed
eventFactoryCtxCancel()
}

extendedCtx, cancel := context.WithTimeout(context.Background(), b.contextTimeout)
extendedCtx = extend.WithValuesFromContext(extendedCtx, withEventFactory(ctx, eventFactoryServer))
defer cancel()

conn, err = next.Server(extendedCtx).Request(extendedCtx, request)
withEventFactoryCtx := withEventFactory(ctx, eventFactoryServer)
conn, err = next.Server(withEventFactoryCtx).Request(withEventFactoryCtx, request)
if err != nil {
if eventFactoryServer.state != established {
eventFactoryServer.state = closed
Expand All @@ -120,48 +93,33 @@ func (b *beginServer) Request(ctx context.Context, request *networkservice.Netwo

eventFactoryServer.returnedConnection = conn.Clone()
eventFactoryServer.updateContext(ctx)
}):
case <-ctx.Done():
return nil, ctx.Err()
}

})
return conn, err
}

func (b *beginServer) Close(ctx context.Context, conn *networkservice.Connection) (*emptypb.Empty, error) {
var err error
connID := conn.GetId()
func (b *beginServer) Close(ctx context.Context, conn *networkservice.Connection) (emp *emptypb.Empty, err error) {
// If some other EventFactory is already in the ctx... we are already running in an executor, and can just execute normally
if fromContext(ctx) != nil {
return next.Server(ctx).Close(ctx, conn)
}
eventFactoryServer, ok := b.Load(connID)
eventFactoryServer, ok := b.Load(conn.GetId())
if !ok {
// If we don't have a connection to Close, just let it be
return &emptypb.Empty{}, nil
}

select {
case <-eventFactoryServer.executor.AsyncExec(func() {
<-eventFactoryServer.executor.AsyncExec(func() {
if eventFactoryServer.state != established || eventFactoryServer.request == nil {
return
}
currentServerClient, _ := b.Load(connID)
currentServerClient, _ := b.Load(conn.GetId())
if currentServerClient != eventFactoryServer {
return
}
extendedCtx, cancel := context.WithTimeout(context.Background(), b.contextTimeout)
extendedCtx = extend.WithValuesFromContext(extendedCtx, withEventFactory(ctx, eventFactoryServer))
defer cancel()

// Always close with the last valid EventFactory we got
conn = eventFactoryServer.request.Connection
_, err = next.Server(extendedCtx).Close(extendedCtx, conn)
withEventFactoryCtx := withEventFactory(ctx, eventFactoryServer)
emp, err = next.Server(withEventFactoryCtx).Close(withEventFactoryCtx, conn)
eventFactoryServer.afterCloseFunc()
}):
return &emptypb.Empty{}, err
case <-ctx.Done():
b.Delete(connID)
return nil, ctx.Err()
}
})
return &emptypb.Empty{}, err
}
Loading

0 comments on commit 083d4e2

Please sign in to comment.