From d17ec3adc993618ca7efc7dc89c06b3b99405615 Mon Sep 17 00:00:00 2001 From: Vladimir Popov Date: Wed, 28 Jul 2021 20:34:33 +0700 Subject: [PATCH] Replace closectx.New() with postpone.ContextWithValues() Signed-off-by: Vladimir Popov --- pkg/networkservice/common/authorize/client.go | 6 ++- pkg/networkservice/common/connect/server.go | 28 ++++++---- pkg/networkservice/common/heal/client.go | 6 ++- .../common/refresh/nse_registry_client.go | 6 ++- pkg/tools/closectx/context.go | 45 ---------------- pkg/tools/postpone/context.go | 51 +++++++++++++++++++ 6 files changed, 80 insertions(+), 62 deletions(-) delete mode 100644 pkg/tools/closectx/context.go create mode 100644 pkg/tools/postpone/context.go diff --git a/pkg/networkservice/common/authorize/client.go b/pkg/networkservice/common/authorize/client.go index b352b766a..4be2b0e0c 100644 --- a/pkg/networkservice/common/authorize/client.go +++ b/pkg/networkservice/common/authorize/client.go @@ -30,7 +30,7 @@ import ( "github.com/networkservicemesh/api/pkg/api/networkservice" "github.com/networkservicemesh/sdk/pkg/networkservice/core/next" - "github.com/networkservicemesh/sdk/pkg/tools/closectx" + "github.com/networkservicemesh/sdk/pkg/tools/postpone" ) type authorizeClient struct { @@ -55,6 +55,8 @@ func (a *authorizeClient) Request(ctx context.Context, request *networkservice.N var p peer.Peer opts = append(opts, grpc.Peer(&p)) + postponeCtxFunc := postpone.ContextWithValues(ctx) + conn, err := next.Client(ctx).Request(ctx, request, opts...) if err != nil { return nil, err @@ -66,7 +68,7 @@ func (a *authorizeClient) Request(ctx context.Context, request *networkservice.N } if err = a.policies.check(ctx, conn); err != nil { - closeCtx, cancelClose := closectx.New(ctx) + closeCtx, cancelClose := postponeCtxFunc() defer cancelClose() if _, closeErr := next.Client(ctx).Close(closeCtx, conn, opts...); closeErr != nil { diff --git a/pkg/networkservice/common/connect/server.go b/pkg/networkservice/common/connect/server.go index f9eb7c492..294023902 100644 --- a/pkg/networkservice/common/connect/server.go +++ b/pkg/networkservice/common/connect/server.go @@ -33,9 +33,9 @@ import ( "github.com/networkservicemesh/sdk/pkg/networkservice/core/next" "github.com/networkservicemesh/sdk/pkg/tools/cancelctx" "github.com/networkservicemesh/sdk/pkg/tools/clienturlctx" - "github.com/networkservicemesh/sdk/pkg/tools/closectx" "github.com/networkservicemesh/sdk/pkg/tools/log" "github.com/networkservicemesh/sdk/pkg/tools/multiexecutor" + "github.com/networkservicemesh/sdk/pkg/tools/postpone" ) // ClientFactory is used to created new clients when new connection is created. @@ -89,13 +89,18 @@ func (s *connectServer) Request(ctx context.Context, request *networkservice.Net return nil, errors.Errorf("clientURL not found for incoming connection: %+v", request.GetConnection()) } - c := s.client(ctx, request.GetConnection()) - if err := c.client.ctx.Err(); err != nil { + c, err := s.client(ctx, request.GetConnection()) + if err != nil { + return nil, err + } + if err = c.client.ctx.Err(); err != nil { s.deleteClient(c, clientURL.String()) s.connInfos.Delete(request.GetConnection().GetId()) return nil, err } + postponeCtxFunc := postpone.ContextWithValues(ctx) + conn, err := c.client.Request(ctx, request.Clone()) if err != nil { if _, ok := s.connInfos.Load(request.GetConnection().GetId()); !ok { @@ -117,7 +122,7 @@ func (s *connectServer) Request(ctx context.Context, request *networkservice.Net conn, err = next.Server(ctx).Request(ctx, request) // Close connection if next.Server Request finished with error if err != nil && !refreshRequest { - closeCtx, cancelClose := closectx.New(ctx) + closeCtx, cancelClose := postponeCtxFunc() defer cancelClose() _, closeErr := s.Close(closeCtx, request.Connection.Clone()) @@ -148,24 +153,25 @@ func (s *connectServer) Close(ctx context.Context, conn *networkservice.Connecti return &empty.Empty{}, err } -func (s *connectServer) client(ctx context.Context, conn *networkservice.Connection) *clientInfo { +func (s *connectServer) client(ctx context.Context, conn *networkservice.Connection) (*clientInfo, error) { logger := log.FromContext(ctx).WithField("connectServer", "client") clientURL := clienturlctx.ClientURL(ctx) // First check if we have already requested some clientURL with this conn.GetID(). if connInfo, ok := s.connInfos.Load(conn.GetId()); ok { if *connInfo.clientURL == *clientURL { - return connInfo.client + return connInfo.client, nil } - closeCtx, cancelClose := closectx.New(ctx) - defer cancelClose() - // For some reason we have changed the clientURL, so we need to close and delete the existing client. - if _, clientErr := connInfo.client.client.Close(closeCtx, conn); clientErr != nil { + if _, clientErr := connInfo.client.client.Close(ctx, conn); clientErr != nil { logger.Warnf("failed to close client: %s", clientErr.Error()) } + if err := ctx.Err(); err != nil { + return nil, err + } + s.closeClient(connInfo.client, connInfo.clientURL.String()) } @@ -181,7 +187,7 @@ func (s *connectServer) client(ctx context.Context, conn *networkservice.Connect } c.count++ }) - return c + return c, nil } func (s *connectServer) newClient(clientURL *url.URL) *clientInfo { diff --git a/pkg/networkservice/common/heal/client.go b/pkg/networkservice/common/heal/client.go index b7346c15a..41b24835c 100644 --- a/pkg/networkservice/common/heal/client.go +++ b/pkg/networkservice/common/heal/client.go @@ -28,7 +28,7 @@ import ( "github.com/networkservicemesh/sdk/pkg/networkservice/core/next" "github.com/networkservicemesh/sdk/pkg/tools/cancelctx" - "github.com/networkservicemesh/sdk/pkg/tools/closectx" + "github.com/networkservicemesh/sdk/pkg/tools/postpone" ) type connectionInfo struct { @@ -104,6 +104,8 @@ func (u *healClient) Request(ctx context.Context, request *networkservice.Networ } }() + postponeCtxFunc := postpone.ContextWithValues(ctx) + conn, err = next.Client(ctx).Request(ctx, request, opts...) if err != nil { return nil, err @@ -129,7 +131,7 @@ func (u *healClient) Request(ctx context.Context, request *networkservice.Networ err = errors.Errorf("timeout waiting for connection monitor: %s", conn.GetId()) } if err != nil { - closeCtx, cancelClose := closectx.New(ctx) + closeCtx, cancelClose := postponeCtxFunc() defer cancelClose() if _, closeErr := next.Client(ctx).Close(closeCtx, conn); closeErr != nil { diff --git a/pkg/registry/common/refresh/nse_registry_client.go b/pkg/registry/common/refresh/nse_registry_client.go index bdc7b561b..86b875d91 100644 --- a/pkg/registry/common/refresh/nse_registry_client.go +++ b/pkg/registry/common/refresh/nse_registry_client.go @@ -29,8 +29,8 @@ import ( "github.com/networkservicemesh/sdk/pkg/registry/core/next" "github.com/networkservicemesh/sdk/pkg/tools/clock" - "github.com/networkservicemesh/sdk/pkg/tools/closectx" "github.com/networkservicemesh/sdk/pkg/tools/log" + "github.com/networkservicemesh/sdk/pkg/tools/postpone" "github.com/networkservicemesh/sdk/pkg/tools/serializectx" ) @@ -61,6 +61,8 @@ func (c *refreshNSEClient) Register(ctx context.Context, nse *registry.NetworkSe cancel() } + postponeCtxFunc := postpone.ContextWithValues(ctx) + reg, err := next.NetworkServiceEndpointRegistryClient(ctx).Register(ctx, nse, opts...) if err != nil { return nil, err @@ -72,7 +74,7 @@ func (c *refreshNSEClient) Register(ctx context.Context, nse *registry.NetworkSe cancel, err = c.startRefresh(ctx, refreshNSE, expirationDuration) if err != nil { - unregisterCtx, cancelUnregister := closectx.New(ctx) + unregisterCtx, cancelUnregister := postponeCtxFunc() defer cancelUnregister() if _, unregisterErr := next.NetworkServiceEndpointRegistryServer(ctx).Unregister(unregisterCtx, reg); unregisterErr != nil { diff --git a/pkg/tools/closectx/context.go b/pkg/tools/closectx/context.go deleted file mode 100644 index f5aa8d208..000000000 --- a/pkg/tools/closectx/context.go +++ /dev/null @@ -1,45 +0,0 @@ -// Copyright (c) 2021 Doc.ai and/or its affiliates. -// -// SPDX-License-Identifier: Apache-2.0 -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at: -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package closectx provides helper method to create Close/Unregister context in case of Request/Register failure -package closectx - -import ( - "context" - "time" - - "github.com/networkservicemesh/sdk/pkg/tools/clock" - "github.com/networkservicemesh/sdk/pkg/tools/extend" -) - -const closeTimeout = 15 * time.Second - -// New creates a new context for Close/Unregister in case of Request/Register failure -func New(ctx context.Context) (context.Context, context.CancelFunc) { - clockTime := clock.FromContext(ctx) - - var timeout time.Duration - if deadline, ok := ctx.Deadline(); ok { - timeout = clockTime.Until(deadline) - } - if timeout < closeTimeout { - timeout = closeTimeout - } - - ctx = extend.WithValuesFromContext(context.Background(), ctx) - - return clockTime.WithTimeout(ctx, timeout) -} diff --git a/pkg/tools/postpone/context.go b/pkg/tools/postpone/context.go new file mode 100644 index 000000000..79775aa7e --- /dev/null +++ b/pkg/tools/postpone/context.go @@ -0,0 +1,51 @@ +// Copyright (c) 2021 Doc.ai and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package postpone is used to create a context with postponed deadline for some cleanup operations. +package postpone + +import ( + "context" + + "github.com/networkservicemesh/sdk/pkg/tools/clock" + "github.com/networkservicemesh/sdk/pkg/tools/extend" +) + +// Context returns a function providing the context with the same timeout as ctx has at this moment. +func Context(ctx context.Context) func() (context.Context, context.CancelFunc) { + deadline, ok := ctx.Deadline() + if !ok { + return func() (context.Context, context.CancelFunc) { + return context.WithCancel(context.Background()) + } + } + + clockTime := clock.FromContext(ctx) + timeout := clockTime.Until(deadline) + + return func() (context.Context, context.CancelFunc) { + return clockTime.WithTimeout(context.Background(), timeout) + } +} + +// ContextWithValues is the same as a Context, but also provided context has the same values as ctx. +func ContextWithValues(ctx context.Context) func() (context.Context, context.CancelFunc) { + ctxFunc := Context(ctx) + return func() (context.Context, context.CancelFunc) { + postponedCtx, cancel := ctxFunc() + return extend.WithValuesFromContext(postponedCtx, ctx), cancel + } +}