Skip to content

Commit

Permalink
netns monitor (#537)
Browse files Browse the repository at this point in the history
* add netNsMonitor

Signed-off-by: Nikolay Chunosov <n.chunosov@yandex.ru>

* fix linter warnings

Signed-off-by: Nikolay Chunosov <n.chunosov@yandex.ru>

* fix linter warning

Signed-off-by: Nikolay Chunosov <n.chunosov@yandex.ru>

* add +build linux

Signed-off-by: Nikolay Chunosov <n.chunosov@yandex.ru>

* support several clients in netnt monitor

Signed-off-by: Nikolay Chunosov <n.chunosov@yandex.ru>

* refactor nsmonitor

Signed-off-by: Nikolay Chunosov <n.chunosov@yandex.ru>

* fix ci warnings

Signed-off-by: Nikolay Chunosov <n.chunosov@yandex.ru>

* add unit-tests for nsMonitorClient

Signed-off-by: Nikolay Chunosov <n.chunosov@yandex.ru>

* fix linter warning

Signed-off-by: Nikolay Chunosov <n.chunosov@yandex.ru>

* sync access to test values

Signed-off-by: Nikolay Chunosov <n.chunosov@yandex.ru>

* add more checks for monitoring goroutines

Signed-off-by: Nikolay Chunosov <n.chunosov@yandex.ru>

* fix missed sync requerd in nsmonitor tests

Signed-off-by: Nikolay Chunosov <n.chunosov@yandex.ru>
  • Loading branch information
Chunosov authored Apr 1, 2022
1 parent eca3ada commit 70eceb7
Show file tree
Hide file tree
Showing 8 changed files with 707 additions and 4 deletions.
2 changes: 2 additions & 0 deletions pkg/networkservice/chains/forwarder/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ import (
"github.com/networkservicemesh/sdk-vpp/pkg/networkservice/mechanisms/vlan"
"github.com/networkservicemesh/sdk-vpp/pkg/networkservice/mechanisms/vxlan"
"github.com/networkservicemesh/sdk-vpp/pkg/networkservice/mechanisms/wireguard"
"github.com/networkservicemesh/sdk-vpp/pkg/networkservice/nsmonitor"
"github.com/networkservicemesh/sdk-vpp/pkg/networkservice/pinhole"
"github.com/networkservicemesh/sdk-vpp/pkg/networkservice/stats"
"github.com/networkservicemesh/sdk-vpp/pkg/networkservice/tag"
Expand Down Expand Up @@ -142,6 +143,7 @@ func NewServer(ctx context.Context, tokenGenerator token.GeneratorFunc, vppConn
filtermechanisms.NewClient(),
pinhole.NewClient(vppConn),
recvfd.NewClient(),
nsmonitor.NewClient(ctx),
sendfd.NewClient()),
),
),
Expand Down
8 changes: 4 additions & 4 deletions pkg/networkservice/mechanisms/memif/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import (

type memifClient struct {
vppConn api.Connection
changeNetNs bool
changeNetNS bool
nsInfo NetNSInfo
}

Expand All @@ -55,7 +55,7 @@ func NewClient(vppConn api.Connection, options ...Option) networkservice.Network
return chain.NewNetworkServiceClient(
&memifClient{
vppConn: vppConn,
changeNetNs: opts.changeNetNS,
changeNetNS: opts.changeNetNS,
nsInfo: newNetNSInfo(),
},
)
Expand All @@ -64,7 +64,7 @@ func NewClient(vppConn api.Connection, options ...Option) networkservice.Network
func (m *memifClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (*networkservice.Connection, error) {
if !m.updateMechanismPreferences(request) {
mechanism := memif.ToMechanism(memif.NewAbstract(m.nsInfo.netNSPath))
if m.changeNetNs {
if m.changeNetNS {
mechanism.SetNetNSURL("")
}
request.MechanismPreferences = append(request.MechanismPreferences, mechanism.Mechanism)
Expand Down Expand Up @@ -111,7 +111,7 @@ func (m *memifClient) updateMechanismPreferences(request *networkservice.Network
for _, p := range request.GetRequestMechanismPreferences() {
if mechanism := memif.ToMechanism(p); mechanism != nil {
mechanism.SetNetNSURL((&url.URL{Scheme: memif.FileScheme, Path: m.nsInfo.netNSPath}).String())
if m.changeNetNs {
if m.changeNetNS {
mechanism.SetNetNSURL("")
}
updated = true
Expand Down
108 changes: 108 additions & 0 deletions pkg/networkservice/nsmonitor/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright (c) 2022 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build linux
// +build linux

package nsmonitor

import (
"context"
"sync"

"github.com/golang/protobuf/ptypes/empty"
"google.golang.org/grpc"

"github.com/networkservicemesh/api/pkg/api/networkservice"
"github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/common"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/begin"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata"
)

type key struct{}

// Monitor provides interface for netns monitor
type Monitor interface {
Watch(ctx context.Context, inodeURL string) <-chan struct{}
}

type netNSMonitorClient struct {
chainCtx context.Context
monitor Monitor
onceInit sync.Once
supplyMonitor func(ctx context.Context) Monitor
}

// NewClient returns new net ns monitoring client
func NewClient(chainCtx context.Context, opts ...Option) networkservice.NetworkServiceClient {
options := &clientOptions{
supplyMonitor: newMonitor,
}

for _, opt := range opts {
opt(options)
}

return &netNSMonitorClient{
chainCtx: chainCtx,
supplyMonitor: options.supplyMonitor,
}
}

func (r *netNSMonitorClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (*networkservice.Connection, error) {
conn, err := next.Client(ctx).Request(ctx, request, opts...)
if err != nil {
return nil, err
}

r.onceInit.Do(func() {
r.monitor = r.supplyMonitor(r.chainCtx)
})

cancelCtx, cancel := context.WithCancel(r.chainCtx)
if _, ok := metadata.Map(ctx, metadata.IsClient(r)).LoadOrStore(key{}, cancel); !ok {
if inodeURL, ok := conn.GetMechanism().GetParameters()[common.InodeURL]; ok {
deleteCh := r.monitor.Watch(cancelCtx, inodeURL)
factory := begin.FromContext(ctx)
go func() {
select {
case <-r.chainCtx.Done():
return
case <-cancelCtx.Done():
return
case _, ok := <-deleteCh:
if ok {
factory.Close(begin.CancelContext(cancelCtx))
}
return
}
}()
}
}

return conn, nil
}

func (r *netNSMonitorClient) Close(ctx context.Context, conn *networkservice.Connection, opts ...grpc.CallOption) (*empty.Empty, error) {
if v, ok := metadata.Map(ctx, metadata.IsClient(r)).LoadAndDelete(key{}); ok {
if cancel, ok := v.(context.CancelFunc); ok {
cancel()
}
}

return next.Client(ctx).Close(ctx, conn, opts...)
}
Loading

0 comments on commit 70eceb7

Please sign in to comment.