Skip to content

Commit

Permalink
Add cleanup chain element
Browse files Browse the repository at this point in the history
Signed-off-by: Artem Glazychev <artem.glazychev@xored.com>
  • Loading branch information
glazychev-art committed Mar 16, 2022
1 parent 985d4a0 commit cbbb4a0
Show file tree
Hide file tree
Showing 5 changed files with 359 additions and 0 deletions.
120 changes: 120 additions & 0 deletions pkg/networkservice/common/cleanup/cleanup_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright (c) 2022 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cleanup_test

import (
"context"
"fmt"
"testing"
"time"

"go.uber.org/goleak"

"github.com/stretchr/testify/require"

"github.com/networkservicemesh/api/pkg/api/networkservice"

"github.com/networkservicemesh/sdk/pkg/networkservice/common/begin"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/cleanup"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/chain"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/count"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata"
)

func TestCleanUp_CtxDone(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })
chainCtx, cancel := context.WithCancel(context.Background())
defer cancel()

counter := new(count.Client)

client := chain.NewNetworkServiceClient(
begin.NewClient(),
metadata.NewClient(),
cleanup.NewClient(chainCtx),
counter,
)
req := &networkservice.NetworkServiceRequest{
Connection: &networkservice.Connection{Id: "nsc-1"},
}
_, err := client.Request(context.Background(), req)
require.NoError(t, err)
require.Equal(t, 1, counter.Requests())
require.Equal(t, 0, counter.Closes())
cancel()

require.Eventually(t, func() bool {
return counter.Closes() == 1
}, time.Millisecond*100, time.Millisecond*10)
}

func TestCleanUp_Close(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })
chainCtx, cancel := context.WithCancel(context.Background())
defer cancel()

counter := new(count.Client)

client := chain.NewNetworkServiceClient(
begin.NewClient(),
metadata.NewClient(),
cleanup.NewClient(chainCtx),
counter,
)
req := &networkservice.NetworkServiceRequest{
Connection: &networkservice.Connection{Id: "nsc-1"},
}
conn, err := client.Request(context.Background(), req)
require.NoError(t, err)

_, _ = client.Close(context.Background(), conn)
require.Equal(t, 1, counter.Closes())
cancel()
require.Never(t, func() bool {
return counter.Closes() > 1
}, time.Millisecond*100, time.Millisecond*10)
}

func TestCleanUp_Chan(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })
chainCtx, cancel := context.WithCancel(context.Background())
defer cancel()

counter := new(count.Client)

doneCh := make(chan struct{})
client := chain.NewNetworkServiceClient(
begin.NewClient(),
metadata.NewClient(),
cleanup.NewClient(chainCtx, cleanup.WithDoneChan(doneCh)),
counter,
)

requestsNumber := 500
for i := 0; i < requestsNumber; i++ {
req := &networkservice.NetworkServiceRequest{
Connection: &networkservice.Connection{Id: fmt.Sprintf("nsc-%v", i)},
}
_, err := client.Request(context.Background(), req)
require.NoError(t, err)
}

cancel()
<-doneCh

require.Equal(t, counter.Closes(), requestsNumber)
}
124 changes: 124 additions & 0 deletions pkg/networkservice/common/cleanup/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Copyright (c) 2022 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cleanup

import (
"context"
"sync/atomic"

"github.com/edwarnicke/serialize"
"github.com/golang/protobuf/ptypes/empty"
"google.golang.org/grpc"

"github.com/networkservicemesh/api/pkg/api/networkservice"

"github.com/networkservicemesh/sdk/pkg/networkservice/common/begin"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/clientconn"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"
)

type cleanupClient struct {
chainCtx context.Context

ccClose bool
doneCh chan struct{}
activeConns int32
executor serialize.Executor
}

// NewClient - returns a cleanup client chain element
func NewClient(ctx context.Context, opts ...Option) networkservice.NetworkServiceClient {
o := &options{}
for _, opt := range opts {
opt(o)
}
c := &cleanupClient{
chainCtx: ctx,
ccClose: o.ccClose,
doneCh: o.doneCh,
}
go func() {
<-c.chainCtx.Done()
if atomic.LoadInt32(&c.activeConns) == 0 && c.doneCh != nil {
c.executor.AsyncExec(func() {
select {
case <-c.doneCh:
default:
close(c.doneCh)
}
})
}
}()
return c
}

func (c *cleanupClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (*networkservice.Connection, error) {
conn, err := next.Client(ctx).Request(ctx, request, opts...)
if err != nil {
return nil, err
}
// Update active connections counter. Needed for a cleanup done notification.
atomic.AddInt32(&c.activeConns, 1)
if cancel, ok := loadAndDeleteCancel(ctx); ok {
cancel()
}

cancelCtx, cancel := context.WithCancel(context.Background())
storeCancel(ctx, cancel)

factory := begin.FromContext(ctx)
go func() {
select {
case <-c.chainCtx.Done():
// Add to metadata if we want to delete clientconn
if c.ccClose {
storeCC(ctx)
}

<-factory.Close(begin.CancelContext(cancelCtx))
atomic.AddInt32(&c.activeConns, -1)

if atomic.LoadInt32(&c.activeConns) == 0 && c.doneCh != nil {
c.executor.AsyncExec(func() {
select {
case <-c.doneCh:
default:
close(c.doneCh)
}
})
}
case <-cancelCtx.Done():
atomic.AddInt32(&c.activeConns, -1)
}
}()
return conn, err
}

func (c *cleanupClient) Close(ctx context.Context, conn *networkservice.Connection, opts ...grpc.CallOption) (*empty.Empty, error) {
if cancel, ok := loadAndDeleteCancel(ctx); ok {
if _, ok := loadAndDeleteCC(ctx); ok {
if cc, ok := clientconn.Load(ctx); ok {
if closable, ok := cc.(interface{ Close() error }); ok {
_ = closable.Close()
}
clientconn.Delete(ctx)
}
}
cancel()
}
return next.Client(ctx).Close(ctx, conn, opts...)
}
18 changes: 18 additions & 0 deletions pkg/networkservice/common/cleanup/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright (c) 2022 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package cleanup provides networkservice.NetworkService chain elements to clean up resources before termination
package cleanup
58 changes: 58 additions & 0 deletions pkg/networkservice/common/cleanup/metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright (c) 2022 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cleanup

import (
"context"

"github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata"
)

type keyCancel struct{}
type keyCC struct{}

// storeCancel sets the context.CancelFunc stored in per Connection.Id metadata.
func storeCancel(ctx context.Context, cancel context.CancelFunc) {
metadata.Map(ctx, true).Store(keyCancel{}, cancel)
}

// loadAndDeleteCancel deletes the context.CancelFunc stored in per Connection.Id metadata,
// returning the previous value if any. The loaded result reports whether the key was present.
func loadAndDeleteCancel(ctx context.Context) (value context.CancelFunc, ok bool) {
rawValue, ok := metadata.Map(ctx, true).LoadAndDelete(keyCancel{})
if !ok {
return
}
value, ok = rawValue.(context.CancelFunc)
return value, ok
}

// storeCC sets the flag to delete clientconn in per Connection.Id metadata.
func storeCC(ctx context.Context) {
metadata.Map(ctx, true).Store(keyCC{}, struct{}{})
}

// loadAndDeleteCC deletes the flag stored in per Connection.Id metadata,
// returning the previous value if any. The loaded result reports whether the key was present.
func loadAndDeleteCC(ctx context.Context) (value struct{}, ok bool) {
rawValue, ok := metadata.Map(ctx, true).LoadAndDelete(keyCC{})
if !ok {
return
}
value, ok = rawValue.(struct{})
return value, ok
}
39 changes: 39 additions & 0 deletions pkg/networkservice/common/cleanup/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright (c) 2022 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cleanup

type options struct {
ccClose bool
doneCh chan struct{}
}

// Option - options for the cleanup chain element
type Option func(*options)

// WithCCClose - closes clientconn to prevent calling requests/closes on other datapath objects
func WithCCClose() Option {
return func(o *options) {
o.ccClose = true
}
}

// WithDoneChan - receives a channel to notify the end of cleaning
func WithDoneChan(doneCh chan struct{}) Option {
return func(o *options) {
o.doneCh = doneCh
}
}

0 comments on commit cbbb4a0

Please sign in to comment.