Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add helpers Serve and ListenAndServe #355

Merged
merged 10 commits into from
Dec 1, 2022
47 changes: 47 additions & 0 deletions rpc/serve.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package rpc

import (
"context"
"net"
)

// Serve serves a Cap'n Proto RPC to incoming connections
// Serve exits with the listener error
func Serve(lis net.Listener, opt *Options) error {
// Accept incoming connections
for {
rwc, err := lis.Accept()
if err != nil {
return err
}
// For each new incoming connection, create a new RPC transport connection that will serve incoming RPC requests
// rpc.Options will contain the bootstrap capability
go func() {
transport := NewStreamTransport(rwc)
// Copy the options and clone the bootstrap interface as the connection takes ownership. The original
// bootstrap interface can be dropped after
conn := NewConn(transport, opt)
hspaay marked this conversation as resolved.
Show resolved Hide resolved

<-conn.Done()
hspaay marked this conversation as resolved.
Show resolved Hide resolved
// Remote client connection closed
}()
}
}

// ListenAndServe opens a listener on the given address and serves a Cap'n Proto RPC to incoming connections
// network and address are passed to net.Listen. Use network "unix" for Unix Domain Sockets
// and "tcp" for regular TCP connections.
func ListenAndServe(ctx context.Context, network, addr string, opt *Options) error {

listener, err := net.Listen(network, addr)

if err == nil {
// to close this listener, close the context
go func() {
<-ctx.Done()
_ = listener.Close()
}()
err = Serve(listener, opt)
}
return err
}
109 changes: 109 additions & 0 deletions rpc/serve_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package rpc_test

import (
"context"
"net"
"testing"
"time"

"github.com/stretchr/testify/assert"

"capnproto.org/go/capnp/v3"
"capnproto.org/go/capnp/v3/rpc"
testcp "capnproto.org/go/capnp/v3/rpc/internal/testcapnp"
)

func TestServe(t *testing.T) {
t.Parallel()
t.Log("Opening listener")
lis, err := net.Listen("tcp", ":0")
defer lis.Close()
assert.NoError(t, err)

errChannel := make(chan error)
go func() {
err2 := rpc.Serve(lis, nil)
t.Log("Serve has ended")
errChannel <- err2
}()

time.Sleep(time.Second)

t.Log("Closing listener")
err = lis.Close()
assert.NoError(t, err)
select {
case <-time.After(time.Second * 2):
t.Fail()
hspaay marked this conversation as resolved.
Show resolved Hide resolved
case err = <-errChannel:
assert.ErrorIs(t, err, net.ErrClosed)
}
}

// TestServeCapability serves the ping pong capability and tests
// if a client can successfuly receive served data.
func TestServeCapability(t *testing.T) {

hspaay marked this conversation as resolved.
Show resolved Hide resolved
t.Parallel()
ctx := context.Background()
t.Log("Opening listener")
lis, err := net.Listen("tcp", ":0")
defer lis.Close()
assert.NoError(t, err)

srv := testcp.PingPong_ServerToClient(pingPongServer{})
opts := &rpc.Options{
BootstrapClient: capnp.Client(srv),
}
errChannel := make(chan error)
go func() {
err2 := rpc.Serve(lis, opts)
t.Log("Serve has ended")
errChannel <- err2
}()

// connect to the server and invoke the magic N method
addr := lis.Addr().String()
conn, err := net.Dial("tcp", addr)
assert.NoError(t, err)
transport := rpc.NewStreamTransport(conn)
rpcConn := rpc.NewConn(transport, nil)
ppClient := testcp.PingPong(rpcConn.Bootstrap(ctx))
method, release := ppClient.EchoNum(ctx, func(ps testcp.PingPong_echoNum_Params) error {
ps.SetN(42)
return nil
})
defer release()
resp, err := method.Struct()
assert.NoError(t, err)
numberN := resp.N()
assert.Equal(t, int64(42), numberN)
t.Logf("Received pingpong: N=%d", numberN)
err = lis.Close()
assert.NoError(t, err)
}

func TestListenAndServe(t *testing.T) {
hspaay marked this conversation as resolved.
Show resolved Hide resolved
var err error
t.Parallel()
ctx, cancelFunc := context.WithCancel(context.Background())
errChannel := make(chan error)

go func() {
t.Log("Starting ListenAndServe")
err2 := rpc.ListenAndServe(ctx, "tcp", ":0", nil)
errChannel <- err2
t.Log("ListenAndServe has ended")
}()

time.Sleep(time.Second)
t.Log("Closing context")

cancelFunc()
select {
case <-time.After(time.Second * 2):
t.Error("Cancelling context didn't end the listener")
case err = <-errChannel:
assert.ErrorIs(t, err, net.ErrClosed)
}
}