diff --git a/codec/codec_test.go b/codec/codec_test.go index 24e688fe..1d7d84f5 100644 --- a/codec/codec_test.go +++ b/codec/codec_test.go @@ -4,6 +4,7 @@ package codec import ( + "bufio" "bytes" "encoding/gob" "fmt" @@ -579,7 +580,11 @@ func testReadWriteCloser(c io.ReadWriteCloser) io.ReadWriteCloser { if testRpcBufsize <= 0 && rand.Int63()%2 == 0 { return c } - return NewReadWriteCloser(c, c, testRpcBufsize, testRpcBufsize) + return struct { + io.Closer + *bufio.Reader + *bufio.Writer + }{c, bufio.NewReaderSize(c, testRpcBufsize), bufio.NewWriterSize(c, testRpcBufsize)} } // doTestCodecTableOne allows us test for different variations based on arguments passed. diff --git a/codec/helper.go b/codec/helper.go index eb643245..cfa6541a 100644 --- a/codec/helper.go +++ b/codec/helper.go @@ -97,13 +97,11 @@ package codec // check for these error conditions. import ( - "bufio" "bytes" "encoding" "encoding/binary" "errors" "fmt" - "io" "math" "os" "reflect" @@ -604,11 +602,11 @@ type extTypeTagFn struct { type extHandle []extTypeTagFn -// DEPRECATED: Use SetBytesExt or SetInterfaceExt on the Handle instead. -// // AddExt registes an encode and decode function for a reflect.Type. // AddExt internally calls SetExt. // To deregister an Ext, call AddExt with nil encfn and/or nil decfn. +// +// Deprecated: Use SetBytesExt or SetInterfaceExt on the Handle instead. func (o *extHandle) AddExt( rt reflect.Type, tag byte, encfn func(reflect.Value) ([]byte, error), decfn func(reflect.Value, []byte) error, @@ -619,12 +617,11 @@ func (o *extHandle) AddExt( return o.SetExt(rt, uint64(tag), addExtWrapper{encfn, decfn}) } -// DEPRECATED: Use SetBytesExt or SetInterfaceExt on the Handle instead. -// // Note that the type must be a named type, and specifically not // a pointer or Interface. An error is returned if that is not honored. +// To Deregister an ext, call SetExt with nil Ext. // -// To Deregister an ext, call SetExt with nil Ext +// Deprecated: Use SetBytesExt or SetInterfaceExt on the Handle instead. func (o *extHandle) SetExt(rt reflect.Type, tag uint64, ext Ext) (err error) { // o is a pointer, because we may need to initialize it if rt.PkgPath() == "" || rt.Kind() == reflect.Interface { @@ -1586,89 +1583,6 @@ type ioFlusher interface { Flush() error } -// ReadWriteCloser wraps a Reader and Writer to return -// a possibly buffered io.ReadWriteCloser implementation. -type ReadWriteCloser struct { - r io.Reader - w io.Writer - - br *bufio.Reader - bw *bufio.Writer - rc io.Closer - wc io.Closer - - f ioFlusher -} - -func (x *ReadWriteCloser) Reader() io.Reader { - if x.br != nil { - return x.br - } - return x.r -} - -func (x *ReadWriteCloser) Writer() io.Writer { - if x.bw != nil { - return x.bw - } - return x.w -} - -func (x *ReadWriteCloser) Read(p []byte) (n int, err error) { - if x.br != nil { - return x.br.Read(p) - } - return x.r.Read(p) -} - -func (x *ReadWriteCloser) Write(p []byte) (n int, err error) { - if x.bw != nil { - return x.bw.Write(p) - } - return x.w.Write(p) -} - -func (x *ReadWriteCloser) Close() (err error) { - err = x.Flush() - if x.rc != nil { - err = x.rc.Close() - } - if x.wc != nil { - err = x.wc.Close() - } - return err -} - -func (x *ReadWriteCloser) Flush() (err error) { - if x.bw != nil { - err = x.bw.Flush() - } - if x.f != nil { - err = x.f.Flush() - } - return err -} - -// --------- - -// ReadWriteCloser returns a (possibly buffered) value wrapping -// the Reader and Writer and with an appropriate method for the close. -// -// Use it in contexts (e.g. rpc) where you need to get a buffered wrapper -func NewReadWriteCloser(r io.Reader, w io.Writer, rbufsize, wbufsize int) (x *ReadWriteCloser) { - x = &ReadWriteCloser{r: r, w: w} - if rbufsize > 0 { - x.br = bufio.NewReaderSize(x.r, rbufsize) - } - if wbufsize > 0 { - x.bw = bufio.NewWriterSize(x.w, wbufsize) - } - x.rc, _ = r.(io.Closer) - x.wc, _ = w.(io.Closer) - x.f, _ = w.(ioFlusher) - return x -} - // ----------------------- type intSlice []int64 diff --git a/codec/msgpack.go b/codec/msgpack.go index 7e8b9967..b9789949 100644 --- a/codec/msgpack.go +++ b/codec/msgpack.go @@ -15,8 +15,8 @@ For compatibility with behaviour of msgpack-c reference implementation: - Go intX (<0) IS ENCODED AS msgpack -ve fixnum, signed - */ + package codec import ( @@ -827,7 +827,7 @@ func (c *msgpackSpecRpcCodec) WriteRequest(r *rpc.Request, body interface{}) err bodyArr = []interface{}{body} } r2 := []interface{}{0, uint32(r.Seq), r.ServiceMethod, bodyArr} - return c.write(r2, nil, false, true) + return c.write(r2, nil, false) } func (c *msgpackSpecRpcCodec) WriteResponse(r *rpc.Response, body interface{}) error { @@ -839,7 +839,7 @@ func (c *msgpackSpecRpcCodec) WriteResponse(r *rpc.Response, body interface{}) e body = nil } r2 := []interface{}{1, uint32(r.Seq), moe, body} - return c.write(r2, nil, false, true) + return c.write(r2, nil, false) } func (c *msgpackSpecRpcCodec) ReadResponseHeader(r *rpc.Response) error { @@ -914,7 +914,8 @@ type msgpackSpecRpc struct{} // MsgpackSpecRpc implements Rpc using the communication protocol defined in // the msgpack spec at https://github.com/msgpack-rpc/msgpack-rpc/blob/master/spec.md . -// Its methods (ServerCodec and ClientCodec) return values that implement RpcCodecBuffered. +// +// See GoRpc documentation, for information on buffering for better performance. var MsgpackSpecRpc msgpackSpecRpc func (x msgpackSpecRpc) ServerCodec(conn io.ReadWriteCloser, h Handle) rpc.ServerCodec { diff --git a/codec/rpc.go b/codec/rpc.go index 87bcd98c..6098800b 100644 --- a/codec/rpc.go +++ b/codec/rpc.go @@ -4,6 +4,7 @@ package codec import ( + "bufio" "errors" "io" "net/rpc" @@ -16,27 +17,12 @@ type Rpc interface { ClientCodec(conn io.ReadWriteCloser, h Handle) rpc.ClientCodec } -// // RpcCodecBuffered allows access to the underlying bufio.Reader/Writer -// // used by the rpc connection. It accommodates use-cases where the connection -// // should be used by rpc and non-rpc functions, e.g. streaming a file after -// // sending an rpc response. -// type RpcCodecBuffered interface { -// BufferedReader() *bufio.Reader -// BufferedWriter() *bufio.Writer -// } - -// ------------------------------------- - -type rpcFlusher interface { - Flush() error -} - // rpcCodec defines the struct members and common methods. type rpcCodec struct { c io.Closer r io.Reader w io.Writer - f rpcFlusher + f ioFlusher dec *Decoder enc *Encoder @@ -60,7 +46,13 @@ func newRPCCodec2(r io.Reader, w io.Writer, c io.Closer, h Handle) rpcCodec { if jsonH, ok := h.(*JsonHandle); ok && !jsonH.TermWhitespace { panic(errors.New("rpc requires a JsonHandle with TermWhitespace set to true")) } - f, _ := w.(rpcFlusher) + // always ensure that we use a flusher, and always flush what was written to the connection. + // we lose nothing by using a buffered writer internally. + f, ok := w.(ioFlusher) + if !ok { + bw := bufio.NewWriter(w) + f, w = bw, bw + } return rpcCodec{ c: c, w: w, @@ -72,17 +64,9 @@ func newRPCCodec2(r io.Reader, w io.Writer, c io.Closer, h Handle) rpcCodec { } } -// func (c *rpcCodec) BufferedReader() *bufio.Reader { -// return c.br -// } - -// func (c *rpcCodec) BufferedWriter() *bufio.Writer { -// return c.bw -// } - -func (c *rpcCodec) write(obj1, obj2 interface{}, writeObj2, doFlush bool) (err error) { +func (c *rpcCodec) write(obj1, obj2 interface{}, writeObj2 bool) (err error) { if c.isClosed() { - return io.EOF + return c.clsErr } if err = c.enc.Encode(obj1); err != nil { return @@ -92,7 +76,7 @@ func (c *rpcCodec) write(obj1, obj2 interface{}, writeObj2, doFlush bool) (err e return } } - if doFlush && c.f != nil { + if c.f != nil { return c.f.Flush() } return @@ -100,24 +84,29 @@ func (c *rpcCodec) write(obj1, obj2 interface{}, writeObj2, doFlush bool) (err e func (c *rpcCodec) read(obj interface{}) (err error) { if c.isClosed() { - return io.EOF + return c.clsErr } - //If nil is passed in, we should still attempt to read content to nowhere. + //If nil is passed in, we should read and discard if obj == nil { - var obj2 interface{} - return c.dec.Decode(&obj2) + // var obj2 interface{} + // return c.dec.Decode(&obj2) + func() { + defer panicToErr(&err) + c.dec.swallow() + }() + return } return c.dec.Decode(obj) } -func (c *rpcCodec) isClosed() bool { +func (c *rpcCodec) isClosed() (b bool) { if c.c == nil { return false } c.clsmu.RLock() - x := c.cls + b = c.cls c.clsmu.RUnlock() - return x + return } func (c *rpcCodec) Close() error { @@ -156,13 +145,13 @@ func (c *goRpcCodec) WriteRequest(r *rpc.Request, body interface{}) error { // Must protect for concurrent access as per API c.mu.Lock() defer c.mu.Unlock() - return c.write(r, body, true, true) + return c.write(r, body, true) } func (c *goRpcCodec) WriteResponse(r *rpc.Response, body interface{}) error { c.mu.Lock() defer c.mu.Unlock() - return c.write(r, body, true, true) + return c.write(r, body, true) } func (c *goRpcCodec) ReadResponseHeader(r *rpc.Response) error { @@ -184,13 +173,23 @@ func (c *goRpcCodec) ReadRequestBody(body interface{}) error { type goRpc struct{} // GoRpc implements Rpc using the communication protocol defined in net/rpc package. -// Its methods (ServerCodec and ClientCodec) return values that implement RpcCodecBuffered. // -// By default, the conn parameter got from a network is not buffered. -// For performance, considering using a buffered value e.g. +// Note: network connection (from net.Dial, of type io.ReadWriteCloser) is not buffered. +// We will internally use a buffer during writes, for performance, if the non-buffered +// connection is passed in. +// +// However, you may consider explicitly passing in a buffered value e.g. +// var handle codec.Handle // codec handle // var conn io.ReadWriteCloser // connection got from a socket -// conn2 := codec.NewReadWriteCloser(conn, conn, 1024, 1024) // wrapped in 1024-byte bufer -// var h = GoRpc.ServerCodec(conn2, handle) +// var bufconn = struct { // bufconn here is a buffered io.ReadWriteCloser +// io.Closer +// *bufio.Reader +// *bufio.Writer +// }{conn, bufio.NewReader(conn), bufio.NewWriter(conn)} +// var serverCodec = GoRpc.ServerCodec(bufconn, handle) +// var clientCodec = GoRpc.ClientCodec(bufconn, handle) +// +// If all you care about is buffered writes, this is done automatically for you. var GoRpc goRpc func (x goRpc) ServerCodec(conn io.ReadWriteCloser, h Handle) rpc.ServerCodec { @@ -200,5 +199,3 @@ func (x goRpc) ServerCodec(conn io.ReadWriteCloser, h Handle) rpc.ServerCodec { func (x goRpc) ClientCodec(conn io.ReadWriteCloser, h Handle) rpc.ClientCodec { return &goRpcCodec{newRPCCodec(conn, h)} } - -// var _ RpcCodecBuffered = (*rpcCodec)(nil) // ensure *rpcCodec implements RpcCodecBuffered