Skip to content

Commit

Permalink
Set gRPC dial-options (#427)
Browse files Browse the repository at this point in the history
* Split timeouts for dial/push/pull operations

Signed-off-by: Anton Dort-Golts <dortgolts@gmail.com>

* Set internal gRPC-parameters with NetOption

Signed-off-by: Anton Dort-Golts <dortgolts@gmail.com>

* Rename options

Signed-off-by: Anton Dort-Golts <dortgolts@gmail.com>

* Fix error wrapping

Signed-off-by: Anton Dort-Golts <dortgolts@gmail.com>
  • Loading branch information
dgtony authored Aug 27, 2020
1 parent 0a6a7f4 commit ccf9eb0
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 27 deletions.
24 changes: 16 additions & 8 deletions common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func DefaultNetwork(repoPath string, opts ...NetOption) (NetBoostrapper, error)
api, err := net.NewNetwork(ctx, h, lite.BlockStore(), lite, tstore, net.Config{
Debug: config.Debug,
PubSub: config.PubSub,
}, config.GRPCOptions...)
}, config.GRPCServerOptions, config.GRPCDialOptions)
if err != nil {
cancel()
if err := logstore.Close(); err != nil {
Expand All @@ -144,11 +144,12 @@ func DefaultNetwork(repoPath string, opts ...NetOption) (NetBoostrapper, error)
}

type NetConfig struct {
HostAddr ma.Multiaddr
ConnManager cconnmgr.ConnManager
Debug bool
GRPCOptions []grpc.ServerOption
PubSub bool
HostAddr ma.Multiaddr
ConnManager cconnmgr.ConnManager
Debug bool
GRPCServerOptions []grpc.ServerOption
GRPCDialOptions []grpc.DialOption
PubSub bool
}

type NetOption func(c *NetConfig) error
Expand All @@ -174,9 +175,16 @@ func WithNetDebug(enabled bool) NetOption {
}
}

func WithNetGRPCOptions(opts ...grpc.ServerOption) NetOption {
func WithNetGRPCServerOptions(opts ...grpc.ServerOption) NetOption {
return func(c *NetConfig) error {
c.GRPCOptions = opts
c.GRPCServerOptions = opts
return nil
}
}

func WithNetGRPCDialOptions(opts ...grpc.DialOption) NetOption {
return func(c *NetConfig) error {
c.GRPCDialOptions = opts
return nil
}
}
Expand Down
28 changes: 18 additions & 10 deletions net/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@ import (
"google.golang.org/grpc/connectivity"
)

const (
var (
// DialTimeout is the max time duration to wait when dialing a peer.
DialTimeout = time.Second * 10
PushTimeout = time.Second * 10
PullTimeout = time.Second * 10
)

// getLogs in a thread.
Expand Down Expand Up @@ -63,7 +65,7 @@ func (s *server) getLogs(ctx context.Context, id thread.ID, pid peer.ID) ([]thre
if err != nil {
return nil, err
}
cctx, cancel := context.WithTimeout(ctx, DialTimeout)
cctx, cancel := context.WithTimeout(ctx, PullTimeout)
defer cancel()
reply, err := client.GetLogs(cctx, req)
if err != nil {
Expand Down Expand Up @@ -109,13 +111,13 @@ func (s *server) pushLog(ctx context.Context, id thread.ID, lg thread.LogInfo, p

client, err := s.dial(pid)
if err != nil {
return fmt.Errorf("dial %s failed: %s", pid, err)
return fmt.Errorf("dial %s failed: %w", pid, err)
}
cctx, cancel := context.WithTimeout(ctx, DialTimeout)
cctx, cancel := context.WithTimeout(ctx, PushTimeout)
defer cancel()
_, err = client.PushLog(cctx, lreq)
if err != nil {
return fmt.Errorf("push log to %s failed: %s", pid, err)
return fmt.Errorf("push log to %s failed: %w", pid, err)
}
return err
}
Expand Down Expand Up @@ -232,7 +234,7 @@ func (s *server) getRecords(ctx context.Context, id thread.ID, lid peer.ID, offs
log.Errorf("dial %s failed: %s", p, err)
return
}
cctx, cancel := context.WithTimeout(ctx, DialTimeout)
cctx, cancel := context.WithTimeout(ctx, PullTimeout)
defer cancel()
reply, err := client.GetRecords(cctx, req)
if err != nil {
Expand Down Expand Up @@ -337,7 +339,7 @@ func (s *server) pushRecord(ctx context.Context, id thread.ID, lid peer.ID, rec
log.Errorf("dial %s failed: %s", p, err)
return
}
cctx, cancel := context.WithTimeout(context.Background(), DialTimeout)
cctx, cancel := context.WithTimeout(context.Background(), PushTimeout)
defer cancel()
if _, err = client.PushRecord(cctx, req); err != nil {
if status.Convert(err).Code() == codes.NotFound { // Send the missing log
Expand Down Expand Up @@ -402,7 +404,7 @@ func (s *server) dial(peerID peer.ID) (pb.ServiceClient, error) {
}
ctx, cancel := context.WithTimeout(context.Background(), DialTimeout)
defer cancel()
conn, err := grpc.DialContext(ctx, peerID.Pretty(), s.getLibp2pDialer(), grpc.WithInsecure())
conn, err := grpc.DialContext(ctx, peerID.Pretty(), s.opts...)
if err != nil {
return nil, err
}
Expand All @@ -415,9 +417,15 @@ func (s *server) getLibp2pDialer() grpc.DialOption {
return grpc.WithContextDialer(func(ctx context.Context, peerIDStr string) (nnet.Conn, error) {
id, err := peer.Decode(peerIDStr)
if err != nil {
return nil, fmt.Errorf("grpc tried to dial non peerID: %s", err)
return nil, fmt.Errorf("grpc tried to dial non peerID: %w", err)
}
return gostream.Dial(ctx, s.net.host, id, thread.Protocol)

conn, err := gostream.Dial(ctx, s.net.host, id, thread.Protocol)
if err != nil {
return nil, fmt.Errorf("gostream dial failed: %w", err)
}

return conn, nil
})
}

Expand Down
16 changes: 13 additions & 3 deletions net/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,16 @@ type Config struct {
}

// NewNetwork creates an instance of net from the given host and thread store.
func NewNetwork(ctx context.Context, h host.Host, bstore bs.Blockstore, ds format.DAGService, ls lstore.Logstore, conf Config, opts ...grpc.ServerOption) (app.Net, error) {
func NewNetwork(
ctx context.Context,
h host.Host,
bstore bs.Blockstore,
ds format.DAGService,
ls lstore.Logstore,
conf Config,
serverOptions []grpc.ServerOption,
dialOptions []grpc.DialOption,
) (app.Net, error) {
var err error
if conf.Debug {
if err = util.SetLogLevels(map[string]logging.LogLevel{
Expand All @@ -99,14 +108,15 @@ func NewNetwork(ctx context.Context, h host.Host, bstore bs.Blockstore, ds forma
host: h,
bstore: bstore,
store: ls,
rpc: grpc.NewServer(opts...),
rpc: grpc.NewServer(serverOptions...),
bus: broadcast.NewBroadcaster(0),
connectors: make(map[thread.ID]*app.Connector),
ctx: ctx,
cancel: cancel,
pullLocks: make(map[thread.ID]chan struct{}),
}
t.server, err = newServer(t, conf.PubSub)

t.server, err = newServer(t, conf.PubSub, dialOptions...)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion net/net_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ func makeNetwork(t *testing.T) core.Net {
Config{
Debug: true,
PubSub: true,
})
}, nil, nil)
if err != nil {
t.Fatal(err)
}
Expand Down
20 changes: 15 additions & 5 deletions net/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,25 @@ type server struct {
sync.Mutex
net *net
ps *PubSub
opts []grpc.DialOption
conns map[peer.ID]*grpc.ClientConn
}

// newServer creates a new network server.
func newServer(n *net, enablePubSub bool) (*server, error) {
s := &server{
net: n,
conns: make(map[peer.ID]*grpc.ClientConn),
}
func newServer(n *net, enablePubSub bool, opts ...grpc.DialOption) (*server, error) {
var (
s = &server{
net: n,
conns: make(map[peer.ID]*grpc.ClientConn),
}

defaultOpts = []grpc.DialOption{
s.getLibp2pDialer(),
grpc.WithInsecure(),
}
)

s.opts = append(defaultOpts, opts...)

if enablePubSub {
ps, err := pubsub.NewGossipSub(
Expand Down

0 comments on commit ccf9eb0

Please sign in to comment.