Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add helpers Serve and ListenAndServe #355

Merged
merged 10 commits into from
Dec 1, 2022
65 changes: 65 additions & 0 deletions rpc/serve.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package rpc

import (
"context"
"fmt"
"net"
"strings"
)

// Serve serves a Cap'n Proto RPC to incoming connections
// Serve exits with the listener error
func Serve(lis net.Listener, options *Options) error {
if options == nil { //|| !options.BootstrapClient.IsValid() {
return fmt.Errorf("invalid options")
}
// Accept incoming connections
for {
rwc, err := lis.Accept()
if err != nil {
return err
}
// For each new incoming connection, create a new RPC transport connection that will serve incoming RPC requests
// rpc.Options will contain the bootstrap capability
go func() {
transport := NewStreamTransport(rwc)
conn := NewConn(transport, options)

select {
case <-conn.Done():
// Remote client connection closed
return
}
}()
}
}

// ListenAndServe opens a listener on the given address and serves a Cap'n Proto RPC to incoming connections
// If address starts with "unix:" it is considered a Unix Domain Socket path, otherwise a TCP address.
// Context can be used to stop listening.
func ListenAndServe(ctx context.Context, address string, options *Options) error {
var listener net.Listener
var err error

if address == "" {
return fmt.Errorf("missing address")
}
// UDS paths start with either '.' or '/'
if strings.HasPrefix(address, "unix:") {
listener, err = net.Listen("unix", address[5:])
} else {
listener, err = net.Listen("tcp", address)
}
if err == nil {
// to close this listener, close the context
go func() {
select {
case <-ctx.Done():
_ = listener.Close()
return
}
}()
err = Serve(listener, options)
}
return err
}
47 changes: 47 additions & 0 deletions rpc/serve_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package rpc_test

import (
"context"
"net"
"testing"
"time"

"github.com/stretchr/testify/assert"

"capnproto.org/go/capnp/v3/rpc"
)

//clientHook := {}

func TestServe(t *testing.T) {
lis, err := net.Listen("tcp", ":8888")
assert.NoError(t, err)

errChannel := make(chan error)
go func() {
err2 := rpc.Serve(lis, &rpc.Options{})
errChannel <- err2
}()
time.Sleep(time.Second)
t.Log("Closing listener")
err = lis.Close()
assert.NoError(t, err)
err = <-errChannel
assert.ErrorIs(t, err, net.ErrClosed)
}

func TestListenAndServe(t *testing.T) {
hspaay marked this conversation as resolved.
Show resolved Hide resolved
var err error
ctx, cancelFunc := context.WithCancel(context.Background())
errChannel := make(chan error)
go func() {
err2 := rpc.ListenAndServe(ctx, ":8888", &rpc.Options{})
errChannel <- err2
t.Log("Serve has ended")
}()
time.Sleep(time.Second)
t.Log("Closing listener")
cancelFunc()
err = <-errChannel
assert.ErrorIs(t, err, net.ErrClosed)
}