Skip to content

Commit

Permalink
codec: rpc: remove NewReadWriteCloser() and document and optimize ins…
Browse files Browse the repository at this point in the history
…tead

Instead of adding a new exposed type and NewReadWriteCloser(...)
convenience function, just document how a user can create a
buffered connection for use in rpc.

Also, there is no downside to doing a buffer during write.
There is only a downside during read.

Consequently, we will use a buffer internally
when passed a non-buffered ReadWriteCloser.

Updates #216
  • Loading branch information
ugorji committed Nov 10, 2017
1 parent 69c49d0 commit f894406
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 139 deletions.
7 changes: 6 additions & 1 deletion codec/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package codec

import (
"bufio"
"bytes"
"encoding/gob"
"fmt"
Expand Down Expand Up @@ -579,7 +580,11 @@ func testReadWriteCloser(c io.ReadWriteCloser) io.ReadWriteCloser {
if testRpcBufsize <= 0 && rand.Int63()%2 == 0 {
return c
}
return NewReadWriteCloser(c, c, testRpcBufsize, testRpcBufsize)
return struct {
io.Closer
*bufio.Reader
*bufio.Writer
}{c, bufio.NewReaderSize(c, testRpcBufsize), bufio.NewWriterSize(c, testRpcBufsize)}
}

// doTestCodecTableOne allows us test for different variations based on arguments passed.
Expand Down
94 changes: 4 additions & 90 deletions codec/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,11 @@ package codec
// check for these error conditions.

import (
"bufio"
"bytes"
"encoding"
"encoding/binary"
"errors"
"fmt"
"io"
"math"
"os"
"reflect"
Expand Down Expand Up @@ -604,11 +602,11 @@ type extTypeTagFn struct {

type extHandle []extTypeTagFn

// DEPRECATED: Use SetBytesExt or SetInterfaceExt on the Handle instead.
//
// AddExt registes an encode and decode function for a reflect.Type.
// AddExt internally calls SetExt.
// To deregister an Ext, call AddExt with nil encfn and/or nil decfn.
//
// Deprecated: Use SetBytesExt or SetInterfaceExt on the Handle instead.
func (o *extHandle) AddExt(
rt reflect.Type, tag byte,
encfn func(reflect.Value) ([]byte, error), decfn func(reflect.Value, []byte) error,
Expand All @@ -619,12 +617,11 @@ func (o *extHandle) AddExt(
return o.SetExt(rt, uint64(tag), addExtWrapper{encfn, decfn})
}

// DEPRECATED: Use SetBytesExt or SetInterfaceExt on the Handle instead.
//
// Note that the type must be a named type, and specifically not
// a pointer or Interface. An error is returned if that is not honored.
// To Deregister an ext, call SetExt with nil Ext.
//
// To Deregister an ext, call SetExt with nil Ext
// Deprecated: Use SetBytesExt or SetInterfaceExt on the Handle instead.
func (o *extHandle) SetExt(rt reflect.Type, tag uint64, ext Ext) (err error) {
// o is a pointer, because we may need to initialize it
if rt.PkgPath() == "" || rt.Kind() == reflect.Interface {
Expand Down Expand Up @@ -1586,89 +1583,6 @@ type ioFlusher interface {
Flush() error
}

// ReadWriteCloser wraps a Reader and Writer to return
// a possibly buffered io.ReadWriteCloser implementation.
type ReadWriteCloser struct {
r io.Reader
w io.Writer

br *bufio.Reader
bw *bufio.Writer
rc io.Closer
wc io.Closer

f ioFlusher
}

func (x *ReadWriteCloser) Reader() io.Reader {
if x.br != nil {
return x.br
}
return x.r
}

func (x *ReadWriteCloser) Writer() io.Writer {
if x.bw != nil {
return x.bw
}
return x.w
}

func (x *ReadWriteCloser) Read(p []byte) (n int, err error) {
if x.br != nil {
return x.br.Read(p)
}
return x.r.Read(p)
}

func (x *ReadWriteCloser) Write(p []byte) (n int, err error) {
if x.bw != nil {
return x.bw.Write(p)
}
return x.w.Write(p)
}

func (x *ReadWriteCloser) Close() (err error) {
err = x.Flush()
if x.rc != nil {
err = x.rc.Close()
}
if x.wc != nil {
err = x.wc.Close()
}
return err
}

func (x *ReadWriteCloser) Flush() (err error) {
if x.bw != nil {
err = x.bw.Flush()
}
if x.f != nil {
err = x.f.Flush()
}
return err
}

// ---------

// ReadWriteCloser returns a (possibly buffered) value wrapping
// the Reader and Writer and with an appropriate method for the close.
//
// Use it in contexts (e.g. rpc) where you need to get a buffered wrapper
func NewReadWriteCloser(r io.Reader, w io.Writer, rbufsize, wbufsize int) (x *ReadWriteCloser) {
x = &ReadWriteCloser{r: r, w: w}
if rbufsize > 0 {
x.br = bufio.NewReaderSize(x.r, rbufsize)
}
if wbufsize > 0 {
x.bw = bufio.NewWriterSize(x.w, wbufsize)
}
x.rc, _ = r.(io.Closer)
x.wc, _ = w.(io.Closer)
x.f, _ = w.(ioFlusher)
return x
}

// -----------------------

type intSlice []int64
Expand Down
9 changes: 5 additions & 4 deletions codec/msgpack.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ For compatibility with behaviour of msgpack-c reference implementation:
- Go intX (<0)
IS ENCODED AS
msgpack -ve fixnum, signed
*/

package codec

import (
Expand Down Expand Up @@ -827,7 +827,7 @@ func (c *msgpackSpecRpcCodec) WriteRequest(r *rpc.Request, body interface{}) err
bodyArr = []interface{}{body}
}
r2 := []interface{}{0, uint32(r.Seq), r.ServiceMethod, bodyArr}
return c.write(r2, nil, false, true)
return c.write(r2, nil, false)
}

func (c *msgpackSpecRpcCodec) WriteResponse(r *rpc.Response, body interface{}) error {
Expand All @@ -839,7 +839,7 @@ func (c *msgpackSpecRpcCodec) WriteResponse(r *rpc.Response, body interface{}) e
body = nil
}
r2 := []interface{}{1, uint32(r.Seq), moe, body}
return c.write(r2, nil, false, true)
return c.write(r2, nil, false)
}

func (c *msgpackSpecRpcCodec) ReadResponseHeader(r *rpc.Response) error {
Expand Down Expand Up @@ -914,7 +914,8 @@ type msgpackSpecRpc struct{}

// MsgpackSpecRpc implements Rpc using the communication protocol defined in
// the msgpack spec at https://github.com/msgpack-rpc/msgpack-rpc/blob/master/spec.md .
// Its methods (ServerCodec and ClientCodec) return values that implement RpcCodecBuffered.
//
// See GoRpc documentation, for information on buffering for better performance.
var MsgpackSpecRpc msgpackSpecRpc

func (x msgpackSpecRpc) ServerCodec(conn io.ReadWriteCloser, h Handle) rpc.ServerCodec {
Expand Down
85 changes: 41 additions & 44 deletions codec/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package codec

import (
"bufio"
"errors"
"io"
"net/rpc"
Expand All @@ -16,27 +17,12 @@ type Rpc interface {
ClientCodec(conn io.ReadWriteCloser, h Handle) rpc.ClientCodec
}

// // RpcCodecBuffered allows access to the underlying bufio.Reader/Writer
// // used by the rpc connection. It accommodates use-cases where the connection
// // should be used by rpc and non-rpc functions, e.g. streaming a file after
// // sending an rpc response.
// type RpcCodecBuffered interface {
// BufferedReader() *bufio.Reader
// BufferedWriter() *bufio.Writer
// }

// -------------------------------------

type rpcFlusher interface {
Flush() error
}

// rpcCodec defines the struct members and common methods.
type rpcCodec struct {
c io.Closer
r io.Reader
w io.Writer
f rpcFlusher
f ioFlusher

dec *Decoder
enc *Encoder
Expand All @@ -60,7 +46,13 @@ func newRPCCodec2(r io.Reader, w io.Writer, c io.Closer, h Handle) rpcCodec {
if jsonH, ok := h.(*JsonHandle); ok && !jsonH.TermWhitespace {
panic(errors.New("rpc requires a JsonHandle with TermWhitespace set to true"))
}
f, _ := w.(rpcFlusher)
// always ensure that we use a flusher, and always flush what was written to the connection.
// we lose nothing by using a buffered writer internally.
f, ok := w.(ioFlusher)
if !ok {
bw := bufio.NewWriter(w)
f, w = bw, bw
}
return rpcCodec{
c: c,
w: w,
Expand All @@ -72,17 +64,9 @@ func newRPCCodec2(r io.Reader, w io.Writer, c io.Closer, h Handle) rpcCodec {
}
}

// func (c *rpcCodec) BufferedReader() *bufio.Reader {
// return c.br
// }

// func (c *rpcCodec) BufferedWriter() *bufio.Writer {
// return c.bw
// }

func (c *rpcCodec) write(obj1, obj2 interface{}, writeObj2, doFlush bool) (err error) {
func (c *rpcCodec) write(obj1, obj2 interface{}, writeObj2 bool) (err error) {
if c.isClosed() {
return io.EOF
return c.clsErr
}
if err = c.enc.Encode(obj1); err != nil {
return
Expand All @@ -92,32 +76,37 @@ func (c *rpcCodec) write(obj1, obj2 interface{}, writeObj2, doFlush bool) (err e
return
}
}
if doFlush && c.f != nil {
if c.f != nil {
return c.f.Flush()
}
return
}

func (c *rpcCodec) read(obj interface{}) (err error) {
if c.isClosed() {
return io.EOF
return c.clsErr
}
//If nil is passed in, we should still attempt to read content to nowhere.
//If nil is passed in, we should read and discard
if obj == nil {
var obj2 interface{}
return c.dec.Decode(&obj2)
// var obj2 interface{}
// return c.dec.Decode(&obj2)
func() {
defer panicToErr(&err)
c.dec.swallow()
}()
return
}
return c.dec.Decode(obj)
}

func (c *rpcCodec) isClosed() bool {
func (c *rpcCodec) isClosed() (b bool) {
if c.c == nil {
return false
}
c.clsmu.RLock()
x := c.cls
b = c.cls
c.clsmu.RUnlock()
return x
return
}

func (c *rpcCodec) Close() error {
Expand Down Expand Up @@ -156,13 +145,13 @@ func (c *goRpcCodec) WriteRequest(r *rpc.Request, body interface{}) error {
// Must protect for concurrent access as per API
c.mu.Lock()
defer c.mu.Unlock()
return c.write(r, body, true, true)
return c.write(r, body, true)
}

func (c *goRpcCodec) WriteResponse(r *rpc.Response, body interface{}) error {
c.mu.Lock()
defer c.mu.Unlock()
return c.write(r, body, true, true)
return c.write(r, body, true)
}

func (c *goRpcCodec) ReadResponseHeader(r *rpc.Response) error {
Expand All @@ -184,13 +173,23 @@ func (c *goRpcCodec) ReadRequestBody(body interface{}) error {
type goRpc struct{}

// GoRpc implements Rpc using the communication protocol defined in net/rpc package.
// Its methods (ServerCodec and ClientCodec) return values that implement RpcCodecBuffered.
//
// By default, the conn parameter got from a network is not buffered.
// For performance, considering using a buffered value e.g.
// Note: network connection (from net.Dial, of type io.ReadWriteCloser) is not buffered.
// We will internally use a buffer during writes, for performance, if the non-buffered
// connection is passed in.
//
// However, you may consider explicitly passing in a buffered value e.g.
// var handle codec.Handle // codec handle
// var conn io.ReadWriteCloser // connection got from a socket
// conn2 := codec.NewReadWriteCloser(conn, conn, 1024, 1024) // wrapped in 1024-byte bufer
// var h = GoRpc.ServerCodec(conn2, handle)
// var bufconn = struct { // bufconn here is a buffered io.ReadWriteCloser
// io.Closer
// *bufio.Reader
// *bufio.Writer
// }{conn, bufio.NewReader(conn), bufio.NewWriter(conn)}
// var serverCodec = GoRpc.ServerCodec(bufconn, handle)
// var clientCodec = GoRpc.ClientCodec(bufconn, handle)
//
// If all you care about is buffered writes, this is done automatically for you.
var GoRpc goRpc

func (x goRpc) ServerCodec(conn io.ReadWriteCloser, h Handle) rpc.ServerCodec {
Expand All @@ -200,5 +199,3 @@ func (x goRpc) ServerCodec(conn io.ReadWriteCloser, h Handle) rpc.ServerCodec {
func (x goRpc) ClientCodec(conn io.ReadWriteCloser, h Handle) rpc.ClientCodec {
return &goRpcCodec{newRPCCodec(conn, h)}
}

// var _ RpcCodecBuffered = (*rpcCodec)(nil) // ensure *rpcCodec implements RpcCodecBuffered

0 comments on commit f894406

Please sign in to comment.