Skip to content

Commit

Permalink
Merge branch 'feat/stat'
Browse files Browse the repository at this point in the history
  • Loading branch information
Stebalien committed Aug 28, 2018
2 parents 5a9655c + 3f9733a commit dae9dd1
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 6 deletions.
4 changes: 3 additions & 1 deletion p2p/net/swarm/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (s *Swarm) Process() goprocess.Process {
return s.proc
}

func (s *Swarm) addConn(tc transport.Conn) (*Conn, error) {
func (s *Swarm) addConn(tc transport.Conn, dir inet.Direction) (*Conn, error) {
// The underlying transport (or the dialer) *should* filter it's own
// connections but we should double check anyways.
raddr := tc.RemoteMultiaddr()
Expand Down Expand Up @@ -194,9 +194,11 @@ func (s *Swarm) addConn(tc transport.Conn) (*Conn, error) {
}

// Wrap and register the connection.
stat := inet.Stat{Direction: dir}
c := &Conn{
conn: tc,
swarm: s,
stat: stat,
}
c.streams.m = make(map[*Stream]struct{})
s.conns.m[p] = append(s.conns.m[p], c)
Expand Down
15 changes: 12 additions & 3 deletions p2p/net/swarm/swarm_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ type Conn struct {
sync.Mutex
m map[*Stream]struct{}
}

stat inet.Stat
}

// Close closes this connection.
Expand Down Expand Up @@ -98,7 +100,7 @@ func (c *Conn) start() {
}
c.swarm.refs.Add(1)
go func() {
s, err := c.addStream(ts)
s, err := c.addStream(ts, inet.DirInbound)

// Don't defer this. We don't want to block
// swarm shutdown on the connection handler.
Expand Down Expand Up @@ -158,16 +160,21 @@ func (c *Conn) RemotePublicKey() ic.PubKey {
return c.conn.RemotePublicKey()
}

// Stat returns metadata pertaining to this connection
func (c *Conn) Stat() inet.Stat {
return c.stat
}

// NewStream returns a new Stream from this connection
func (c *Conn) NewStream() (inet.Stream, error) {
ts, err := c.conn.OpenStream()
if err != nil {
return nil, err
}
return c.addStream(ts)
return c.addStream(ts, inet.DirOutbound)
}

func (c *Conn) addStream(ts smux.Stream) (*Stream, error) {
func (c *Conn) addStream(ts smux.Stream, dir inet.Direction) (*Stream, error) {
c.streams.Lock()
// Are we still online?
if c.streams.m == nil {
Expand All @@ -177,9 +184,11 @@ func (c *Conn) addStream(ts smux.Stream) (*Stream, error) {
}

// Wrap and register the stream.
stat := inet.Stat{Direction: dir}
s := &Stream{
stream: ts,
conn: c,
stat: stat,
}
c.streams.m[s] = struct{}{}

Expand Down
2 changes: 1 addition & 1 deletion p2p/net/swarm/swarm_dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
"localAddr": connC.LocalMultiaddr(),
"remoteAddr": connC.RemoteMultiaddr(),
}
swarmC, err := s.addConn(connC)
swarmC, err := s.addConn(connC, inet.DirOutbound)
if err != nil {
logdial["error"] = err.Error()
connC.Close() // close the connection. didn't work out :(
Expand Down
2 changes: 1 addition & 1 deletion p2p/net/swarm/swarm_listen.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (s *Swarm) AddListenAddr(a ma.Multiaddr) error {
s.refs.Add(1)
go func() {
defer s.refs.Done()
_, err := s.addConn(c)
_, err := s.addConn(c, inet.DirInbound)
if err != nil {
// Probably just means that the swarm has been closed.
log.Warningf("add conn failed: ", err)
Expand Down
10 changes: 10 additions & 0 deletions p2p/net/swarm/swarm_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ const (
streamReset
)

// Validate Stream conforms to the go-libp2p-net Stream interface
var _ inet.Stream = &Stream{}

// Stream is the stream type used by swarm. In general, you won't use this type
// directly.
type Stream struct {
Expand All @@ -36,6 +39,8 @@ type Stream struct {
notifyLk sync.Mutex

protocol atomic.Value

stat inet.Stat
}

func (s *Stream) String() string {
Expand Down Expand Up @@ -165,3 +170,8 @@ func (s *Stream) SetReadDeadline(t time.Time) error {
func (s *Stream) SetWriteDeadline(t time.Time) error {
return s.stream.SetWriteDeadline(t)
}

// Stat returns metadata information for this stream.
func (s *Stream) Stat() inet.Stat {
return s.stat
}

0 comments on commit dae9dd1

Please sign in to comment.