Skip to content

Commit

Permalink
Bugfix. Re-implement transport.NewPipe to copy messages.
Browse files Browse the repository at this point in the history
NewPipe now returns a Codec.
  • Loading branch information
lthibault committed Jul 15, 2022
1 parent 3d8f333 commit ce8a15c
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 200 deletions.
4 changes: 2 additions & 2 deletions rpc/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
func BenchmarkPingPong(b *testing.B) {
p1, p2 := transport.NewPipe(1)
srv := testcp.PingPong_ServerToClient(pingPongServer{}, nil)
conn1 := rpc.NewConn(p2, &rpc.Options{
conn1 := rpc.NewConn(rpc.NewTransport(p2), &rpc.Options{
ErrorReporter: testErrorReporter{tb: b},
BootstrapClient: srv.Client,
})
Expand All @@ -22,7 +22,7 @@ func BenchmarkPingPong(b *testing.B) {
b.Error("conn1.Close:", err)
}
}()
conn2 := rpc.NewConn(p1, &rpc.Options{
conn2 := rpc.NewConn(rpc.NewTransport(p1), &rpc.Options{
ErrorReporter: testErrorReporter{tb: b},
})
defer func() {
Expand Down
59 changes: 45 additions & 14 deletions rpc/level0_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,15 @@ func TestMain(m *testing.M) {
// sends an Abort message and it reports no errors. Level 0 requirement.
func TestSendAbort(t *testing.T) {
t.Parallel()
t.Helper()

t.Run("ReceiverListening", func(t *testing.T) {
p1, p2 := transport.NewPipe(1)
t.Parallel()

left, right := transport.NewPipe(1)
p1, p2 := rpc.NewTransport(left), rpc.NewTransport(right)
defer p2.Close()

conn := rpc.NewConn(p1, &rpc.Options{
ErrorReporter: testErrorReporter{tb: t, fail: true},
})
Expand Down Expand Up @@ -89,9 +94,11 @@ func TestSendAbort(t *testing.T) {
}
})
t.Run("ReceiverNotListening", func(t *testing.T) {
t.Parallel()

p1, p2 := transport.NewPipe(0)
defer p2.Close()
conn := rpc.NewConn(p1, &rpc.Options{
conn := rpc.NewConn(rpc.NewTransport(p1), &rpc.Options{
ErrorReporter: testErrorReporter{tb: t, fail: true},
})

Expand All @@ -109,8 +116,10 @@ func TestSendAbort(t *testing.T) {
func TestRecvAbort(t *testing.T) {
t.Parallel()

p1, p2 := transport.NewPipe(1)
left, right := transport.NewPipe(1)
p1, p2 := rpc.NewTransport(left), rpc.NewTransport(right)
defer p2.Close()

conn := rpc.NewConn(p1, &rpc.Options{
ErrorReporter: testErrorReporter{tb: t},
})
Expand Down Expand Up @@ -165,7 +174,9 @@ func TestRecvAbort(t *testing.T) {
func TestSendBootstrapError(t *testing.T) {
t.Parallel()

p1, p2 := transport.NewPipe(1)
left, right := transport.NewPipe(1)
p1, p2 := rpc.NewTransport(left), rpc.NewTransport(right)

conn := rpc.NewConn(p1, &rpc.Options{
ErrorReporter: testErrorReporter{tb: t},
})
Expand Down Expand Up @@ -255,7 +266,9 @@ func TestSendBootstrapError(t *testing.T) {
func TestSendBootstrapCall(t *testing.T) {
t.Parallel()

p1, p2 := transport.NewPipe(1)
left, right := transport.NewPipe(1)
p1, p2 := rpc.NewTransport(left), rpc.NewTransport(right)

conn := rpc.NewConn(p1, &rpc.Options{
ErrorReporter: testErrorReporter{tb: t},
})
Expand Down Expand Up @@ -464,7 +477,9 @@ func TestSendBootstrapCall(t *testing.T) {
func TestSendBootstrapCallException(t *testing.T) {
t.Parallel()

p1, p2 := transport.NewPipe(1)
left, right := transport.NewPipe(1)
p1, p2 := rpc.NewTransport(left), rpc.NewTransport(right)

conn := rpc.NewConn(p1, &rpc.Options{
ErrorReporter: testErrorReporter{tb: t},
})
Expand Down Expand Up @@ -639,7 +654,9 @@ func TestSendBootstrapCallException(t *testing.T) {
func TestSendBootstrapPipelineCall(t *testing.T) {
t.Parallel()

p1, p2 := transport.NewPipe(1)
left, right := transport.NewPipe(1)
p1, p2 := rpc.NewTransport(left), rpc.NewTransport(right)

conn := rpc.NewConn(p1, &rpc.Options{
ErrorReporter: testErrorReporter{tb: t},
})
Expand Down Expand Up @@ -798,7 +815,9 @@ func TestSendBootstrapPipelineCall(t *testing.T) {
func TestRecvBootstrapError(t *testing.T) {
t.Parallel()

p1, p2 := transport.NewPipe(1)
left, right := transport.NewPipe(1)
p1, p2 := rpc.NewTransport(left), rpc.NewTransport(right)

conn := rpc.NewConn(p1, &rpc.Options{
ErrorReporter: testErrorReporter{tb: t},
})
Expand Down Expand Up @@ -872,7 +891,9 @@ func TestRecvBootstrapCall(t *testing.T) {
func() {
close(srvShutdown)
})
p1, p2 := transport.NewPipe(1)
left, right := transport.NewPipe(1)
p1, p2 := rpc.NewTransport(left), rpc.NewTransport(right)

conn := rpc.NewConn(p1, &rpc.Options{
BootstrapClient: srv,
ErrorReporter: testErrorReporter{tb: t},
Expand Down Expand Up @@ -1028,7 +1049,9 @@ func TestRecvBootstrapCallException(t *testing.T) {
srv := newServer(func(ctx context.Context, call *server.Call) error {
return errors.New("everything went wrong")
}, nil)
p1, p2 := transport.NewPipe(1)
left, right := transport.NewPipe(1)
p1, p2 := rpc.NewTransport(left), rpc.NewTransport(right)

conn := rpc.NewConn(p1, &rpc.Options{
BootstrapClient: srv,
ErrorReporter: testErrorReporter{tb: t},
Expand Down Expand Up @@ -1183,7 +1206,9 @@ func TestRecvBootstrapPipelineCall(t *testing.T) {
func() {
close(srvShutdown)
})
p1, p2 := transport.NewPipe(1)
left, right := transport.NewPipe(1)
p1, p2 := rpc.NewTransport(left), rpc.NewTransport(right)

conn := rpc.NewConn(p1, &rpc.Options{
BootstrapClient: srv,
ErrorReporter: testErrorReporter{tb: t},
Expand Down Expand Up @@ -1289,7 +1314,9 @@ func TestRecvBootstrapPipelineCall(t *testing.T) {
func TestCallOnClosedConn(t *testing.T) {
t.Parallel()

p1, p2 := transport.NewPipe(1)
left, right := transport.NewPipe(1)
p1, p2 := rpc.NewTransport(left), rpc.NewTransport(right)

defer p2.Close()
conn := rpc.NewConn(p1, &rpc.Options{
ErrorReporter: testErrorReporter{tb: t},
Expand Down Expand Up @@ -1431,7 +1458,9 @@ func TestRecvCancel(t *testing.T) {
}
return nil
}, nil)
p1, p2 := transport.NewPipe(1)
left, right := transport.NewPipe(1)
p1, p2 := rpc.NewTransport(left), rpc.NewTransport(right)

defer p2.Close()
conn := rpc.NewConn(p1, &rpc.Options{
BootstrapClient: srv,
Expand Down Expand Up @@ -1579,7 +1608,9 @@ func TestRecvCancel(t *testing.T) {
func TestSendCancel(t *testing.T) {
t.Parallel()

p1, p2 := transport.NewPipe(1)
left, right := transport.NewPipe(1)
p1, p2 := rpc.NewTransport(left), rpc.NewTransport(right)

conn := rpc.NewConn(p1, &rpc.Options{
ErrorReporter: testErrorReporter{tb: t},
})
Expand Down
14 changes: 11 additions & 3 deletions rpc/level1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ func TestSendDisembargo(t *testing.T) {
// should not be delivered until after the disembargo loops back.
// Level 1 requirement.
func testSendDisembargo(t *testing.T, sendPrimeTo rpccp.Call_sendResultsTo_Which) {
p1, p2 := transport.NewPipe(1)
left, right := transport.NewPipe(1)
p1, p2 := rpc.NewTransport(left), rpc.NewTransport(right)

conn := rpc.NewConn(p1, &rpc.Options{
ErrorReporter: testErrorReporter{tb: t},
})
Expand Down Expand Up @@ -504,7 +506,10 @@ func TestRecvDisembargo(t *testing.T) {
}
return nil
}, nil)
p1, p2 := transport.NewPipe(2)

left, right := transport.NewPipe(2)
p1, p2 := rpc.NewTransport(left), rpc.NewTransport(right)

conn := rpc.NewConn(p1, &rpc.Options{
BootstrapClient: srv,
ErrorReporter: testErrorReporter{tb: t},
Expand Down Expand Up @@ -805,7 +810,10 @@ func TestIssue3(t *testing.T) {
}
return nil
}, nil)
p1, p2 := transport.NewPipe(1)

left, right := transport.NewPipe(1)
p1, p2 := rpc.NewTransport(left), rpc.NewTransport(right)

conn := rpc.NewConn(p1, &rpc.Options{
BootstrapClient: srv,
ErrorReporter: testErrorReporter{tb: t},
Expand Down
Loading

0 comments on commit ce8a15c

Please sign in to comment.