Skip to content

Commit

Permalink
multiplexer: remove Ping() interface;
Browse files Browse the repository at this point in the history
server/gossip: use gossip to determine peer latency
  • Loading branch information
zllovesuki committed Sep 19, 2021
1 parent 4462c30 commit 1936840
Show file tree
Hide file tree
Showing 7 changed files with 23 additions and 24 deletions.
3 changes: 0 additions & 3 deletions multiplexer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package multiplexer
import (
"context"
"net"
"time"

"github.com/zllovesuki/t/multiplexer/protocol"
)
Expand All @@ -28,8 +27,6 @@ type Peer interface {
Protocol() protocol.Protocol
// Peer returns the uint64 identifier of the connected Peer
Peer() uint64
// Ping is useful for checking latency and health
Ping() (time.Duration, error)
// Messaging opens a dedicated bidirectional stream to handle in-band control messages
Messaging(context.Context) (net.Conn, error)
// Bidirectional establishs a virtual link between the Source and Destination via this Peer
Expand Down
6 changes: 0 additions & 6 deletions mux/mplex.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"crypto/tls"
"io"
"net"
"time"

"github.com/zllovesuki/t/multiplexer"
"github.com/zllovesuki/t/multiplexer/protocol"
Expand Down Expand Up @@ -137,11 +136,6 @@ func (p *Mplex) Peer() uint64 {
return p.config.Peer
}

func (p *Mplex) Ping() (time.Duration, error) {
// TODO(zllovesuki): fill this stub
return 0, nil
}

func (p *Mplex) Messaging(ctx context.Context) (net.Conn, error) {
n, err := p.session.NewStream(ctx)
if err != nil {
Expand Down
5 changes: 0 additions & 5 deletions mux/quic.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,11 +166,6 @@ func (p *QUIC) Peer() uint64 {
return p.config.Peer
}

func (p *QUIC) Ping() (time.Duration, error) {
// TODO(zllovesuki): fill this stub
return 0, nil
}

func (p *QUIC) Messaging(ctx context.Context) (net.Conn, error) {
n, err := p.session.OpenStream()
if err != nil {
Expand Down
4 changes: 0 additions & 4 deletions mux/yamux.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,10 +168,6 @@ func (p *Yamux) Peer() uint64 {
return p.config.Peer
}

func (p *Yamux) Ping() (time.Duration, error) {
return p.session.Ping()
}

func (p *Yamux) Messaging(ctx context.Context) (net.Conn, error) {
n, err := p.session.Open(ctx)
if err != nil {
Expand Down
21 changes: 21 additions & 0 deletions server/gossip.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package server

import (
"encoding/binary"
"encoding/json"
"fmt"
"net"
Expand Down Expand Up @@ -201,6 +202,26 @@ func (s *Server) GetBroadcasts(overhead, limit int) [][]byte {
return s.broadcasts.GetBroadcasts(overhead, limit)
}

// ======== Ping ========

var _ memberlist.PingDelegate = &Server{}

func (s *Server) AckPayload() []byte {
b := make([]byte, 8)
binary.BigEndian.PutUint64(b, uint64(s.peers.Len()))
return b
}

func (s *Server) NotifyPingComplete(other *memberlist.Node, rtt time.Duration, payload []byte) {
var m Meta
if err := m.UnmarshalBinary(other.Meta); err != nil {
s.logger.Error("unmarshal node meta from ping", zap.Error(err))
return
}
n := binary.BigEndian.Uint64(payload)
s.logger.Info("ping", zap.Uint64("Peer", m.PeerID), zap.Duration("rtt", rtt), zap.Uint64("numPeers", n))
}

// ======== Gossip Helpers ========

func (s *Server) connectPeer(m Meta) {
Expand Down
1 change: 1 addition & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ func New(conf Config) (*Server, error) {
c.PushPullInterval = time.Second * 30 // faster convergence
c.Events = s
c.Delegate = s
c.Ping = s
c.Name = fmt.Sprintf("%s:%d/%d", conf.Network.AdvertiseAddr, peerPort, self)
c.LogOutput = io.Discard

Expand Down
7 changes: 1 addition & 6 deletions state/peer_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,13 @@ func (s *PeerMap) NewPeer(ctx context.Context, proto protocol.Protocol, conf mul
return err
}

d, err := p.Ping()
if err != nil {
return errors.Wrap(err, "cannot ping peer")
}

select {
case <-p.NotifyClose():
return errors.New("peer closed after negotiation")
case <-time.After(conf.Wait):
}

s.logger.Debug("Peer negotiation result", zap.Uint64("peerID", conf.Peer), zap.Duration("rtt", d), zap.String("protocol", proto.String()))
s.logger.Debug("Peer negotiation result", zap.Uint64("peerID", conf.Peer), zap.String("protocol", proto.String()))

s.peers.Store(conf.Peer, p)
atomic.AddUint64(s.num, 1)
Expand Down

0 comments on commit 1936840

Please sign in to comment.