diff --git a/rpc/rpc.go b/rpc/rpc.go index ae4ad2d2..51817223 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -8,13 +8,14 @@ import ( "sync" "time" + "golang.org/x/sync/errgroup" + "capnproto.org/go/capnp/v3" "capnproto.org/go/capnp/v3/exc" "capnproto.org/go/capnp/v3/exp/mpsc" "capnproto.org/go/capnp/v3/internal/syncutil" "capnproto.org/go/capnp/v3/rpc/transport" rpccp "capnproto.org/go/capnp/v3/std/capnp/rpc" - "golang.org/x/sync/errgroup" ) /* @@ -124,9 +125,12 @@ type ErrorReporter interface { ReportError(error) } -// NewConn creates a new connection that communications on a given -// transport. Closing the connection will close the transport. -// Passing nil for opts is the same as passing the zero value. +// NewConn creates a new connection that communicates on a given transport. +// +// Closing the connection will close the transport and release the bootstrap +// client provided in opts. +// +// If opts == nil, sensible defaults are used. See Options for more info. // // Once a connection is created, it will immediately start receiving // requests from the transport. diff --git a/rpc/serve.go b/rpc/serve.go new file mode 100644 index 00000000..789099dd --- /dev/null +++ b/rpc/serve.go @@ -0,0 +1,60 @@ +package rpc + +import ( + "context" + "errors" + "net" + + "capnproto.org/go/capnp/v3" +) + +// Serve serves a Cap'n Proto RPC to incoming connections. +// +// Serve will take ownership of bootstrapClient and release it after the listener closes. +// +// Serve exits with the listener error if the listener is closed by the owner. +func Serve(lis net.Listener, boot capnp.Client) error { + if !boot.IsValid() { + err := errors.New("bootstrap client is not valid") + return err + } + // Since we took ownership of the bootstrap client, release it after we're done. + defer boot.Release() + for { + // Accept incoming connections + conn, err := lis.Accept() + if err != nil { + return err + } + + // the RPC connection takes ownership of the bootstrap interface and will release it when the connection + // exits, so use AddRef to avoid releasing the provided bootstrap client capability. + opts := Options{ + BootstrapClient: boot.AddRef(), + } + // For each new incoming connection, create a new RPC transport connection that will serve incoming RPC requests + transport := NewStreamTransport(conn) + _ = NewConn(transport, &opts) + } +} + +// ListenAndServe opens a listener on the given address and serves a Cap'n Proto RPC to incoming connections +// +// network and address are passed to net.Listen. Use network "unix" for Unix Domain Sockets +// and "tcp" for regular TCP IP4 or IP6 connections. +// +// ListenAndServe will take ownership of bootstrapClient and release it on exit. +func ListenAndServe(ctx context.Context, network, addr string, bootstrapClient capnp.Client) error { + + listener, err := net.Listen(network, addr) + + if err == nil { + // to close this listener, close the context + go func() { + <-ctx.Done() + _ = listener.Close() + }() + err = Serve(listener, bootstrapClient) + } + return err +} diff --git a/rpc/serve_test.go b/rpc/serve_test.go new file mode 100644 index 00000000..a7182539 --- /dev/null +++ b/rpc/serve_test.go @@ -0,0 +1,143 @@ +package rpc_test + +import ( + "context" + "net" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "capnproto.org/go/capnp/v3" + "capnproto.org/go/capnp/v3/rpc" + testcp "capnproto.org/go/capnp/v3/rpc/internal/testcapnp" +) + +// Test connect/disconnect to a pingpong capability +func TestServe(t *testing.T) { + t.Parallel() + errChannel := make(chan error) + + t.Log("Opening listener") + lis, err := net.Listen("tcp", ":0") + assert.NoError(t, err) + srv := testcp.PingPong_ServerToClient(pingPongServer{}) + bootstrapClient := capnp.Client(srv) + + go func() { + err2 := rpc.Serve(lis, bootstrapClient) + t.Log("Serve has ended") + errChannel <- err2 + }() + + // Create the pingpong client using the server address and close it + addr := lis.Addr().String() + conn, err := net.Dial("tcp", addr) + assert.NoError(t, err) + transport := rpc.NewStreamTransport(conn) + rpcConn := rpc.NewConn(transport, nil) + err = rpcConn.Close() + assert.NoError(t, err) + + // repeat to ensure that a second connection is allowed and doesn't mess + // with releasing the bootstrap reference counting. + conn, err = net.Dial("tcp", addr) + assert.NoError(t, err) + transport = rpc.NewStreamTransport(conn) + rpcConn = rpc.NewConn(transport, nil) + err = rpcConn.Close() + assert.NoError(t, err) + + t.Log("Closing server listener") + err = lis.Close() + assert.NoError(t, err) + select { + case <-time.After(time.Second * 2): + t.Error("Serve did not return after listener was closed") + case err = <-errChannel: + // Expect that the server finished with the 'connection closed' error. + assert.ErrorIs(t, err, net.ErrClosed) + // Check that the bootstrap client was released by Serve. + assert.False(t, bootstrapClient.IsValid(), "Serve did not release its bootstrap client") + } +} + +// TestServeCapability serves the ping pong capability and tests +// if a client can successfuly receive served data. +func TestServeCapability(t *testing.T) { + t.Parallel() + ctx := context.Background() + + // Start the pingpong server + t.Log("Opening listener") + lis, err := net.Listen("tcp", ":0") + assert.NoError(t, err) + srv := testcp.PingPong_ServerToClient(pingPongServer{}) + bootstrapClient := capnp.Client(srv) + + errChannel := make(chan error) + go func() { + err2 := rpc.Serve(lis, bootstrapClient) + t.Log("Serve has ended") + errChannel <- err2 + }() + + // Create the pingpong client using the server address + addr := lis.Addr().String() + conn, err := net.Dial("tcp", addr) + assert.NoError(t, err) + transport := rpc.NewStreamTransport(conn) + rpcConn := rpc.NewConn(transport, nil) + defer rpcConn.Close() + ppClient := testcp.PingPong(rpcConn.Bootstrap(ctx)) + defer ppClient.Release() + + // Invoke the magic N method. If Serve works this should provide the magic number. + method, releaseMethod := ppClient.EchoNum(ctx, func(ps testcp.PingPong_echoNum_Params) error { + ps.SetN(42) + return nil + }) + defer releaseMethod() + resp, err := method.Struct() + assert.NoError(t, err) + numberN := resp.N() + assert.Equal(t, int64(42), numberN) + t.Logf("Received pingpong: N=%d", numberN) + + // shutdown the server and verify that Serve exits + err = lis.Close() + assert.NoError(t, err) + + select { + case <-time.After(time.Second * 2): + t.Error("Serve did not return after listener was closed") + case err = <-errChannel: + assert.ErrorIs(t, err, net.ErrClosed) + } + + assert.False(t, bootstrapClient.IsValid(), "server bootstrap client not released") +} + +func TestListenAndServe(t *testing.T) { + var err error + t.Parallel() + ctx, cancelFunc := context.WithCancel(context.Background()) + errChannel := make(chan error) + + // Provide a server that listens + srv := testcp.PingPong_ServerToClient(pingPongServer{}) + bootstrapClient := capnp.Client(srv) + go func() { + t.Log("Starting ListenAndServe") + err2 := rpc.ListenAndServe(ctx, "tcp", ":0", bootstrapClient) + errChannel <- err2 + }() + + cancelFunc() + select { + case <-time.After(time.Second * 2): + t.Error("Cancelling context didn't end the listener") + case err = <-errChannel: + assert.ErrorIs(t, err, net.ErrClosed) + } +}