Skip to content

Commit

Permalink
Replace closectx.New() with postpone.ContextWithValues()
Browse files Browse the repository at this point in the history
Signed-off-by: Vladimir Popov <vladimir.popov@xored.com>
  • Loading branch information
Vladimir Popov committed Aug 12, 2021
1 parent 97fc7a4 commit d17ec3a
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 62 deletions.
6 changes: 4 additions & 2 deletions pkg/networkservice/common/authorize/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
"github.com/networkservicemesh/api/pkg/api/networkservice"

"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"
"github.com/networkservicemesh/sdk/pkg/tools/closectx"
"github.com/networkservicemesh/sdk/pkg/tools/postpone"
)

type authorizeClient struct {
Expand All @@ -55,6 +55,8 @@ func (a *authorizeClient) Request(ctx context.Context, request *networkservice.N
var p peer.Peer
opts = append(opts, grpc.Peer(&p))

postponeCtxFunc := postpone.ContextWithValues(ctx)

conn, err := next.Client(ctx).Request(ctx, request, opts...)
if err != nil {
return nil, err
Expand All @@ -66,7 +68,7 @@ func (a *authorizeClient) Request(ctx context.Context, request *networkservice.N
}

if err = a.policies.check(ctx, conn); err != nil {
closeCtx, cancelClose := closectx.New(ctx)
closeCtx, cancelClose := postponeCtxFunc()
defer cancelClose()

if _, closeErr := next.Client(ctx).Close(closeCtx, conn, opts...); closeErr != nil {
Expand Down
28 changes: 17 additions & 11 deletions pkg/networkservice/common/connect/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ import (
"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"
"github.com/networkservicemesh/sdk/pkg/tools/cancelctx"
"github.com/networkservicemesh/sdk/pkg/tools/clienturlctx"
"github.com/networkservicemesh/sdk/pkg/tools/closectx"
"github.com/networkservicemesh/sdk/pkg/tools/log"
"github.com/networkservicemesh/sdk/pkg/tools/multiexecutor"
"github.com/networkservicemesh/sdk/pkg/tools/postpone"
)

// ClientFactory is used to created new clients when new connection is created.
Expand Down Expand Up @@ -89,13 +89,18 @@ func (s *connectServer) Request(ctx context.Context, request *networkservice.Net
return nil, errors.Errorf("clientURL not found for incoming connection: %+v", request.GetConnection())
}

c := s.client(ctx, request.GetConnection())
if err := c.client.ctx.Err(); err != nil {
c, err := s.client(ctx, request.GetConnection())
if err != nil {
return nil, err
}
if err = c.client.ctx.Err(); err != nil {
s.deleteClient(c, clientURL.String())
s.connInfos.Delete(request.GetConnection().GetId())
return nil, err
}

postponeCtxFunc := postpone.ContextWithValues(ctx)

conn, err := c.client.Request(ctx, request.Clone())
if err != nil {
if _, ok := s.connInfos.Load(request.GetConnection().GetId()); !ok {
Expand All @@ -117,7 +122,7 @@ func (s *connectServer) Request(ctx context.Context, request *networkservice.Net
conn, err = next.Server(ctx).Request(ctx, request)
// Close connection if next.Server Request finished with error
if err != nil && !refreshRequest {
closeCtx, cancelClose := closectx.New(ctx)
closeCtx, cancelClose := postponeCtxFunc()
defer cancelClose()

_, closeErr := s.Close(closeCtx, request.Connection.Clone())
Expand Down Expand Up @@ -148,24 +153,25 @@ func (s *connectServer) Close(ctx context.Context, conn *networkservice.Connecti
return &empty.Empty{}, err
}

func (s *connectServer) client(ctx context.Context, conn *networkservice.Connection) *clientInfo {
func (s *connectServer) client(ctx context.Context, conn *networkservice.Connection) (*clientInfo, error) {
logger := log.FromContext(ctx).WithField("connectServer", "client")
clientURL := clienturlctx.ClientURL(ctx)

// First check if we have already requested some clientURL with this conn.GetID().
if connInfo, ok := s.connInfos.Load(conn.GetId()); ok {
if *connInfo.clientURL == *clientURL {
return connInfo.client
return connInfo.client, nil
}

closeCtx, cancelClose := closectx.New(ctx)
defer cancelClose()

// For some reason we have changed the clientURL, so we need to close and delete the existing client.
if _, clientErr := connInfo.client.client.Close(closeCtx, conn); clientErr != nil {
if _, clientErr := connInfo.client.client.Close(ctx, conn); clientErr != nil {
logger.Warnf("failed to close client: %s", clientErr.Error())
}

if err := ctx.Err(); err != nil {
return nil, err
}

s.closeClient(connInfo.client, connInfo.clientURL.String())
}

Expand All @@ -181,7 +187,7 @@ func (s *connectServer) client(ctx context.Context, conn *networkservice.Connect
}
c.count++
})
return c
return c, nil
}

func (s *connectServer) newClient(clientURL *url.URL) *clientInfo {
Expand Down
6 changes: 4 additions & 2 deletions pkg/networkservice/common/heal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (

"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"
"github.com/networkservicemesh/sdk/pkg/tools/cancelctx"
"github.com/networkservicemesh/sdk/pkg/tools/closectx"
"github.com/networkservicemesh/sdk/pkg/tools/postpone"
)

type connectionInfo struct {
Expand Down Expand Up @@ -104,6 +104,8 @@ func (u *healClient) Request(ctx context.Context, request *networkservice.Networ
}
}()

postponeCtxFunc := postpone.ContextWithValues(ctx)

conn, err = next.Client(ctx).Request(ctx, request, opts...)
if err != nil {
return nil, err
Expand All @@ -129,7 +131,7 @@ func (u *healClient) Request(ctx context.Context, request *networkservice.Networ
err = errors.Errorf("timeout waiting for connection monitor: %s", conn.GetId())
}
if err != nil {
closeCtx, cancelClose := closectx.New(ctx)
closeCtx, cancelClose := postponeCtxFunc()
defer cancelClose()

if _, closeErr := next.Client(ctx).Close(closeCtx, conn); closeErr != nil {
Expand Down
6 changes: 4 additions & 2 deletions pkg/registry/common/refresh/nse_registry_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ import (

"github.com/networkservicemesh/sdk/pkg/registry/core/next"
"github.com/networkservicemesh/sdk/pkg/tools/clock"
"github.com/networkservicemesh/sdk/pkg/tools/closectx"
"github.com/networkservicemesh/sdk/pkg/tools/log"
"github.com/networkservicemesh/sdk/pkg/tools/postpone"
"github.com/networkservicemesh/sdk/pkg/tools/serializectx"
)

Expand Down Expand Up @@ -61,6 +61,8 @@ func (c *refreshNSEClient) Register(ctx context.Context, nse *registry.NetworkSe
cancel()
}

postponeCtxFunc := postpone.ContextWithValues(ctx)

reg, err := next.NetworkServiceEndpointRegistryClient(ctx).Register(ctx, nse, opts...)
if err != nil {
return nil, err
Expand All @@ -72,7 +74,7 @@ func (c *refreshNSEClient) Register(ctx context.Context, nse *registry.NetworkSe

cancel, err = c.startRefresh(ctx, refreshNSE, expirationDuration)
if err != nil {
unregisterCtx, cancelUnregister := closectx.New(ctx)
unregisterCtx, cancelUnregister := postponeCtxFunc()
defer cancelUnregister()

if _, unregisterErr := next.NetworkServiceEndpointRegistryServer(ctx).Unregister(unregisterCtx, reg); unregisterErr != nil {
Expand Down
45 changes: 0 additions & 45 deletions pkg/tools/closectx/context.go

This file was deleted.

51 changes: 51 additions & 0 deletions pkg/tools/postpone/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright (c) 2021 Doc.ai and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package postpone is used to create a context with postponed deadline for some cleanup operations.
package postpone

import (
"context"

"github.com/networkservicemesh/sdk/pkg/tools/clock"
"github.com/networkservicemesh/sdk/pkg/tools/extend"
)

// Context returns a function providing the context with the same timeout as ctx has at this moment.
func Context(ctx context.Context) func() (context.Context, context.CancelFunc) {
deadline, ok := ctx.Deadline()
if !ok {
return func() (context.Context, context.CancelFunc) {
return context.WithCancel(context.Background())
}
}

clockTime := clock.FromContext(ctx)
timeout := clockTime.Until(deadline)

return func() (context.Context, context.CancelFunc) {
return clockTime.WithTimeout(context.Background(), timeout)
}
}

// ContextWithValues is the same as a Context, but also provided context has the same values as ctx.
func ContextWithValues(ctx context.Context) func() (context.Context, context.CancelFunc) {
ctxFunc := Context(ctx)
return func() (context.Context, context.CancelFunc) {
postponedCtx, cancel := ctxFunc()
return extend.WithValuesFromContext(postponedCtx, ctx), cancel
}
}

0 comments on commit d17ec3a

Please sign in to comment.