diff --git a/rpc/bench_test.go b/rpc/bench_test.go index 6c2a4003..bb64627f 100644 --- a/rpc/bench_test.go +++ b/rpc/bench_test.go @@ -12,7 +12,7 @@ import ( func BenchmarkPingPong(b *testing.B) { p1, p2 := transport.NewPipe(1) srv := testcp.PingPong_ServerToClient(pingPongServer{}, nil) - conn1 := rpc.NewConn(p2, &rpc.Options{ + conn1 := rpc.NewConn(rpc.NewTransport(p2), &rpc.Options{ ErrorReporter: testErrorReporter{tb: b}, BootstrapClient: srv.Client, }) @@ -22,7 +22,7 @@ func BenchmarkPingPong(b *testing.B) { b.Error("conn1.Close:", err) } }() - conn2 := rpc.NewConn(p1, &rpc.Options{ + conn2 := rpc.NewConn(rpc.NewTransport(p1), &rpc.Options{ ErrorReporter: testErrorReporter{tb: b}, }) defer func() { diff --git a/rpc/level0_test.go b/rpc/level0_test.go index 5ae21522..6b853c80 100644 --- a/rpc/level0_test.go +++ b/rpc/level0_test.go @@ -52,10 +52,15 @@ func TestMain(m *testing.M) { // sends an Abort message and it reports no errors. Level 0 requirement. func TestSendAbort(t *testing.T) { t.Parallel() + t.Helper() t.Run("ReceiverListening", func(t *testing.T) { - p1, p2 := transport.NewPipe(1) + t.Parallel() + + left, right := transport.NewPipe(1) + p1, p2 := rpc.NewTransport(left), rpc.NewTransport(right) defer p2.Close() + conn := rpc.NewConn(p1, &rpc.Options{ ErrorReporter: testErrorReporter{tb: t, fail: true}, }) @@ -89,9 +94,11 @@ func TestSendAbort(t *testing.T) { } }) t.Run("ReceiverNotListening", func(t *testing.T) { + t.Parallel() + p1, p2 := transport.NewPipe(0) defer p2.Close() - conn := rpc.NewConn(p1, &rpc.Options{ + conn := rpc.NewConn(rpc.NewTransport(p1), &rpc.Options{ ErrorReporter: testErrorReporter{tb: t, fail: true}, }) @@ -109,8 +116,10 @@ func TestSendAbort(t *testing.T) { func TestRecvAbort(t *testing.T) { t.Parallel() - p1, p2 := transport.NewPipe(1) + left, right := transport.NewPipe(1) + p1, p2 := rpc.NewTransport(left), rpc.NewTransport(right) defer p2.Close() + conn := rpc.NewConn(p1, &rpc.Options{ ErrorReporter: testErrorReporter{tb: t}, }) @@ -165,7 +174,9 @@ func TestRecvAbort(t *testing.T) { func TestSendBootstrapError(t *testing.T) { t.Parallel() - p1, p2 := transport.NewPipe(1) + left, right := transport.NewPipe(1) + p1, p2 := rpc.NewTransport(left), rpc.NewTransport(right) + conn := rpc.NewConn(p1, &rpc.Options{ ErrorReporter: testErrorReporter{tb: t}, }) @@ -255,7 +266,9 @@ func TestSendBootstrapError(t *testing.T) { func TestSendBootstrapCall(t *testing.T) { t.Parallel() - p1, p2 := transport.NewPipe(1) + left, right := transport.NewPipe(1) + p1, p2 := rpc.NewTransport(left), rpc.NewTransport(right) + conn := rpc.NewConn(p1, &rpc.Options{ ErrorReporter: testErrorReporter{tb: t}, }) @@ -464,7 +477,9 @@ func TestSendBootstrapCall(t *testing.T) { func TestSendBootstrapCallException(t *testing.T) { t.Parallel() - p1, p2 := transport.NewPipe(1) + left, right := transport.NewPipe(1) + p1, p2 := rpc.NewTransport(left), rpc.NewTransport(right) + conn := rpc.NewConn(p1, &rpc.Options{ ErrorReporter: testErrorReporter{tb: t}, }) @@ -639,7 +654,9 @@ func TestSendBootstrapCallException(t *testing.T) { func TestSendBootstrapPipelineCall(t *testing.T) { t.Parallel() - p1, p2 := transport.NewPipe(1) + left, right := transport.NewPipe(1) + p1, p2 := rpc.NewTransport(left), rpc.NewTransport(right) + conn := rpc.NewConn(p1, &rpc.Options{ ErrorReporter: testErrorReporter{tb: t}, }) @@ -798,7 +815,9 @@ func TestSendBootstrapPipelineCall(t *testing.T) { func TestRecvBootstrapError(t *testing.T) { t.Parallel() - p1, p2 := transport.NewPipe(1) + left, right := transport.NewPipe(1) + p1, p2 := rpc.NewTransport(left), rpc.NewTransport(right) + conn := rpc.NewConn(p1, &rpc.Options{ ErrorReporter: testErrorReporter{tb: t}, }) @@ -872,7 +891,9 @@ func TestRecvBootstrapCall(t *testing.T) { func() { close(srvShutdown) }) - p1, p2 := transport.NewPipe(1) + left, right := transport.NewPipe(1) + p1, p2 := rpc.NewTransport(left), rpc.NewTransport(right) + conn := rpc.NewConn(p1, &rpc.Options{ BootstrapClient: srv, ErrorReporter: testErrorReporter{tb: t}, @@ -1028,7 +1049,9 @@ func TestRecvBootstrapCallException(t *testing.T) { srv := newServer(func(ctx context.Context, call *server.Call) error { return errors.New("everything went wrong") }, nil) - p1, p2 := transport.NewPipe(1) + left, right := transport.NewPipe(1) + p1, p2 := rpc.NewTransport(left), rpc.NewTransport(right) + conn := rpc.NewConn(p1, &rpc.Options{ BootstrapClient: srv, ErrorReporter: testErrorReporter{tb: t}, @@ -1183,7 +1206,9 @@ func TestRecvBootstrapPipelineCall(t *testing.T) { func() { close(srvShutdown) }) - p1, p2 := transport.NewPipe(1) + left, right := transport.NewPipe(1) + p1, p2 := rpc.NewTransport(left), rpc.NewTransport(right) + conn := rpc.NewConn(p1, &rpc.Options{ BootstrapClient: srv, ErrorReporter: testErrorReporter{tb: t}, @@ -1289,7 +1314,9 @@ func TestRecvBootstrapPipelineCall(t *testing.T) { func TestCallOnClosedConn(t *testing.T) { t.Parallel() - p1, p2 := transport.NewPipe(1) + left, right := transport.NewPipe(1) + p1, p2 := rpc.NewTransport(left), rpc.NewTransport(right) + defer p2.Close() conn := rpc.NewConn(p1, &rpc.Options{ ErrorReporter: testErrorReporter{tb: t}, @@ -1431,7 +1458,9 @@ func TestRecvCancel(t *testing.T) { } return nil }, nil) - p1, p2 := transport.NewPipe(1) + left, right := transport.NewPipe(1) + p1, p2 := rpc.NewTransport(left), rpc.NewTransport(right) + defer p2.Close() conn := rpc.NewConn(p1, &rpc.Options{ BootstrapClient: srv, @@ -1579,7 +1608,9 @@ func TestRecvCancel(t *testing.T) { func TestSendCancel(t *testing.T) { t.Parallel() - p1, p2 := transport.NewPipe(1) + left, right := transport.NewPipe(1) + p1, p2 := rpc.NewTransport(left), rpc.NewTransport(right) + conn := rpc.NewConn(p1, &rpc.Options{ ErrorReporter: testErrorReporter{tb: t}, }) diff --git a/rpc/level1_test.go b/rpc/level1_test.go index 09c65945..dfd6719f 100644 --- a/rpc/level1_test.go +++ b/rpc/level1_test.go @@ -30,7 +30,9 @@ func TestSendDisembargo(t *testing.T) { // should not be delivered until after the disembargo loops back. // Level 1 requirement. func testSendDisembargo(t *testing.T, sendPrimeTo rpccp.Call_sendResultsTo_Which) { - p1, p2 := transport.NewPipe(1) + left, right := transport.NewPipe(1) + p1, p2 := rpc.NewTransport(left), rpc.NewTransport(right) + conn := rpc.NewConn(p1, &rpc.Options{ ErrorReporter: testErrorReporter{tb: t}, }) @@ -504,7 +506,10 @@ func TestRecvDisembargo(t *testing.T) { } return nil }, nil) - p1, p2 := transport.NewPipe(2) + + left, right := transport.NewPipe(2) + p1, p2 := rpc.NewTransport(left), rpc.NewTransport(right) + conn := rpc.NewConn(p1, &rpc.Options{ BootstrapClient: srv, ErrorReporter: testErrorReporter{tb: t}, @@ -805,7 +810,10 @@ func TestIssue3(t *testing.T) { } return nil }, nil) - p1, p2 := transport.NewPipe(1) + + left, right := transport.NewPipe(1) + p1, p2 := rpc.NewTransport(left), rpc.NewTransport(right) + conn := rpc.NewConn(p1, &rpc.Options{ BootstrapClient: srv, ErrorReporter: testErrorReporter{tb: t}, diff --git a/rpc/transport/pipe.go b/rpc/transport/pipe.go index 5b4ac392..5f230d8b 100644 --- a/rpc/transport/pipe.go +++ b/rpc/transport/pipe.go @@ -2,199 +2,76 @@ package transport import ( "context" - "errors" "fmt" - "runtime" - "strconv" - "sync" + "io" + "time" - "capnproto.org/go/capnp/v3" - rpccp "capnproto.org/go/capnp/v3/std/capnp/rpc" + capnp "capnproto.org/go/capnp/v3" ) -type pipe struct { - r <-chan pipeMsg - rc chan struct{} // close to hang up reads +// NewPipe returns a pair of codecs which communicate over +// channels, copying messages at the channel boundary. +// bufSz is the size of the channel buffers. +func NewPipe(bufSz int) (c1, c2 Codec) { + ch1 := make(chan *capnp.Message, bufSz) + ch2 := make(chan *capnp.Message, bufSz) - w chan<- pipeMsg - wc <-chan struct{} // closed when writes are no longer listened to - msgs *callerSet -} + c1 = &pipe{ + send: ch1, recv: ch2, + } -type pipeMsg struct { - msg rpccp.Message - release capnp.ReleaseFunc -} + c2 = &pipe{ + send: ch2, recv: ch1, + } -// NewPipe returns a pair of transports which communicate over -// channels, sending and receiving messages without copying. -// bufSz is the size of the channel buffers. -func NewPipe(bufSz int) (p1, p2 Transport) { - ch1 := make(chan pipeMsg, bufSz) - ch2 := make(chan pipeMsg, bufSz) - close1 := make(chan struct{}) - close2 := make(chan struct{}) - return &pipe{r: ch1, w: ch2, rc: close1, wc: close2, msgs: newCallerSet()}, - &pipe{r: ch2, w: ch1, rc: close2, wc: close1, msgs: newCallerSet()} + return } -func (p *pipe) NewMessage(ctx context.Context) (_ rpccp.Message, send func() error, release capnp.ReleaseFunc, _ error) { - msg, seg, _ := capnp.NewMessage(capnp.MultiSegment(nil)) - rmsg, _ := rpccp.NewRootMessage(seg) - clearCaller := p.msgs.Add() +type pipe struct { + send chan<- *capnp.Message + recv <-chan *capnp.Message + timeout <-chan time.Time +} - // Variables aren't synchronized because the Transport interface does - // not require them to be. Should trigger race detector. - sent, sendDone, recvDone := false, false, false - // Since refs is used by Sender and Receiver, then it must be synchronized. - var ( - refsMu sync.Mutex - refs int = 1 - ) - send = func() error { - if sendDone { - panic("send after release") - } - if sent { - panic("double send") - } - sent = true - refsMu.Lock() - refs++ - refsMu.Unlock() - pm := pipeMsg{ - msg: rmsg, - release: func() { - if recvDone { - return - } - recvDone = true - refsMu.Lock() - r := refs - 1 - refs = r - refsMu.Unlock() - if r == 0 { - msg.Reset(nil) - } - }, - } - select { - case p.w <- pm: - return nil - case <-p.wc: - p.w = nil - refsMu.Lock() - r := refs - 1 - refs = r - refsMu.Unlock() - if r == 0 { - msg.Reset(nil) - } - return errors.New("rpc pipe: send on closed pipe") - case <-ctx.Done(): - refsMu.Lock() - r := refs - 1 - refs = r - refsMu.Unlock() - if r == 0 { - msg.Reset(nil) - } - return fmt.Errorf("rpc pipe: %w", ctx.Err()) - } +func (p *pipe) Encode(ctx context.Context, m *capnp.Message) error { + b, err := m.Marshal() + if err != nil { + return err } - release = func() { - if sendDone { - return - } - sendDone = true - clearCaller() - refsMu.Lock() - r := refs - 1 - refs = r - refsMu.Unlock() - if r == 0 { - msg.Reset(nil) - } - } - return rmsg, send, release, nil -} -type newMessageCaller struct { - file string - line int -} + if m, err = capnp.Unmarshal(b); err != nil { + return err + } -func (p *pipe) RecvMessage(ctx context.Context) (rpccp.Message, capnp.ReleaseFunc, error) { select { - case pm, ok := <-p.r: - if !ok { - return rpccp.Message{}, nil, errors.New("rpc pipe: receive on closed pipe") - } - return pm.msg, pm.release, nil - case <-p.rc: - return rpccp.Message{}, nil, errors.New("rpc pipe: receive interrupted by close") + case p.send <- m: + return nil + case <-p.timeout: + return fmt.Errorf("partial write timeout") case <-ctx.Done(): - return rpccp.Message{}, nil, fmt.Errorf("rpc pipe: %w", ctx.Err()) + return ctx.Err() } } -func (p *pipe) Close() error { - p.msgs.Finish() - close(p.w) - close(p.rc) - for { - select { - case _, ok := <-p.r: - if !ok { - return nil - } - default: - return nil +func (p *pipe) Decode(ctx context.Context) (*capnp.Message, error) { + select { + case m, ok := <-p.recv: + if !ok { + return nil, io.ErrClosedPipe } - } -} -type callerSet struct { - mu sync.Mutex - callers map[*newMessageCaller]struct{} -} + return m, nil -func newCallerSet() *callerSet { - return &callerSet{ - callers: map[*newMessageCaller]struct{}{}, + case <-ctx.Done(): + return nil, ctx.Err() } } -func (cs *callerSet) Finish() { - if len(cs.callers) > 0 { - var callers []byte - for c := range cs.callers { - if len(callers) > 0 { - callers = append(callers, ", "...) - } - if c.file == "" && c.line == 0 { - callers = append(callers, ""...) - continue - } - callers = append(callers, c.file...) - callers = append(callers, ':') - callers = strconv.AppendInt(callers, int64(c.line), 10) - } - panic("Close called before releasing all messages. Unreleased: " + string(callers)) - } +func (p *pipe) SetPartialWriteTimeout(d time.Duration) { + p.timeout = time.After(d) } -func (cs *callerSet) Add() capnp.ReleaseFunc { - cs.mu.Lock() - defer cs.mu.Unlock() - - _, file, line, _ := runtime.Caller(2) - caller := &newMessageCaller{file, line} - cs.callers[caller] = struct{}{} - - return func() { - cs.mu.Lock() - delete(cs.callers, caller) - cs.mu.Unlock() - } +func (p *pipe) Close() error { + close(p.send) + return nil } diff --git a/rpc/transport/pipe_test.go b/rpc/transport/pipe_test.go deleted file mode 100644 index d35e736a..00000000 --- a/rpc/transport/pipe_test.go +++ /dev/null @@ -1,12 +0,0 @@ -package transport - -import ( - "testing" -) - -func TestPipeTransport(t *testing.T) { - testTransport(t, func() (t1, t2 Transport, err error) { - p1, p2 := NewPipe(1) - return p1, p2, nil - }) -}