From 15184c67cf17433a07dc7c7b1acc3f127486b0ae Mon Sep 17 00:00:00 2001 From: Henk Date: Tue, 29 Nov 2022 16:48:28 -0800 Subject: [PATCH 01/10] Add helpers Serve and ListenAndServe --- rpc/Serve.go | 65 +++++++++++++++++++++++++++++++++++++++++++++++ rpc/Serve_test.go | 47 ++++++++++++++++++++++++++++++++++ 2 files changed, 112 insertions(+) create mode 100644 rpc/Serve.go create mode 100644 rpc/Serve_test.go diff --git a/rpc/Serve.go b/rpc/Serve.go new file mode 100644 index 00000000..70984094 --- /dev/null +++ b/rpc/Serve.go @@ -0,0 +1,65 @@ +package rpc + +import ( + "context" + "fmt" + "net" + "strings" +) + +// Serve serves a Cap'n Proto RPC to incoming connections +// Serve exits with the listener error +func Serve(lis net.Listener, options *Options) error { + if options == nil { //|| !options.BootstrapClient.IsValid() { + return fmt.Errorf("invalid options") + } + // Accept incoming connections + for { + rwc, err := lis.Accept() + if err != nil { + return err + } + // For each new incoming connection, create a new RPC transport connection that will serve incoming RPC requests + // rpc.Options will contain the bootstrap capability + go func() { + transport := NewStreamTransport(rwc) + conn := NewConn(transport, options) + + select { + case <-conn.Done(): + // Remote client connection closed + return + } + }() + } +} + +// ListenAndServe opens a listener on the given address and serves a Cap'n Proto RPC to incoming connections +// If address starts with "unix:" it is considered a Unix Domain Socket path, otherwise a TCP address. +// Context can be used to stop listening. +func ListenAndServe(ctx context.Context, address string, options *Options) error { + var listener net.Listener + var err error + + if address == "" { + return fmt.Errorf("missing address") + } + // UDS paths start with either '.' or '/' + if strings.HasPrefix(address, "unix:") { + listener, err = net.Listen("unix", address[5:]) + } else { + listener, err = net.Listen("tcp", address) + } + if err == nil { + // to close this listener, close the context + go func() { + select { + case <-ctx.Done(): + _ = listener.Close() + return + } + }() + err = Serve(listener, options) + } + return err +} diff --git a/rpc/Serve_test.go b/rpc/Serve_test.go new file mode 100644 index 00000000..0cb6a018 --- /dev/null +++ b/rpc/Serve_test.go @@ -0,0 +1,47 @@ +package rpc_test + +import ( + "context" + "net" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "capnproto.org/go/capnp/v3/rpc" +) + +//clientHook := {} + +func TestServe(t *testing.T) { + lis, err := net.Listen("tcp", ":8888") + assert.NoError(t, err) + + errChannel := make(chan error) + go func() { + err2 := rpc.Serve(lis, &rpc.Options{}) + errChannel <- err2 + }() + time.Sleep(time.Second) + t.Log("Closing listener") + err = lis.Close() + assert.NoError(t, err) + err = <-errChannel + assert.ErrorIs(t, err, net.ErrClosed) +} + +func TestListenAndServe(t *testing.T) { + var err error + ctx, cancelFunc := context.WithCancel(context.Background()) + errChannel := make(chan error) + go func() { + err2 := rpc.ListenAndServe(ctx, ":8888", &rpc.Options{}) + errChannel <- err2 + t.Log("Serve has ended") + }() + time.Sleep(time.Second) + t.Log("Closing listener") + cancelFunc() + err = <-errChannel + assert.ErrorIs(t, err, net.ErrClosed) +} From 1f2cf6999eeee1baf68962fc318d92483dbe203b Mon Sep 17 00:00:00 2001 From: Henk Date: Tue, 29 Nov 2022 16:53:23 -0800 Subject: [PATCH 02/10] Change files to lower case --- rpc/{Serve.go => serve.go} | 0 rpc/{Serve_test.go => serve_test.go} | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename rpc/{Serve.go => serve.go} (100%) rename rpc/{Serve_test.go => serve_test.go} (100%) diff --git a/rpc/Serve.go b/rpc/serve.go similarity index 100% rename from rpc/Serve.go rename to rpc/serve.go diff --git a/rpc/Serve_test.go b/rpc/serve_test.go similarity index 100% rename from rpc/Serve_test.go rename to rpc/serve_test.go From d577a5c51eb0c7d2cbad3b2a7cce6411d5120006 Mon Sep 17 00:00:00 2001 From: Henk Date: Tue, 29 Nov 2022 18:25:59 -0800 Subject: [PATCH 03/10] Incorporate code review feedback. --- rpc/serve.go | 48 ++++++++++++++++------------------------------- rpc/serve_test.go | 39 +++++++++++++++++++++++++++----------- 2 files changed, 44 insertions(+), 43 deletions(-) diff --git a/rpc/serve.go b/rpc/serve.go index 70984094..ac4b442b 100644 --- a/rpc/serve.go +++ b/rpc/serve.go @@ -2,17 +2,12 @@ package rpc import ( "context" - "fmt" "net" - "strings" ) // Serve serves a Cap'n Proto RPC to incoming connections // Serve exits with the listener error -func Serve(lis net.Listener, options *Options) error { - if options == nil { //|| !options.BootstrapClient.IsValid() { - return fmt.Errorf("invalid options") - } +func Serve(lis net.Listener, opt *Options) error { // Accept incoming connections for { rwc, err := lis.Accept() @@ -23,43 +18,32 @@ func Serve(lis net.Listener, options *Options) error { // rpc.Options will contain the bootstrap capability go func() { transport := NewStreamTransport(rwc) - conn := NewConn(transport, options) + conn := NewConn(transport, opt) - select { - case <-conn.Done(): - // Remote client connection closed - return - } + <-conn.Done() + // Remote client connection closed + return }() } } // ListenAndServe opens a listener on the given address and serves a Cap'n Proto RPC to incoming connections -// If address starts with "unix:" it is considered a Unix Domain Socket path, otherwise a TCP address. -// Context can be used to stop listening. -func ListenAndServe(ctx context.Context, address string, options *Options) error { - var listener net.Listener - var err error +// network and address are passed to net.Listen. Use network "unix" for Unix Domain Sockets +// and "tcp" for regular TCP connections. +func ListenAndServe(ctx context.Context, network, addr string, opt *Options) error { + //var listener net.Listener + //var err error + + //listener, err = net.Listen(network, address) + listener, err := new(net.ListenConfig).Listen(ctx, network, addr) - if address == "" { - return fmt.Errorf("missing address") - } - // UDS paths start with either '.' or '/' - if strings.HasPrefix(address, "unix:") { - listener, err = net.Listen("unix", address[5:]) - } else { - listener, err = net.Listen("tcp", address) - } if err == nil { // to close this listener, close the context go func() { - select { - case <-ctx.Done(): - _ = listener.Close() - return - } + <-ctx.Done() + _ = listener.Close() }() - err = Serve(listener, options) + err = Serve(listener, opt) } return err } diff --git a/rpc/serve_test.go b/rpc/serve_test.go index 0cb6a018..424d9e9f 100644 --- a/rpc/serve_test.go +++ b/rpc/serve_test.go @@ -11,37 +11,54 @@ import ( "capnproto.org/go/capnp/v3/rpc" ) -//clientHook := {} - func TestServe(t *testing.T) { - lis, err := net.Listen("tcp", ":8888") + t.Parallel() + t.Log("Opening listener") + lis, err := net.Listen("tcp", ":0") + defer lis.Close() assert.NoError(t, err) errChannel := make(chan error) go func() { - err2 := rpc.Serve(lis, &rpc.Options{}) + err2 := rpc.Serve(lis, nil) + t.Log("Serve has ended") errChannel <- err2 }() + time.Sleep(time.Second) + t.Log("Closing listener") err = lis.Close() assert.NoError(t, err) - err = <-errChannel - assert.ErrorIs(t, err, net.ErrClosed) + select { + case <-time.After(time.Second * 2): + t.Fail() + case err = <-errChannel: + assert.ErrorIs(t, err, net.ErrClosed) + } } func TestListenAndServe(t *testing.T) { var err error + t.Parallel() ctx, cancelFunc := context.WithCancel(context.Background()) errChannel := make(chan error) + go func() { - err2 := rpc.ListenAndServe(ctx, ":8888", &rpc.Options{}) + t.Log("Starting ListenAndServe") + err2 := rpc.ListenAndServe(ctx, "tcp", ":0", nil) errChannel <- err2 - t.Log("Serve has ended") + t.Log("ListenAndServe has ended") }() + time.Sleep(time.Second) - t.Log("Closing listener") + t.Log("Closing context") + cancelFunc() - err = <-errChannel - assert.ErrorIs(t, err, net.ErrClosed) + select { + case <-time.After(time.Second * 2): + t.Error("Cancelling context didn't end the listener") + case err = <-errChannel: + assert.ErrorIs(t, err, net.ErrClosed) + } } From 932fd02812331bd1f38429628fe98e82c224d9d3 Mon Sep 17 00:00:00 2001 From: Henk Date: Tue, 29 Nov 2022 22:02:55 -0800 Subject: [PATCH 04/10] Incorporate more code review feedback. Added a test that uses Serve to transfer data. --- rpc/serve.go | 8 +++----- rpc/serve_test.go | 45 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 48 insertions(+), 5 deletions(-) diff --git a/rpc/serve.go b/rpc/serve.go index ac4b442b..52c87dbd 100644 --- a/rpc/serve.go +++ b/rpc/serve.go @@ -18,11 +18,12 @@ func Serve(lis net.Listener, opt *Options) error { // rpc.Options will contain the bootstrap capability go func() { transport := NewStreamTransport(rwc) + // Copy the options and clone the bootstrap interface as the connection takes ownership. The original + // bootstrap interface can be dropped after conn := NewConn(transport, opt) <-conn.Done() // Remote client connection closed - return }() } } @@ -31,11 +32,8 @@ func Serve(lis net.Listener, opt *Options) error { // network and address are passed to net.Listen. Use network "unix" for Unix Domain Sockets // and "tcp" for regular TCP connections. func ListenAndServe(ctx context.Context, network, addr string, opt *Options) error { - //var listener net.Listener - //var err error - //listener, err = net.Listen(network, address) - listener, err := new(net.ListenConfig).Listen(ctx, network, addr) + listener, err := net.Listen(network, addr) if err == nil { // to close this listener, close the context diff --git a/rpc/serve_test.go b/rpc/serve_test.go index 424d9e9f..e0caf52b 100644 --- a/rpc/serve_test.go +++ b/rpc/serve_test.go @@ -8,7 +8,9 @@ import ( "github.com/stretchr/testify/assert" + "capnproto.org/go/capnp/v3" "capnproto.org/go/capnp/v3/rpc" + testcp "capnproto.org/go/capnp/v3/rpc/internal/testcapnp" ) func TestServe(t *testing.T) { @@ -38,6 +40,49 @@ func TestServe(t *testing.T) { } } +// TestServeCapability serves the ping pong capability and tests +// if a client can successfuly receive served data. +func TestServeCapability(t *testing.T) { + + t.Parallel() + ctx := context.Background() + t.Log("Opening listener") + lis, err := net.Listen("tcp", ":0") + defer lis.Close() + assert.NoError(t, err) + + srv := testcp.PingPong_ServerToClient(pingPongServer{}) + opts := &rpc.Options{ + BootstrapClient: capnp.Client(srv), + } + errChannel := make(chan error) + go func() { + err2 := rpc.Serve(lis, opts) + t.Log("Serve has ended") + errChannel <- err2 + }() + + // connect to the server and invoke the magic N method + addr := lis.Addr().String() + conn, err := net.Dial("tcp", addr) + assert.NoError(t, err) + transport := rpc.NewStreamTransport(conn) + rpcConn := rpc.NewConn(transport, nil) + ppClient := testcp.PingPong(rpcConn.Bootstrap(ctx)) + method, release := ppClient.EchoNum(ctx, func(ps testcp.PingPong_echoNum_Params) error { + ps.SetN(42) + return nil + }) + defer release() + resp, err := method.Struct() + assert.NoError(t, err) + numberN := resp.N() + assert.Equal(t, int64(42), numberN) + t.Logf("Received pingpong: N=%d", numberN) + err = lis.Close() + assert.NoError(t, err) +} + func TestListenAndServe(t *testing.T) { var err error t.Parallel() From 867c7bbb44e8ed1930415deafc014dd35061793f Mon Sep 17 00:00:00 2001 From: Henk Date: Wed, 30 Nov 2022 16:46:03 -0800 Subject: [PATCH 05/10] Code review feedback. Change API to take bootstrap client instead of options Added testcase to verify proper method handling using Serve. Added verification of proper release of capabilities. Update documentation to explain the functions release bootstrap client. --- rpc/rpc.go | 15 +++++++--- rpc/serve.go | 51 +++++++++++++++++++++++-------- rpc/serve_test.go | 76 ++++++++++++++++++++++++++++++++++------------- 3 files changed, 104 insertions(+), 38 deletions(-) diff --git a/rpc/rpc.go b/rpc/rpc.go index ae4ad2d2..544d999c 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -8,13 +8,14 @@ import ( "sync" "time" + "golang.org/x/sync/errgroup" + "capnproto.org/go/capnp/v3" "capnproto.org/go/capnp/v3/exc" "capnproto.org/go/capnp/v3/exp/mpsc" "capnproto.org/go/capnp/v3/internal/syncutil" "capnproto.org/go/capnp/v3/rpc/transport" rpccp "capnproto.org/go/capnp/v3/std/capnp/rpc" - "golang.org/x/sync/errgroup" ) /* @@ -124,9 +125,15 @@ type ErrorReporter interface { ReportError(error) } -// NewConn creates a new connection that communications on a given -// transport. Closing the connection will close the transport. -// Passing nil for opts is the same as passing the zero value. +// NewConn creates a new connection that communicates on a given transport. +// +// Closing the connection will close the transport and release the bootstrap +// client provided in opts. +// +// Passing nil for opts is the same as passing the zero value. This can be useful +// for obtaining a reference to the remote side's bootstrap. This in turn allows +// this listening connection to be used as the RPC client and make calls to the remote +// side, since capnp connections are symmetric. // // Once a connection is created, it will immediately start receiving // requests from the transport. diff --git a/rpc/serve.go b/rpc/serve.go index 52c87dbd..bee76515 100644 --- a/rpc/serve.go +++ b/rpc/serve.go @@ -2,36 +2,61 @@ package rpc import ( "context" + "errors" "net" + + "capnproto.org/go/capnp/v3" ) -// Serve serves a Cap'n Proto RPC to incoming connections -// Serve exits with the listener error -func Serve(lis net.Listener, opt *Options) error { +// Serve serves a Cap'n Proto RPC to incoming connections. +// +// Serve will take ownership of bootstrapClient and release it after the listener closes. +// +// Serve exits with the listener error if the listener is closed by the owner. +func Serve(lis net.Listener, bootstrapClient capnp.Client) error { + if !bootstrapClient.IsValid() { + err := errors.New("BootstrapClient is not valid") + return err + } // Accept incoming connections for { rwc, err := lis.Accept() if err != nil { + // Since we took ownership of the bootstrap client, release it after we're done. + if !bootstrapClient.IsValid() { + err = errors.New("the bootstrap client was already released") + } + bootstrapClient.Release() return err } + // For each new incoming connection, create a new RPC transport connection that will serve incoming RPC requests // rpc.Options will contain the bootstrap capability go func() { - transport := NewStreamTransport(rwc) - // Copy the options and clone the bootstrap interface as the connection takes ownership. The original - // bootstrap interface can be dropped after - conn := NewConn(transport, opt) - - <-conn.Done() - // Remote client connection closed + // skip if the bootstrap client has closed since receiving the connection + // this can happen if the server exits while incoming connections are made + if bootstrapClient.IsValid() { + transport := NewStreamTransport(rwc) + // the RPC connection takes ownership of the bootstrap interface and will release it when the connection + // exits, so use AddRef to avoid releasing the provided bootstrap client capability. + opts := Options{ + BootstrapClient: bootstrapClient.AddRef(), + } + conn := NewConn(transport, &opts) + <-conn.Done() + // Remote client connection closed + } }() } } // ListenAndServe opens a listener on the given address and serves a Cap'n Proto RPC to incoming connections +// // network and address are passed to net.Listen. Use network "unix" for Unix Domain Sockets -// and "tcp" for regular TCP connections. -func ListenAndServe(ctx context.Context, network, addr string, opt *Options) error { +// and "tcp" for regular TCP IP4 or IP6 connections. +// +// ListenAndServe will take ownership of bootstrapClient and release it on exit. +func ListenAndServe(ctx context.Context, network, addr string, bootstrapClient capnp.Client) error { listener, err := net.Listen(network, addr) @@ -41,7 +66,7 @@ func ListenAndServe(ctx context.Context, network, addr string, opt *Options) err <-ctx.Done() _ = listener.Close() }() - err = Serve(listener, opt) + err = Serve(listener, bootstrapClient) } return err } diff --git a/rpc/serve_test.go b/rpc/serve_test.go index e0caf52b..bb9fbd8d 100644 --- a/rpc/serve_test.go +++ b/rpc/serve_test.go @@ -13,74 +13,109 @@ import ( testcp "capnproto.org/go/capnp/v3/rpc/internal/testcapnp" ) +// Test connect/disconnect to a pingpong capability func TestServe(t *testing.T) { t.Parallel() + errChannel := make(chan error) + t.Log("Opening listener") lis, err := net.Listen("tcp", ":0") - defer lis.Close() assert.NoError(t, err) + srv := testcp.PingPong_ServerToClient(pingPongServer{}) + bootstrapClient := capnp.Client(srv) - errChannel := make(chan error) go func() { - err2 := rpc.Serve(lis, nil) + err2 := rpc.Serve(lis, bootstrapClient) t.Log("Serve has ended") errChannel <- err2 }() - time.Sleep(time.Second) + // Create the pingpong client using the server address and close it + addr := lis.Addr().String() + conn, err := net.Dial("tcp", addr) + assert.NoError(t, err) + transport := rpc.NewStreamTransport(conn) + rpcConn := rpc.NewConn(transport, nil) + err = rpcConn.Close() + assert.NoError(t, err) - t.Log("Closing listener") + // repeat to ensure that a second connection is allowed and doesn't mess + // with releasing the bootstrap reference counting. + conn, err = net.Dial("tcp", addr) + assert.NoError(t, err) + transport = rpc.NewStreamTransport(conn) + rpcConn = rpc.NewConn(transport, nil) + err = rpcConn.Close() + assert.NoError(t, err) + + t.Log("Closing server listener") err = lis.Close() assert.NoError(t, err) select { case <-time.After(time.Second * 2): - t.Fail() + t.Error("Serve did not return after listener was closed") case err = <-errChannel: + // Expect that the server finished with the 'connection closed' error. assert.ErrorIs(t, err, net.ErrClosed) + // Check that the bootstrap client was released by Serve. + assert.False(t, bootstrapClient.IsValid(), "Serve did not release its bootstrap client") } } // TestServeCapability serves the ping pong capability and tests // if a client can successfuly receive served data. func TestServeCapability(t *testing.T) { - t.Parallel() ctx := context.Background() + + // Start the pingpong server t.Log("Opening listener") lis, err := net.Listen("tcp", ":0") - defer lis.Close() assert.NoError(t, err) - srv := testcp.PingPong_ServerToClient(pingPongServer{}) - opts := &rpc.Options{ - BootstrapClient: capnp.Client(srv), - } + bootstrapClient := capnp.Client(srv) + errChannel := make(chan error) go func() { - err2 := rpc.Serve(lis, opts) + err2 := rpc.Serve(lis, bootstrapClient) t.Log("Serve has ended") errChannel <- err2 }() - // connect to the server and invoke the magic N method + // Create the pingpong client using the server address addr := lis.Addr().String() conn, err := net.Dial("tcp", addr) assert.NoError(t, err) transport := rpc.NewStreamTransport(conn) rpcConn := rpc.NewConn(transport, nil) + defer rpcConn.Close() ppClient := testcp.PingPong(rpcConn.Bootstrap(ctx)) - method, release := ppClient.EchoNum(ctx, func(ps testcp.PingPong_echoNum_Params) error { + defer ppClient.Release() + + // Invoke the magic N method. If Serve works this should provide the magic number. + method, releaseMethod := ppClient.EchoNum(ctx, func(ps testcp.PingPong_echoNum_Params) error { ps.SetN(42) return nil }) - defer release() + defer releaseMethod() resp, err := method.Struct() assert.NoError(t, err) numberN := resp.N() assert.Equal(t, int64(42), numberN) t.Logf("Received pingpong: N=%d", numberN) + + // shutdown the server and verify that Serve exits err = lis.Close() assert.NoError(t, err) + + select { + case <-time.After(time.Second * 2): + t.Error("Cancelling context didn't end the server") + case err = <-errChannel: + assert.ErrorIs(t, err, net.ErrClosed) + } + + assert.False(t, bootstrapClient.IsValid(), "server bootstrap client not released") } func TestListenAndServe(t *testing.T) { @@ -89,16 +124,15 @@ func TestListenAndServe(t *testing.T) { ctx, cancelFunc := context.WithCancel(context.Background()) errChannel := make(chan error) + // Provide a server that listens + srv := testcp.PingPong_ServerToClient(pingPongServer{}) + bootstrapClient := capnp.Client(srv) go func() { t.Log("Starting ListenAndServe") - err2 := rpc.ListenAndServe(ctx, "tcp", ":0", nil) + err2 := rpc.ListenAndServe(ctx, "tcp", ":0", bootstrapClient) errChannel <- err2 - t.Log("ListenAndServe has ended") }() - time.Sleep(time.Second) - t.Log("Closing context") - cancelFunc() select { case <-time.After(time.Second * 2): From 8edeaa331ffa6ee9285e4ea41270d8fc049c557d Mon Sep 17 00:00:00 2001 From: Henk Date: Wed, 30 Nov 2022 20:00:21 -0800 Subject: [PATCH 06/10] Resolved issue with potential race, by moving Addref outside the go routine, as suggested by @zenhack. --- rpc/serve.go | 27 ++++++++++++--------------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/rpc/serve.go b/rpc/serve.go index bee76515..14ab6c36 100644 --- a/rpc/serve.go +++ b/rpc/serve.go @@ -19,33 +19,30 @@ func Serve(lis net.Listener, bootstrapClient capnp.Client) error { return err } // Accept incoming connections + defer bootstrapClient.Release() for { - rwc, err := lis.Accept() + conn, err := lis.Accept() if err != nil { // Since we took ownership of the bootstrap client, release it after we're done. if !bootstrapClient.IsValid() { err = errors.New("the bootstrap client was already released") } - bootstrapClient.Release() return err } + // the RPC connection takes ownership of the bootstrap interface and will release it when the connection + // exits, so use AddRef to avoid releasing the provided bootstrap client capability. + opts := Options{ + BootstrapClient: bootstrapClient.AddRef(), + } + // For each new incoming connection, create a new RPC transport connection that will serve incoming RPC requests // rpc.Options will contain the bootstrap capability go func() { - // skip if the bootstrap client has closed since receiving the connection - // this can happen if the server exits while incoming connections are made - if bootstrapClient.IsValid() { - transport := NewStreamTransport(rwc) - // the RPC connection takes ownership of the bootstrap interface and will release it when the connection - // exits, so use AddRef to avoid releasing the provided bootstrap client capability. - opts := Options{ - BootstrapClient: bootstrapClient.AddRef(), - } - conn := NewConn(transport, &opts) - <-conn.Done() - // Remote client connection closed - } + transport := NewStreamTransport(conn) + conn := NewConn(transport, &opts) + <-conn.Done() + // Remote client connection closed }() } } From 4ba387477cbbacd2d2e803940a082410ca12670b Mon Sep 17 00:00:00 2001 From: Henk Date: Wed, 30 Nov 2022 20:00:57 -0800 Subject: [PATCH 07/10] Removed too much out of context information. --- rpc/rpc.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/rpc/rpc.go b/rpc/rpc.go index 544d999c..7d4b65fb 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -130,10 +130,7 @@ type ErrorReporter interface { // Closing the connection will close the transport and release the bootstrap // client provided in opts. // -// Passing nil for opts is the same as passing the zero value. This can be useful -// for obtaining a reference to the remote side's bootstrap. This in turn allows -// this listening connection to be used as the RPC client and make calls to the remote -// side, since capnp connections are symmetric. +// Passing nil for opts is the same as passing the zero value. // // Once a connection is created, it will immediately start receiving // requests from the transport. From 08d4718893559ba862d2a01a11e8a99531263e9d Mon Sep 17 00:00:00 2001 From: Henk Date: Thu, 1 Dec 2022 09:03:51 -0800 Subject: [PATCH 08/10] Fix not releasing bootstrap client on exit of Serve. Remove the goroutine for handling the incoming connection as rpc.NewConn already does this. --- rpc/serve.go | 15 ++++----------- rpc/serve_test.go | 2 +- 2 files changed, 5 insertions(+), 12 deletions(-) diff --git a/rpc/serve.go b/rpc/serve.go index 14ab6c36..271da37f 100644 --- a/rpc/serve.go +++ b/rpc/serve.go @@ -24,9 +24,7 @@ func Serve(lis net.Listener, bootstrapClient capnp.Client) error { conn, err := lis.Accept() if err != nil { // Since we took ownership of the bootstrap client, release it after we're done. - if !bootstrapClient.IsValid() { - err = errors.New("the bootstrap client was already released") - } + bootstrapClient.Release() return err } @@ -35,15 +33,10 @@ func Serve(lis net.Listener, bootstrapClient capnp.Client) error { opts := Options{ BootstrapClient: bootstrapClient.AddRef(), } - // For each new incoming connection, create a new RPC transport connection that will serve incoming RPC requests - // rpc.Options will contain the bootstrap capability - go func() { - transport := NewStreamTransport(conn) - conn := NewConn(transport, &opts) - <-conn.Done() - // Remote client connection closed - }() + transport := NewStreamTransport(conn) + rpcConn := NewConn(transport, &opts) + _ = rpcConn } } diff --git a/rpc/serve_test.go b/rpc/serve_test.go index bb9fbd8d..a7182539 100644 --- a/rpc/serve_test.go +++ b/rpc/serve_test.go @@ -110,7 +110,7 @@ func TestServeCapability(t *testing.T) { select { case <-time.After(time.Second * 2): - t.Error("Cancelling context didn't end the server") + t.Error("Serve did not return after listener was closed") case err = <-errChannel: assert.ErrorIs(t, err, net.ErrClosed) } From c3a927e9fbad9d556ed52a8724600cd185c2a45f Mon Sep 17 00:00:00 2001 From: Henk Date: Thu, 1 Dec 2022 10:35:29 -0800 Subject: [PATCH 09/10] Style and documentation updates from review. --- rpc/rpc.go | 2 +- rpc/serve.go | 15 +++++++-------- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/rpc/rpc.go b/rpc/rpc.go index 7d4b65fb..51817223 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -130,7 +130,7 @@ type ErrorReporter interface { // Closing the connection will close the transport and release the bootstrap // client provided in opts. // -// Passing nil for opts is the same as passing the zero value. +// If opts == nil, sensible defaults are used. See Options for more info. // // Once a connection is created, it will immediately start receiving // requests from the transport. diff --git a/rpc/serve.go b/rpc/serve.go index 271da37f..b610616d 100644 --- a/rpc/serve.go +++ b/rpc/serve.go @@ -13,30 +13,29 @@ import ( // Serve will take ownership of bootstrapClient and release it after the listener closes. // // Serve exits with the listener error if the listener is closed by the owner. -func Serve(lis net.Listener, bootstrapClient capnp.Client) error { - if !bootstrapClient.IsValid() { - err := errors.New("BootstrapClient is not valid") +func Serve(lis net.Listener, boot capnp.Client) error { + if !boot.IsValid() { + err := errors.New("bootstrap client is not valid") return err } // Accept incoming connections - defer bootstrapClient.Release() + defer boot.Release() for { conn, err := lis.Accept() if err != nil { // Since we took ownership of the bootstrap client, release it after we're done. - bootstrapClient.Release() + boot.Release() return err } // the RPC connection takes ownership of the bootstrap interface and will release it when the connection // exits, so use AddRef to avoid releasing the provided bootstrap client capability. opts := Options{ - BootstrapClient: bootstrapClient.AddRef(), + BootstrapClient: boot.AddRef(), } // For each new incoming connection, create a new RPC transport connection that will serve incoming RPC requests transport := NewStreamTransport(conn) - rpcConn := NewConn(transport, &opts) - _ = rpcConn + _ = NewConn(transport, &opts) } } From bdd502e3c30998cde0599a0efb62b4d5ab9d0410 Mon Sep 17 00:00:00 2001 From: Henk Date: Thu, 1 Dec 2022 13:22:03 -0800 Subject: [PATCH 10/10] Removed the unnecesary release. --- rpc/serve.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/rpc/serve.go b/rpc/serve.go index b610616d..789099dd 100644 --- a/rpc/serve.go +++ b/rpc/serve.go @@ -18,13 +18,12 @@ func Serve(lis net.Listener, boot capnp.Client) error { err := errors.New("bootstrap client is not valid") return err } - // Accept incoming connections + // Since we took ownership of the bootstrap client, release it after we're done. defer boot.Release() for { + // Accept incoming connections conn, err := lis.Accept() if err != nil { - // Since we took ownership of the bootstrap client, release it after we're done. - boot.Release() return err }