Skip to content

Commit

Permalink
codec: rpc: clean up API usage to support buffered use (for performance)
Browse files Browse the repository at this point in the history
Previously, we internally created buffered reader and writer off the passed io.ReadWriteCloser
got from the socket connection. This is limiting, as it might read more bytes than is necessary
for the stream into an internal buffer that the user is not aware of.

Instead, we now just use the ReadWriteCloser as is, and ask the user to explicitly pass a
buffered ReadWriteCloser if desired.

To assist, we provide a ReadWriteCloser implementation that can be buffered
(see NewReadWriteCloser function).

Typical use-case can now be:

   var conn io.ReadWriteCloser // connection got from a socket
   conn2 := codec.NewReadWriteCloser(conn, conn, 1024, 1024) // wrapped in 1024-byte bufer
   var h = GoRpc.ServerCodec(conn2, handle)

Updates #113
Fixes #216
  • Loading branch information
ugorji committed Nov 9, 2017
1 parent 6c3b8cc commit 8c44cd4
Show file tree
Hide file tree
Showing 6 changed files with 148 additions and 20 deletions.
14 changes: 11 additions & 3 deletions codec/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"bytes"
"encoding/gob"
"fmt"
"io"
"io/ioutil"
"math"
"math/rand"
Expand Down Expand Up @@ -574,6 +575,13 @@ func testDeepEqualErr(v1, v2 interface{}, t *testing.T, name string) {
}
}

func testReadWriteCloser(c io.ReadWriteCloser) io.ReadWriteCloser {
if testRpcBufsize <= 0 && rand.Int63()%2 == 0 {
return c
}
return NewReadWriteCloser(c, c, testRpcBufsize, testRpcBufsize)
}

// doTestCodecTableOne allows us test for different variations based on arguments passed.
func doTestCodecTableOne(t *testing.T, testNil bool, h Handle,
vs []interface{}, vsVerify []interface{}) {
Expand Down Expand Up @@ -1005,7 +1013,7 @@ func testCodecRpcOne(t *testing.T, rr Rpc, h Handle, doRequest bool, exitSleepMs
return // exit serverFn goroutine
}
if err1 == nil {
var sc rpc.ServerCodec = rr.ServerCodec(conn1, h)
sc := rr.ServerCodec(testReadWriteCloser(conn1), h)
srv.ServeCodec(sc)
}
}
Expand Down Expand Up @@ -1056,7 +1064,7 @@ func testCodecRpcOne(t *testing.T, rr Rpc, h Handle, doRequest bool, exitSleepMs
}
if doRequest {
bs := connFn()
cc := rr.ClientCodec(bs, h)
cc := rr.ClientCodec(testReadWriteCloser(bs), h)
clientFn(cc)
}
if exitSleepMs != 0 {
Expand Down Expand Up @@ -1466,7 +1474,7 @@ func doTestMsgpackRpcSpecGoClientToPythonSvc(t *testing.T) {
bs, err2 = net.Dial("tcp", ":"+openPort)
}
checkErrT(t, err2)
cc := MsgpackSpecRpc.ClientCodec(bs, testMsgpackH)
cc := MsgpackSpecRpc.ClientCodec(testReadWriteCloser(bs), testMsgpackH)
cl := rpc.NewClientWithCodec(cc)
defer cl.Close()
var rstr string
Expand Down
8 changes: 2 additions & 6 deletions codec/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,6 @@ type ioEncStringWriter interface {
WriteString(s string) (n int, err error)
}

type ioEncFlusher interface {
Flush() error
}

type encDriverAsis interface {
EncodeAsis(v []byte)
}
Expand Down Expand Up @@ -209,7 +205,7 @@ type ioEncWriter struct {
ww io.Writer
bw io.ByteWriter
sw ioEncStringWriter
fw ioEncFlusher
fw ioFlusher
b [8]byte
}

Expand Down Expand Up @@ -1028,7 +1024,7 @@ func (e *Encoder) Reset(w io.Writer) {
if e.wi.sw, ok = w.(ioEncStringWriter); !ok {
e.wi.sw = &e.wi
}
e.wi.fw, _ = w.(ioEncFlusher)
e.wi.fw, _ = w.(ioFlusher)
e.wi.ww = w
}
e.w = &e.wi
Expand Down
91 changes: 91 additions & 0 deletions codec/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,13 @@ package codec
// check for these error conditions.

import (
"bufio"
"bytes"
"encoding"
"encoding/binary"
"errors"
"fmt"
"io"
"math"
"os"
"reflect"
Expand Down Expand Up @@ -1580,6 +1582,95 @@ func isNaN(f float64) bool { return f != f }

// -----------------------

type ioFlusher interface {
Flush() error
}

// ReadWriteCloser wraps a Reader and Writer to return
// a possibly buffered io.ReadWriteCloser implementation.
type ReadWriteCloser struct {
r io.Reader
w io.Writer

br *bufio.Reader
bw *bufio.Writer
rc io.Closer
wc io.Closer

f ioFlusher
}

func (x *ReadWriteCloser) Reader() io.Reader {
if x.br != nil {
return x.br
}
return x.r
}

func (x *ReadWriteCloser) Writer() io.Writer {
if x.bw != nil {
return x.bw
}
return x.w
}

func (x *ReadWriteCloser) Read(p []byte) (n int, err error) {
if x.br != nil {
return x.br.Read(p)
}
return x.r.Read(p)
}

func (x *ReadWriteCloser) Write(p []byte) (n int, err error) {
if x.bw != nil {
return x.bw.Write(p)
}
return x.w.Write(p)
}

func (x *ReadWriteCloser) Close() (err error) {
err = x.Flush()
if x.rc != nil {
err = x.rc.Close()
}
if x.wc != nil {
err = x.wc.Close()
}
return err
}

func (x *ReadWriteCloser) Flush() (err error) {
if x.bw != nil {
err = x.bw.Flush()
}
if x.f != nil {
err = x.f.Flush()
}
return err
}

// ---------

// ReadWriteCloser returns a (possibly buffered) value wrapping
// the Reader and Writer and with an appropriate method for the close.
//
// Use it in contexts (e.g. rpc) where you need to get a buffered wrapper
func NewReadWriteCloser(r io.Reader, w io.Writer, rbufsize, wbufsize int) (x *ReadWriteCloser) {
x = &ReadWriteCloser{r: r, w: w}
if rbufsize > 0 {
x.br = bufio.NewReaderSize(x.r, rbufsize)
}
if wbufsize > 0 {
x.bw = bufio.NewWriterSize(x.w, wbufsize)
}
x.rc, _ = r.(io.Closer)
x.wc, _ = w.(io.Closer)
x.f, _ = w.(ioFlusher)
return x
}

// -----------------------

type intSlice []int64
type uintSlice []uint64
type uintptrSlice []uintptr
Expand Down
31 changes: 20 additions & 11 deletions codec/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ type rpcCodec struct {
mu sync.Mutex
h Handle

cls bool
clsmu sync.RWMutex
cls bool
clsmu sync.RWMutex
clsErr error
}

func newRPCCodec(conn io.ReadWriteCloser, h Handle) rpcCodec {
Expand Down Expand Up @@ -124,13 +125,21 @@ func (c *rpcCodec) Close() error {
return nil
}
if c.isClosed() {
return io.EOF
return c.clsErr
}
c.clsmu.Lock()
c.cls = true
err := c.c.Close()
var fErr error
if c.f != nil {
fErr = c.f.Flush()
}
_ = fErr
c.clsErr = c.c.Close()
if c.clsErr == nil && fErr != nil {
c.clsErr = fErr
}
c.clsmu.Unlock()
return err
return c.clsErr
}

func (c *rpcCodec) ReadResponseBody(body interface{}) error {
Expand Down Expand Up @@ -176,6 +185,12 @@ type goRpc struct{}

// GoRpc implements Rpc using the communication protocol defined in net/rpc package.
// Its methods (ServerCodec and ClientCodec) return values that implement RpcCodecBuffered.
//
// By default, the conn parameter got from a network is not buffered.
// For performance, considering using a buffered value e.g.
// var conn io.ReadWriteCloser // connection got from a socket
// conn2 := codec.NewReadWriteCloser(conn, conn, 1024, 1024) // wrapped in 1024-byte bufer
// var h = GoRpc.ServerCodec(conn2, handle)
var GoRpc goRpc

func (x goRpc) ServerCodec(conn io.ReadWriteCloser, h Handle) rpc.ServerCodec {
Expand All @@ -186,10 +201,4 @@ func (x goRpc) ClientCodec(conn io.ReadWriteCloser, h Handle) rpc.ClientCodec {
return &goRpcCodec{newRPCCodec(conn, h)}
}

// Use this method to allow you create wrapped versions of the reader, writer if desired.
// For example, to create a buffered implementation.
func (x goRpc) Codec(r io.Reader, w io.Writer, c io.Closer, h Handle) *goRpcCodec {
return &goRpcCodec{newRPCCodec2(r, w, c, h)}
}

// var _ RpcCodecBuffered = (*rpcCodec)(nil) // ensure *rpcCodec implements RpcCodecBuffered
2 changes: 2 additions & 0 deletions codec/shared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ var (
testMaxInitLen int

testNumRepeatString int

testRpcBufsize int
)

// variables that are not flags, but which can configure the handles
Expand Down
22 changes: 22 additions & 0 deletions codec/z_all_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,15 @@ func testMsgpackGroup(t *testing.T) {
t.Run("TestMsgpackScalars", TestMsgpackScalars)
}

func testRpcGroup(t *testing.T) {
t.Run("TestBincRpcGo", TestBincRpcGo)
t.Run("TestSimpleRpcGo", TestSimpleRpcGo)
t.Run("TestMsgpackRpcGo", TestMsgpackRpcGo)
t.Run("TestCborRpcGo", TestCborRpcGo)
t.Run("TestJsonRpcGo", TestJsonRpcGo)
t.Run("TestMsgpackRpcSpec", TestMsgpackRpcSpec)
}

func TestCodecSuite(t *testing.T) {
testSuite(t, testCodecGroup)

Expand Down Expand Up @@ -386,6 +395,19 @@ func TestCodecSuite(t *testing.T) {

testMsgpackH.NoFixedNum = oldNoFixedNum

oldRpcBufsize := testRpcBufsize
testRpcBufsize = 0
t.Run("rpc-buf-0", testRpcGroup)
testRpcBufsize = 0
t.Run("rpc-buf-00", testRpcGroup)
testRpcBufsize = 0
t.Run("rpc-buf-000", testRpcGroup)
testRpcBufsize = 16
t.Run("rpc-buf-16", testRpcGroup)
testRpcBufsize = 2048
t.Run("rpc-buf-2048", testRpcGroup)
testRpcBufsize = oldRpcBufsize

testGroupResetFlags()
}

Expand Down

0 comments on commit 8c44cd4

Please sign in to comment.