Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix issue #127 #1155

Merged
merged 3 commits into from
Feb 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 54 additions & 24 deletions lib/jsonrpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"reflect"
"sync"
"sync/atomic"
"time"

"github.com/gorilla/websocket"
logging "github.com/ipfs/go-log/v2"
Expand All @@ -18,11 +19,15 @@ import (
"golang.org/x/xerrors"
)

var log = logging.Logger("rpc")
const (
methodRetryFrequency = time.Second * 3
)

var (
errorType = reflect.TypeOf(new(error)).Elem()
contextType = reflect.TypeOf(new(context.Context)).Elem()

log = logging.Logger("rpc")
)

// ErrClient is an error which occurred on the client side the library
Expand Down Expand Up @@ -78,12 +83,21 @@ type client struct {

// NewMergeClient is like NewClient, but allows to specify multiple structs
// to be filled in the same namespace, using one connection
func NewMergeClient(addr string, namespace string, outs []interface{}, requestHeader http.Header) (ClientCloser, error) {
conn, _, err := websocket.DefaultDialer.Dial(addr, requestHeader)
func NewMergeClient(addr string, namespace string, outs []interface{}, requestHeader http.Header, opts ...Option) (ClientCloser, error) {
connFactory := func() (*websocket.Conn, error) {
conn, _, err := websocket.DefaultDialer.Dial(addr, requestHeader)
return conn, err
}
conn, err := connFactory()
if err != nil {
return nil, err
}

var config Config
for _, o := range opts {
o(&config)
}

c := client{
namespace: namespace,
}
Expand All @@ -95,11 +109,13 @@ func NewMergeClient(addr string, namespace string, outs []interface{}, requestHe

handlers := map[string]rpcHandler{}
go (&wsConn{
conn: conn,
handler: handlers,
requests: c.requests,
stop: stop,
exiting: exiting,
conn: conn,
connFactory: connFactory,
reconnectInterval: config.ReconnectInterval,
handler: handlers,
requests: c.requests,
stop: stop,
exiting: exiting,
}).handleWsConn(context.TODO())

for _, handler := range outs {
Expand Down Expand Up @@ -269,6 +285,8 @@ type rpcFunc struct {

hasCtx int
retCh bool

retry bool
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We save here if was tagged with retry.

}

func (fn *rpcFunc) processResponse(resp clientResponse, rval reflect.Value) []reflect.Value {
Expand Down Expand Up @@ -344,27 +362,38 @@ func (fn *rpcFunc) handleRpcCall(args []reflect.Value) (results []reflect.Value)
}
}

resp, err := fn.client.sendRequest(ctx, req, chCtor)
if err != nil {
return fn.processError(fmt.Errorf("sendRequest failed: %w", err))
}
var resp clientResponse
var err error
// keep retrying if got a forced closed websocket conn and calling method
// has retry annotation
for {
resp, err = fn.client.sendRequest(ctx, req, chCtor)
if err != nil {
return fn.processError(fmt.Errorf("sendRequest failed: %w", err))
}

if resp.ID != *req.ID {
return fn.processError(xerrors.New("request and response id didn't match"))
}
if resp.ID != *req.ID {
return fn.processError(xerrors.New("request and response id didn't match"))
}

if fn.valOut != -1 && !fn.retCh {
val := reflect.New(fn.ftyp.Out(fn.valOut))
if fn.valOut != -1 && !fn.retCh {
val := reflect.New(fn.ftyp.Out(fn.valOut))

if resp.Result != nil {
log.Debugw("rpc result", "type", fn.ftyp.Out(fn.valOut))
if err := json.Unmarshal(resp.Result, val.Interface()); err != nil {
log.Warnw("unmarshaling failed", "message", string(resp.Result))
return fn.processError(xerrors.Errorf("unmarshaling result: %w", err))
if resp.Result != nil {
log.Debugw("rpc result", "type", fn.ftyp.Out(fn.valOut))
if err := json.Unmarshal(resp.Result, val.Interface()); err != nil {
log.Warnw("unmarshaling failed", "message", string(resp.Result))
return fn.processError(xerrors.Errorf("unmarshaling result: %w", err))
}
}
}

retVal = func() reflect.Value { return val.Elem() }
retVal = func() reflect.Value { return val.Elem() }
}
retry := resp.Error != nil && resp.Error.Code == 2 && fn.retry
if !retry {
break
}
time.Sleep(methodRetryFrequency)
}

return fn.processResponse(resp, retVal())
Expand All @@ -380,6 +409,7 @@ func (c *client) makeRpcFunc(f reflect.StructField) (reflect.Value, error) {
client: c,
ftyp: ftyp,
name: f.Name,
retry: f.Tag.Get("retry") == "true",
}
fun.valOut, fun.errOut, fun.nout = processFuncOut(ftyp)

Expand Down
19 changes: 19 additions & 0 deletions lib/jsonrpc/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package jsonrpc

import "time"

type Config struct {
ReconnectInterval time.Duration
}

var defaultConfig = Config{
ReconnectInterval: time.Second * 5,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Arbitrary...

}

type Option func(c *Config)

func WithReconnectInterval(d time.Duration) func(c *Config) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If for some reason wants to change.

return func(c *Config) {
c.ReconnectInterval = d
}
}
97 changes: 74 additions & 23 deletions lib/jsonrpc/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"reflect"
"sync"
"sync/atomic"
"time"

"github.com/gorilla/websocket"
"golang.org/x/xerrors"
Expand Down Expand Up @@ -42,11 +43,13 @@ type outChanReg struct {

type wsConn struct {
// outside params
conn *websocket.Conn
handler handlers
requests <-chan clientRequest
stop <-chan struct{}
exiting chan struct{}
conn *websocket.Conn
connFactory func() (*websocket.Conn, error)
reconnectInterval time.Duration
handler handlers
requests <-chan clientRequest
stop <-chan struct{}
exiting chan struct{}

// incoming messages
incoming chan io.Reader
Expand Down Expand Up @@ -419,6 +422,35 @@ func (c *wsConn) handleFrame(ctx context.Context, frame frame) {
}
}

func (c *wsConn) closeInFlight() {
for id, req := range c.inflight {
req.ready <- clientResponse{
Jsonrpc: "2.0",
ID: id,
Error: &respError{
Message: "handler: websocket connection closed",
jsign marked this conversation as resolved.
Show resolved Hide resolved
Code: 2,
},
}

c.handlingLk.Lock()
for _, cancel := range c.handling {
cancel()
}
c.handlingLk.Unlock()
}
c.inflight = map[int64]clientRequest{}
c.handling = map[int64]context.CancelFunc{}
}

func (c *wsConn) closeChans() {
for chid := range c.chanHandlers {
hnd := c.chanHandlers[chid]
delete(c.chanHandlers, chid)
hnd(nil, false)
}
}

func (c *wsConn) handleWsConn(ctx context.Context) {
c.incoming = make(chan io.Reader)
c.inflight = map[int64]clientRequest{}
Expand All @@ -432,34 +464,39 @@ func (c *wsConn) handleWsConn(ctx context.Context) {

// on close, make sure to return from all pending calls, and cancel context
// on all calls we handle
defer func() {
for id, req := range c.inflight {
req.ready <- clientResponse{
Jsonrpc: "2.0",
ID: id,
Error: &respError{
Message: "handler: websocket connection closed",
},
}

c.handlingLk.Lock()
for _, cancel := range c.handling {
cancel()
}
c.handlingLk.Unlock()
}
}()
defer c.closeInFlight()

// wait for the first message
go c.nextMessage()

for {
select {
case r, ok := <-c.incoming:
if !ok {
if c.incomingErr != nil {
if !websocket.IsCloseError(c.incomingErr, websocket.CloseNormalClosure) {
log.Debugw("websocket error", "error", c.incomingErr)
// connection dropped unexpectedly, do our best to recover it
c.closeInFlight()
c.closeChans()
c.incoming = make(chan io.Reader) // listen again for responses
jsign marked this conversation as resolved.
Show resolved Hide resolved
go func() {
var conn *websocket.Conn
for conn == nil {
time.Sleep(c.reconnectInterval)
var err error
if conn, err = c.connFactory(); err != nil {
log.Debugw("websocket connection retried failed", "error", err)
}
}

c.writeLk.Lock()
c.conn = conn
c.incomingErr = nil
c.writeLk.Unlock()

go c.nextMessage()
}()
continue
}
}
return // remote closed
Expand All @@ -477,9 +514,23 @@ func (c *wsConn) handleWsConn(ctx context.Context) {
c.handleFrame(ctx, frame)
go c.nextMessage()
case req := <-c.requests:
c.writeLk.Lock()
if req.req.ID != nil {
if c.incomingErr != nil { // No conn?, immediate fail
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interestingly, whenever gorilla detects the connection is closed, it closes c.incoming. Didn't expect that but found it while making some tests. So we never assume after reading <-c.requests that we have a good conn. c.incomingErr is set again to nil when conn is reestablished (line 490)

req.ready <- clientResponse{
Jsonrpc: "2.0",
ID: *req.req.ID,
Error: &respError{
Message: "handler: websocket connection closed",
Code: 2,
},
}
c.writeLk.Unlock()
break
}
c.inflight[*req.req.ID] = req
}
c.writeLk.Unlock()
c.sendRequest(req.req)
case <-c.stop:
c.writeLk.Lock()
Expand Down