Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GO-2828: add debug diagnostics for spaces and connections #314

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions net/peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,17 @@ func NewPeer(mc transport.MultiConn, ctrl connCtrl) (p Peer, err error) {
return pr, nil
}

type Stat struct {
PeerId string `json:"peerId"`
SubConnections int `json:"subConnections"`
Created time.Time `json:"created"`
Version uint32 `json:"version"`
}

type StatProvider interface {
ProvideStat() *Stat
}

type Peer interface {
Id() string
Context() context.Context
Expand Down Expand Up @@ -320,3 +331,16 @@ func (p *peer) Close() (err error) {
log.Debug("peer close", zap.String("peerId", p.id))
return p.MultiConn.Close()
}

func (p *peer) ProvideStat() *Stat {
protoVersion, _ := CtxProtoVersion(p.Context())
p.mu.Lock()
subConnectionsCount := len(p.active)
p.mu.Unlock()
return &Stat{
PeerId: p.id,
SubConnections: subConnectionsCount,
Created: p.created,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a field called AliveTimeSecs which will tell how many seconds it is alive

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just write pls p.Lock(), defer p.Unlock() and then return &Stat{...}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do you propose to calculate AliveTimeSecs, because don't see such field in peer? 🧐

Version: protoVersion,
}
}
35 changes: 33 additions & 2 deletions net/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"go.uber.org/zap"

"github.com/anyproto/any-sync/app/debugstat"
"github.com/anyproto/any-sync/app/ocache"
"github.com/anyproto/any-sync/net"
"github.com/anyproto/any-sync/net/peer"
Expand All @@ -26,9 +27,14 @@ type Pool interface {
Pick(ctx context.Context, id string) (pr peer.Peer, err error)
}

type poolStats struct {
PeerStats []*peer.Stat `json:"peerStats"`
}

type pool struct {
outgoing ocache.OCache
incoming ocache.OCache
outgoing ocache.OCache
incoming ocache.OCache
statService debugstat.StatService
}

func (p *pool) Name() (name string) {
Expand Down Expand Up @@ -135,3 +141,28 @@ func (p *pool) pick(ctx context.Context, source ocache.OCache, id string) (peer.
}
return nil, fmt.Errorf("failed to pick connection with peer: peer not found")
}

func (p *pool) ProvideStat() any {
peerStats := make([]*peer.Stat, 0)
p.outgoing.ForEach(func(v ocache.Object) (isContinue bool) {
if p, ok := v.(peer.StatProvider); ok {
peerStats = append(peerStats, p.ProvideStat())
}
return true
})
p.incoming.ForEach(func(v ocache.Object) (isContinue bool) {
if p, ok := v.(peer.StatProvider); ok {
peerStats = append(peerStats, p.ProvideStat())
}
return true
})
return &poolStats{PeerStats: peerStats}
}

func (p *pool) StatId() string {
return CName
}

func (p *pool) StatType() string {
return CName
}
164 changes: 148 additions & 16 deletions net/pool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestPool_Get(t *testing.T) {
fx := newFixture(t)
defer fx.Finish()
fx.Dialer.dial = func(ctx context.Context, peerId string) (peer peer.Peer, err error) {
return newTestPeer("1"), nil
return newTestPeer("1", time.Now(), 0, 0), nil
}
p, err := fx.Get(ctx, "1")
assert.NoError(t, err)
Expand All @@ -49,15 +49,15 @@ func TestPool_Get(t *testing.T) {
t.Run("retry for closed", func(t *testing.T) {
fx := newFixture(t)
defer fx.Finish()
tp := newTestPeer("1")
tp := newTestPeer("1", time.Now(), 0, 0)
fx.Dialer.dial = func(ctx context.Context, peerId string) (peer peer.Peer, err error) {
return tp, nil
}
p, err := fx.Get(ctx, "1")
assert.NoError(t, err)
assert.NotNil(t, p)
p.Close()
tp2 := newTestPeer("1")
tp2 := newTestPeer("1", time.Now(), 0, 0)
fx.Dialer.dial = func(ctx context.Context, peerId string) (peer peer.Peer, err error) {
return tp2, nil
}
Expand All @@ -80,7 +80,7 @@ func TestPool_GetOneOf(t *testing.T) {
t.Run("from cache", func(t *testing.T) {
fx := newFixture(t)
defer fx.Finish()
tp1 := newTestPeer("1")
tp1 := newTestPeer("1", time.Now(), 0, 0)
addToCache(t, fx, tp1)
p, err := fx.GetOneOf(ctx, []string{"3", "2", "1"})
require.NoError(t, err)
Expand All @@ -89,10 +89,10 @@ func TestPool_GetOneOf(t *testing.T) {
t.Run("from cache - skip closed", func(t *testing.T) {
fx := newFixture(t)
defer fx.Finish()
tp2 := newTestPeer("2")
tp2 := newTestPeer("2", time.Now(), 0, 0)
addToCache(t, fx, tp2)
tp2.Close()
tp1 := newTestPeer("1")
tp1 := newTestPeer("1", time.Now(), 0, 0)
addToCache(t, fx, tp1)
p, err := fx.GetOneOf(ctx, []string{"3", "2", "1"})
require.NoError(t, err)
Expand All @@ -107,7 +107,7 @@ func TestPool_GetOneOf(t *testing.T) {
return nil, fmt.Errorf("not expected call")
}
called = true
return newTestPeer(peerId), nil
return newTestPeer(peerId, time.Now(), 0, 0), nil
}
p, err := fx.GetOneOf(ctx, []string{"3", "2", "1"})
require.NoError(t, err)
Expand Down Expand Up @@ -139,12 +139,12 @@ func TestPool_AddPeer(t *testing.T) {
t.Run("success", func(t *testing.T) {
fx := newFixture(t)
defer fx.Finish()
require.NoError(t, fx.AddPeer(ctx, newTestPeer("p1")))
require.NoError(t, fx.AddPeer(ctx, newTestPeer("p1", time.Now(), 0, 0)))
})
t.Run("two peers", func(t *testing.T) {
fx := newFixture(t)
defer fx.Finish()
p1, p2 := newTestPeer("p1"), newTestPeer("p1")
p1, p2 := newTestPeer("p1", time.Now(), 0, 0), newTestPeer("p1", time.Now(), 0, 0)
require.NoError(t, fx.AddPeer(ctx, p1))
require.NoError(t, fx.AddPeer(ctx, p2))
select {
Expand All @@ -167,7 +167,7 @@ func TestPool_Pick(t *testing.T) {
t.Run("success", func(t *testing.T) {
fx := newFixture(t)
defer fx.Finish()
p1 := newTestPeer("p1")
p1 := newTestPeer("p1", time.Now(), 0, 0)
require.NoError(t, fx.AddPeer(ctx, p1))

p, err := fx.Pick(ctx, "p1")
Expand All @@ -179,7 +179,7 @@ func TestPool_Pick(t *testing.T) {
t.Run("peer is closed", func(t *testing.T) {
fx := newFixture(t)
defer fx.Finish()
p1 := newTestPeer("p1")
p1 := newTestPeer("p1", time.Now(), 0, 0)
require.NoError(t, fx.AddPeer(ctx, p1))
require.NoError(t, p1.Close())
p, err := fx.Pick(ctx, "p1")
Expand All @@ -189,6 +189,123 @@ func TestPool_Pick(t *testing.T) {
})
}

func TestProvideStat_NoPeers(t *testing.T) {
t.Run("only incoming peers", func(t *testing.T) {
// given
fx := newFixture(t)
defer fx.Finish()
created := time.Now()
p1 := newTestPeer("p1", created, 1, 1)
require.NoError(t, fx.AddPeer(ctx, p1))

statProvider, ok := fx.Service.(*poolService)
assert.True(t, ok)

// when
stat := statProvider.ProvideStat()

// then
assert.NotNil(t, stat)
poolStat, ok := stat.(*poolStats)
assert.True(t, ok)

assert.Len(t, poolStat.PeerStats, 1)
assert.Equal(t, p1.id, poolStat.PeerStats[0].PeerId)
assert.Equal(t, p1.created, poolStat.PeerStats[0].Created)
assert.Equal(t, p1.subConnections, poolStat.PeerStats[0].SubConnections)
assert.Equal(t, p1.version, poolStat.PeerStats[0].Version)
})
t.Run("outgoing and incoming peers", func(t *testing.T) {
// given
fx := newFixture(t)
defer fx.Finish()
created := time.Now()
created1 := time.Now()
p1 := newTestPeer("p1", created, 2, 2)
require.NoError(t, fx.AddPeer(ctx, p1))

fx.Dialer.dial = func(ctx context.Context, peerId string) (peer peer.Peer, err error) {
return newTestPeer(peerId, created1, 0, 0), nil
}

peerId := "p2"
_, err := fx.Get(ctx, peerId)
require.NoError(t, err)

statProvider, ok := fx.Service.(*poolService)
assert.True(t, ok)

// when
stat := statProvider.ProvideStat()

// then
assert.NotNil(t, stat)
poolStat, ok := stat.(*poolStats)
assert.True(t, ok)

assert.Len(t, poolStat.PeerStats, 2)
assert.Equal(t, peerId, poolStat.PeerStats[0].PeerId)
assert.Equal(t, created1, poolStat.PeerStats[0].Created)
assert.Equal(t, 0, poolStat.PeerStats[0].SubConnections)
assert.Equal(t, uint32(0), poolStat.PeerStats[0].Version)

assert.Equal(t, p1.id, poolStat.PeerStats[1].PeerId)
assert.Equal(t, p1.created, poolStat.PeerStats[1].Created)
assert.Equal(t, p1.subConnections, poolStat.PeerStats[1].SubConnections)
assert.Equal(t, p1.version, poolStat.PeerStats[1].Version)
})
t.Run("only outcoming peers", func(t *testing.T) {
// given
peerId := "p1"
fx := newFixture(t)
defer fx.Finish()
created := time.Now()
subConn := 3
version := uint32(2)

fx.Dialer.dial = func(ctx context.Context, peerId string) (peer peer.Peer, err error) {
return newTestPeer(peerId, created, subConn, version), nil
}

_, err := fx.Get(ctx, peerId)
require.NoError(t, err)

statProvider, ok := fx.Service.(*poolService)
assert.True(t, ok)

// when
stat := statProvider.ProvideStat()

// then
assert.NotNil(t, stat)
poolStat, ok := stat.(*poolStats)
assert.True(t, ok)

assert.Len(t, poolStat.PeerStats, 1)
assert.Equal(t, peerId, poolStat.PeerStats[0].PeerId)
assert.Equal(t, created, poolStat.PeerStats[0].Created)
assert.Equal(t, subConn, poolStat.PeerStats[0].SubConnections)
assert.Equal(t, version, poolStat.PeerStats[0].Version)
})
t.Run("no peers", func(t *testing.T) {
// given
fx := newFixture(t)
defer fx.Finish()

statProvider, ok := fx.Service.(*poolService)
assert.True(t, ok)

// when
stat := statProvider.ProvideStat()

// then
assert.NotNil(t, stat)
poolStat, ok := stat.(*poolStats)
assert.True(t, ok)
assert.Len(t, poolStat.PeerStats, 0)
})
}

func newFixture(t *testing.T) *fixture {
fx := &fixture{
Service: New(),
Expand Down Expand Up @@ -240,16 +357,31 @@ func (d *dialerMock) Name() (name string) {
return "net.peerservice"
}

func newTestPeer(id string) *testPeer {
func newTestPeer(id string, created time.Time, subConnections int, version uint32) *testPeer {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make another constructor for testPeer with all that stuff, don't change other tests

return &testPeer{
id: id,
closed: make(chan struct{}),
id: id,
closed: make(chan struct{}),
created: created,
subConnections: subConnections,
version: version,
}
}

type testPeer struct {
id string
closed chan struct{}
id string
closed chan struct{}
created time.Time
subConnections int
version uint32
}

func (t *testPeer) ProvideStat() *peer.Stat {
return &peer.Stat{
PeerId: t.id,
Created: t.created,
SubConnections: t.subConnections,
Version: t.version,
}
}

func (t *testPeer) SetTTL(ttl time.Duration) {
Expand Down
8 changes: 8 additions & 0 deletions net/pool/poolservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"go.uber.org/zap"

"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/app/debugstat"
"github.com/anyproto/any-sync/app/logger"
"github.com/anyproto/any-sync/app/ocache"
"github.com/anyproto/any-sync/metric"
Expand Down Expand Up @@ -64,6 +65,12 @@ func (p *poolService) Init(a *app.App) (err error) {
ocache.WithTTL(time.Minute),
ocache.WithPrometheus(p.metricReg, "netpool", "incoming"),
)
comp, ok := a.Component(debugstat.CName).(debugstat.StatService)
if !ok {
comp = debugstat.NewNoOp()
}
p.statService = comp
p.statService.AddProvider(p)
return nil
}

Expand All @@ -72,6 +79,7 @@ func (p *pool) Run(ctx context.Context) (err error) {
}

func (p *pool) Close(ctx context.Context) (err error) {
p.statService.RemoveProvider(p)
if e := p.incoming.Close(); e != nil {
log.Warn("close incoming cache error", zap.Error(e))
}
Expand Down
Loading