Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed recvfd leaks #1173

Merged
merged 12 commits into from
Dec 7, 2021
6 changes: 1 addition & 5 deletions pkg/networkservice/chains/nsmgr/unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//+build !windows
// +build linux
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why the switch to only linux? This precludes testing on the Mac OS... what was the driver?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, Ed!

This was a code change suggested during review and I somehow missed that it would remove tests for MacOS.
I'm going to revert this back to !windows.

@denis-tingaikin , please, share your thoughts in case you have any objections.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@edwarnicke This tag was incorrect. All tests actually check OS name on linux. So I suggested just changing the build tag instead of using skip on non-linux os names in tests...

Copy link
Contributor Author

@sol-0 sol-0 Nov 24, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@edwarnicke @denis-tingaikin , I'll check the tests with !windows on MacOS (with removed OS name check)

Copy link
Member

@denis-tingaikin denis-tingaikin Nov 24, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sol-0 Please do not use !windows with removed OS name check. Note: these tests are testing grpcfd library. The library required linux...

FYI: Tests will work, but for MacOS they will not make sense because we are using null server/clients instead of server/clients based on grpcfd for non-linux systems...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@denis-tingaikin , ok, thanks, Denis! I'll revert the change.

Copy link
Member

@denis-tingaikin denis-tingaikin Nov 24, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See at an example of implementation grpcfd based chain element for non-linux systems

return null.NewNetworkServiceEndpointRegistryClient()


package nsmgr_test

import (
"context"
"runtime"
"testing"
"time"

Expand Down Expand Up @@ -84,9 +83,6 @@ func Test_Local_NoURLUsecase(t *testing.T) {
}

func Test_MultiForwarderSendfd(t *testing.T) {
if runtime.GOOS != "linux" {
t.Skip("sendfd works only on linux")
}
t.Cleanup(func() { goleak.VerifyNone(t) })

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
Expand Down
5 changes: 2 additions & 3 deletions pkg/networkservice/common/mechanisms/recvfd/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func recvFDAndSwapInodeToFile(ctx context.Context, fileMap *perConnectionFileMap
return err
}

func swapFileToInode(fileMap *perConnectionFileMap, parameters map[string]string, closeAllFiles bool) error {
func swapFileToInode(fileMap *perConnectionFileMap, parameters map[string]string) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you tested this in integration? I ask... because I would expect this to cause issues. The purpose of the recvfd was to receive the file and hold it open for the life of the connection. This is because other things like for example memifproxy may need to be using it during the life of the connection.

Now please note, the and hold it open part may in fact not be necessary... but you will not be able to tell that from the unit testing in SDK. Testing in integration or with cmd-forwarder-vpp would give you a sense of whether you can get away with closing these files after the processing of a Request message or whether you need to hold them open for the life of the connection.

I'm hoping your are correct here... because I'd like to see this change work as submitted :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@edwarnicke , thanks a lot for your comments!

As far as I understand, if there's a timeout, all files are also closed as closeAllFiles flag is true in Close method. The cleanup I added is for the case when there's no Peer in context (cancelContext passed from timeoutserver).

I'm definitely not going to commit this unless it's tested in integration (locally as well as on CI - will discuss the details with Denis).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sol-0 Yeah... and please note: your change may be right. I hope its right. I was just wracking my brains as to why the closeAll had been there to begin with, and thought of possible (not certain) issues.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@edwarnicke You're correct. Currently, @sol-0 is verifying this change on integration tests.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@denis-tingaikin , @edwarnicke integration testing has been completed: https://github.com/networkservicemesh/integration-k8s-kind/actions/runs/1517061295

Local integration testing has also completed successfully.

// Get the inodeURL from parameters
fileURLStr, ok := parameters[common.InodeURL]
if !ok {
Expand All @@ -112,11 +112,10 @@ func swapFileToInode(fileMap *perConnectionFileMap, parameters map[string]string
// Swap the fileURL for the inodeURL in parameters
parameters[common.InodeURL] = inodeURL.String()

// If closeAllFiles == true, close any files we may have open for any other inodes
// This is used to clean up files sent by MechanismPreferences that were *not* selected to be the
// connection mechanism
for inodeURLStr, file := range fileMap.filesByInodeURL {
if closeAllFiles || inodeURLStr != inodeURL.String() {
if inodeURLStr != inodeURL.String() {
delete(fileMap.filesByInodeURL, inodeURLStr)
_ = file.Close()
}
Expand Down
24 changes: 20 additions & 4 deletions pkg/networkservice/common/mechanisms/recvfd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (r *recvFDServer) Request(ctx context.Context, request *networkservice.Netw
}

// Swap back from File to Inode in the InodeURL in the Parameters
err = swapFileToInode(fileMap, conn.GetMechanism().GetParameters(), false)
err = swapFileToInode(fileMap, conn.GetMechanism().GetParameters())
if err != nil {
return nil, err
}
Expand All @@ -78,18 +78,20 @@ func (r *recvFDServer) Request(ctx context.Context, request *networkservice.Netw
}

func (r *recvFDServer) Close(ctx context.Context, conn *networkservice.Connection) (*empty.Empty, error) {
// Clean up the fileMap no matter what happens
defer r.closeFiles(conn)

// Get the grpcfd.FDRecver
recv, ok := grpcfd.FromContext(ctx)
if !ok {
return next.Server(ctx).Close(ctx, conn)
}

// Get the fileMap
fileMap, _ := r.fileMaps.LoadOrStore(conn.GetId(), &perConnectionFileMap{
filesByInodeURL: make(map[string]*os.File),
inodeURLbyFilename: make(map[string]*url.URL),
})
// Clean up the fileMap no matter what happens
defer r.fileMaps.Delete(conn.GetId())

// Recv the FD and Swap the Inode for a file in InodeURL in Parameters
err := recvFDAndSwapInodeToFile(ctx, fileMap, conn.GetMechanism().GetParameters(), recv)
Expand All @@ -104,6 +106,20 @@ func (r *recvFDServer) Close(ctx context.Context, conn *networkservice.Connectio
}

// Swap back from File to Inode in the InodeURL in the Parameters
err = swapFileToInode(fileMap, conn.GetMechanism().GetParameters(), true)
err = swapFileToInode(fileMap, conn.GetMechanism().GetParameters())
return &empty.Empty{}, err
}

func (r *recvFDServer) closeFiles(conn *networkservice.Connection) {
denis-tingaikin marked this conversation as resolved.
Show resolved Hide resolved
defer r.fileMaps.Delete(conn.GetId())

fileMap, _ := r.fileMaps.LoadOrStore(conn.GetId(), &perConnectionFileMap{
filesByInodeURL: make(map[string]*os.File),
inodeURLbyFilename: make(map[string]*url.URL),
})

for inodeURLStr, file := range fileMap.filesByInodeURL {
delete(fileMap.filesByInodeURL, inodeURLStr)
_ = file.Close()
}
}
283 changes: 283 additions & 0 deletions pkg/networkservice/common/mechanisms/recvfd/server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,283 @@
// Copyright (c) 2020-2021 Doc.ai and/or its affiliates.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Copyright (c) 2020-2021 Doc.ai and/or its affiliates.
// Copyright (c) 2021 Doc.ai and/or its affiliates.

//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build linux
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this line?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line was added by go fmt as I'm using 1.7. I've removed this line.

// +build linux

package recvfd_test

import (
"context"
"fmt"
"io/ioutil"
"net"
"net/url"
"os"
"path"
"runtime"
"syscall"
"testing"
"time"

"github.com/edwarnicke/grpcfd"
"github.com/golang/protobuf/ptypes/empty"
"github.com/networkservicemesh/api/pkg/api/networkservice"
"github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/cls"
"github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/common"
"github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/kernel"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/peer"

"github.com/networkservicemesh/sdk/pkg/networkservice/chains/client"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/mechanisms/recvfd"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/mechanisms/sendfd"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/chain"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"
"github.com/networkservicemesh/sdk/pkg/tools/grpcutils"
"github.com/networkservicemesh/sdk/pkg/tools/sandbox"
)

type checkFileClosed struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
type checkFileClosed struct {
type checkRecvfdServer struct {

onFileClosed map[string]func()

t *testing.T
}

type wrapTransceiver struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
type wrapTransceiver struct {
type notifiableFDTransceiver struct {

grpcfd.FDTransceiver
net.Addr

onFileClosed map[string]func()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
onFileClosed map[string]func()
onRecvFile(*os.File)

}

func (n *checkFileClosed) Close(ctx context.Context, conn *networkservice.Connection) (*empty.Empty, error) {
return next.Server(ctx).Close(ctx, conn)
}

func (n *checkFileClosed) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) {
p, ok := peer.FromContext(ctx)
if !ok {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if !ok {
require.True(t, ok)

n.t.Fatal("No peer in context")
}

transceiver, ok := p.Addr.(grpcfd.FDTransceiver)
assert.True(n.t, ok)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
assert.True(n.t, ok)
require.True(n.t, ok)


p.Addr = &wrapTransceiver{
FDTransceiver: transceiver,
Addr: p.Addr,
onFileClosed: n.onFileClosed,
}

return next.Server(ctx).Request(ctx, request)
}

func (w *wrapTransceiver) RecvFileByURL(urlStr string) (<-chan *os.File, error) {
res, err := w.FDTransceiver.RecvFileByURL(urlStr)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
res, err := w.FDTransceiver.RecvFileByURL(urlStr)
recv, err := w.FDTransceiver.RecvFileByURL(urlStr)

if err != nil {
return nil, err
}

var chanRes = make(chan *os.File)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
var chanRes = make(chan *os.File)
var fileCh = make(chan *os.File)

go func() {
for f := range res {
runtime.SetFinalizer(f, func(file *os.File) {
onFileClosedFunc, ok := w.onFileClosed[urlStr]
if ok {
onFileClosedFunc()
}
})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
runtime.SetFinalizer(f, func(file *os.File) {
onFileClosedFunc, ok := w.onFileClosed[urlStr]
if ok {
onFileClosedFunc()
}
})
w.onRecvFile(f)

chanRes <- f
}
}()

return chanRes, nil
}

func createFile(fileName string, t *testing.T) (inodeURLStr string, fileClosedContext context.Context, cancelFunc func()) {
f, err := os.Create(fileName)
require.NoError(t, err, "Failed to create and open a file: %v", err)

info, err := f.Stat()
assert.NoError(t, err)

stat, ok := info.Sys().(*syscall.Stat_t)
assert.True(t, ok)

err = f.Close()
require.NoError(t, err, "Failed to close file: %v", err)

fileClosedContext, cancelFunc = context.WithCancel(context.Background())
inodeURLStr = fmt.Sprintf("inode://%d/%d", stat.Dev, stat.Ino)

return
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please don't use anonymous returns. I wonder why linter is not alerting on this line...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apply this to other places.

}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not quite follow why do we need this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is required in case of multiple files to check that we're closing the same files that we created - I'm returning "inodeURLStr" to create a mapping inodeURLStr -> onFileClosedFunc which I'm later using in finalizer.


func createServerAndClient(ctx context.Context, t *testing.T, testServerChain *networkservice.NetworkServiceServer, serveURL *url.URL) (testClient networkservice.NetworkServiceClient) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func createServerAndClient(ctx context.Context, t *testing.T, testServerChain *networkservice.NetworkServiceServer, serveURL *url.URL) (testClient networkservice.NetworkServiceClient) {
func createServerAndClient(ctx context.Context, t *testing.T, testServerChain *networkservice.NetworkServiceServer, serveURL *url.URL) networkservice.NetworkServiceClient {

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please do not use return without variable.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks complicated.

Split this function into two.

OR

Consider using endpoint package and client chain directly.

var grpcServer = grpc.NewServer(grpc.Creds(grpcfd.TransportCredentials(insecure.NewCredentials())))
networkservice.RegisterNetworkServiceServer(grpcServer, *testServerChain)

var errCh = grpcutils.ListenAndServe(ctx, serveURL, grpcServer)

select {
case e := <-errCh:
assert.Failf(t, "Server failed to start: %v", e.Error())
default:
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
select {
case e := <-errCh:
assert.Failf(t, "Server failed to start: %v", e.Error())
default:
}
require.Len(t, errCh, 0)


testClient = client.NewClient(
ctx,
client.WithClientURL(sandbox.CloneURL(serveURL)),
client.WithDialOptions(grpc.WithTransportCredentials(
grpcfd.TransportCredentials(insecure.NewCredentials())),
),
client.WithDialTimeout(time.Second),
client.WithoutRefresh(),
client.WithAdditionalFunctionality(sendfd.NewClient()))
return
}

func TestRecvfdClosesSingleFile(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

dir, err := ioutil.TempDir(os.TempDir(), t.Name())
require.NoError(t, err)
defer func() {
_ = os.RemoveAll(dir)
}()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use t.TempDir()


s, err := os.Create(path.Join(dir, "test.sock"))
require.NoError(t, err)

var testFileName = path.Join(dir, t.Name()+".test")

inodeURLStr, fileClosedContext, cancelFunc := createFile(testFileName, t)

serveURL := &url.URL{Scheme: "unix", Path: s.Name(), Host: "0.0.0.0:5000"}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you explain why do wee need a Host for unix?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not required, I've removed it


var testChain = chain.NewNetworkServiceServer(
&checkFileClosed{
onFileClosed: map[string]func(){
inodeURLStr: cancelFunc,
},
},
recvfd.NewServer())

testClient := createServerAndClient(ctx, t, &testChain, serveURL)

request := &networkservice.NetworkServiceRequest{
MechanismPreferences: []*networkservice.Mechanism{
{
Cls: cls.LOCAL,
Type: kernel.MECHANISM,
Parameters: map[string]string{
common.InodeURL: "file:" + testFileName,
},
},
},
}

conn, err := testClient.Request(ctx, request)
require.NoError(t, err)

_, err = testClient.Close(ctx, conn)
require.NoError(t, err)

require.Eventually(t, func() bool {
runtime.GC()
return fileClosedContext.Err() != nil
}, time.Second, time.Millisecond*100)
}

func TestRecvfdClosesMultipleFiles(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

dir, err := ioutil.TempDir(os.TempDir(), t.Name())
require.NoError(t, err)
defer func() {
_ = os.RemoveAll(dir)
}()

s, err := os.Create(path.Join(dir, "test.sock"))
require.NoError(t, err)

const numFiles = 3
var fileClosedContexts = make([]context.Context, numFiles)
var onFileClosedFuncs = make(map[string]func(), numFiles)

request := &networkservice.NetworkServiceRequest{
MechanismPreferences: make([]*networkservice.Mechanism, numFiles),
}

var filePath string
for i := 0; i < numFiles; i++ {
filePath = path.Join(dir, fmt.Sprintf("%s.test%d", t.Name(), i))

inodeURLStr, fileClosedContext, cancelFunc := createFile(filePath, t)
onFileClosedFuncs[inodeURLStr] = cancelFunc
fileClosedContexts[i] = fileClosedContext

request.MechanismPreferences = append(request.MechanismPreferences,
&networkservice.Mechanism{
Cls: cls.LOCAL,
Type: kernel.MECHANISM,
Parameters: map[string]string{
common.InodeURL: "file:" + filePath,
},
})
}

serveURL := &url.URL{Scheme: "unix", Path: s.Name(), Host: "0.0.0.0:5000"}

var testChain = chain.NewNetworkServiceServer(
&checkFileClosed{
onFileClosed: onFileClosedFuncs,
},
recvfd.NewServer())

var testClient = createServerAndClient(ctx, t, &testChain, serveURL)

conn, err := testClient.Request(ctx, request)
require.NoError(t, err)

_, err = testClient.Close(ctx, conn)
require.NoError(t, err)

require.Eventually(t, func() bool {
runtime.GC()
return fileClosedContexts[0].Err() != nil
}, time.Second, time.Millisecond*100)

require.Eventually(t, func() bool {
runtime.GC()
return fileClosedContexts[1].Err() != nil
}, time.Second, time.Millisecond*100)

require.Eventually(t, func() bool {
runtime.GC()
return fileClosedContexts[2].Err() != nil
}, time.Second, time.Millisecond*100)
}
Loading