Stats! #246 (Merged)

2 commits, merged Oct 13, 2021
allocator/allocator.go: 40 additions & 14 deletions
@@ -4,6 +4,7 @@ import (
 	"errors"
 	"sync"
 
+	"github.com/ipfs/go-graphsync"
 	pq "github.com/ipfs/go-ipfs-pq"
 	logging "github.com/ipfs/go-log/v2"
 	peer "github.com/libp2p/go-libp2p-core/peer"
@@ -12,8 +13,8 @@ import (
 var log = logging.Logger("graphsync_allocator")
 
 type Allocator struct {
-	totalMemoryMax uint64
-	perPeerMax     uint64
+	maxAllowedAllocatedTotal   uint64
+	maxAllowedAllocatedPerPeer uint64
 
 	allocLk                sync.RWMutex
 	totalAllocatedAllPeers uint64
@@ -22,13 +23,13 @@ type Allocator struct {
 	peerStatusQueue pq.PQ
 }
 
-func NewAllocator(totalMemoryMax uint64, perPeerMax uint64) *Allocator {
+func NewAllocator(maxAllowedAllocatedTotal uint64, maxAllowedAllocatedPerPeer uint64) *Allocator {
 	return &Allocator{
-		totalMemoryMax:         totalMemoryMax,
-		perPeerMax:             perPeerMax,
-		totalAllocatedAllPeers: 0,
-		peerStatuses:           make(map[peer.ID]*peerStatus),
-		peerStatusQueue:        pq.New(makePeerStatusCompare(perPeerMax)),
+		maxAllowedAllocatedTotal:   maxAllowedAllocatedTotal,
+		maxAllowedAllocatedPerPeer: maxAllowedAllocatedPerPeer,
+		totalAllocatedAllPeers:     0,
+		peerStatuses:               make(map[peer.ID]*peerStatus),
+		peerStatusQueue:            pq.New(makePeerStatusCompare(maxAllowedAllocatedPerPeer)),
 	}
 }

@@ -58,13 +59,13 @@ func (a *Allocator) AllocateBlockMemory(p peer.ID, amount uint64) <-chan error {
 		a.peerStatuses[p] = status
 	}
 
-	if (a.totalAllocatedAllPeers+amount <= a.totalMemoryMax) && (status.totalAllocated+amount <= a.perPeerMax) && len(status.pendingAllocations) == 0 {
+	if (a.totalAllocatedAllPeers+amount <= a.maxAllowedAllocatedTotal) && (status.totalAllocated+amount <= a.maxAllowedAllocatedPerPeer) && len(status.pendingAllocations) == 0 {
 		a.totalAllocatedAllPeers += amount
 		status.totalAllocated += amount
 		log.Debugw("bytes allocated", "amount", amount, "peer", p, "peer total", status.totalAllocated, "global total", a.totalAllocatedAllPeers)
 		responseChan <- nil
 	} else {
-		log.Debugw("byte allocation deferred pending memory release", "amount", amount, "peer", p, "peer total", status.totalAllocated, "global total", a.totalAllocatedAllPeers, "max per peer", a.perPeerMax, "global max", a.totalMemoryMax)
+		log.Debugw("byte allocation deferred pending memory release", "amount", amount, "peer", p, "peer total", status.totalAllocated, "global total", a.totalAllocatedAllPeers, "max per peer", a.maxAllowedAllocatedPerPeer, "global max", a.maxAllowedAllocatedTotal)
 		pendingAllocation := pendingAllocation{p, amount, responseChan, a.nextAllocIndex}
 		a.nextAllocIndex++
 		status.pendingAllocations = append(status.pendingAllocations, pendingAllocation)
@@ -91,7 +92,7 @@ func (a *Allocator) ReleaseBlockMemory(p peer.ID, amount uint64) error {
 	} else {
 		a.totalAllocatedAllPeers = 0
 	}
-	log.Debugw("memory released", "amount", amount, "peer", p, "peer total", status.totalAllocated, "global total", a.totalAllocatedAllPeers, "max per peer", a.perPeerMax, "global max", a.totalMemoryMax)
+	log.Debugw("memory released", "amount", amount, "peer", p, "peer total", status.totalAllocated, "global total", a.totalAllocatedAllPeers, "max per peer", a.maxAllowedAllocatedPerPeer, "global max", a.maxAllowedAllocatedTotal)
 	a.peerStatusQueue.Update(status.Index())
 	a.processPendingAllocations()
 	return nil
@@ -110,7 +111,7 @@ func (a *Allocator) ReleasePeerMemory(p peer.ID) error {
 		pendingAllocation.response <- errors.New("peer has been deallocated")
 	}
 	a.totalAllocatedAllPeers -= status.totalAllocated
-	log.Debugw("memory released", "amount", status.totalAllocated, "peer", p, "peer total", 0, "global total", a.totalAllocatedAllPeers, "max per peer", a.perPeerMax, "global max", a.totalMemoryMax)
+	log.Debugw("memory released", "amount", status.totalAllocated, "peer", p, "peer total", 0, "global total", a.totalAllocatedAllPeers, "max per peer", a.maxAllowedAllocatedPerPeer, "global max", a.maxAllowedAllocatedTotal)
 	a.processPendingAllocations()
 	return nil
 }
@@ -137,10 +138,10 @@ func (a *Allocator) processPendingAllocations() {
 
 func (a *Allocator) processNextPendingAllocationForPeer(nextPeer *peerStatus) bool {
 	pendingAllocation := nextPeer.pendingAllocations[0]
-	if a.totalAllocatedAllPeers+pendingAllocation.amount > a.totalMemoryMax {
+	if a.totalAllocatedAllPeers+pendingAllocation.amount > a.maxAllowedAllocatedTotal {
 		return false
 	}
-	if nextPeer.totalAllocated+pendingAllocation.amount > a.perPeerMax {
+	if nextPeer.totalAllocated+pendingAllocation.amount > a.maxAllowedAllocatedPerPeer {
 		return false
 	}
 	a.totalAllocatedAllPeers += pendingAllocation.amount
@@ -151,6 +152,31 @@ func (a *Allocator) processNextPendingAllocationForPeer(nextPeer *peerStatus) bool {
 	return true
 }
 
+func (a *Allocator) Stats() graphsync.ResponseStats {
+	a.allocLk.RLock()
+	defer a.allocLk.RUnlock()
+
+	numPeersWithPendingAllocations := uint64(0)
+	totalPendingAllocations := uint64(0)
+	for _, status := range a.peerStatuses {
+		peerPendingAllocations := uint64(0)
+		for _, pa := range status.pendingAllocations {
+			peerPendingAllocations += pa.amount
+		}
+		if peerPendingAllocations > 0 {
+			numPeersWithPendingAllocations++
+			totalPendingAllocations += peerPendingAllocations
+		}
+	}
+	return graphsync.ResponseStats{
+		MaxAllowedAllocatedTotal:       a.maxAllowedAllocatedTotal,
+		MaxAllowedAllocatedPerPeer:     a.maxAllowedAllocatedPerPeer,
+		TotalAllocatedAllPeers:         a.totalAllocatedAllPeers,
+		TotalPendingAllocations:       totalPendingAllocations,
+		NumPeersWithPendingAllocations: numPeersWithPendingAllocations,
+	}
+}
+
 type peerStatus struct {
 	p              peer.ID
 	totalAllocated uint64
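The new Stats method snapshots both committed and queued allocations under a read lock. A minimal sketch of how it behaves, assuming the package imports as github.com/ipfs/go-graphsync/allocator and using a hard-coded string as an illustrative peer.ID:

package main

import (
	"fmt"

	"github.com/ipfs/go-graphsync/allocator"
	peer "github.com/libp2p/go-libp2p-core/peer"
)

func main() {
	// Tiny limits for illustration: 64 bytes total, 32 bytes per peer.
	a := allocator.NewAllocator(64, 32)
	p := peer.ID("example-peer") // illustrative only; real IDs come from libp2p

	// Fits under both limits, so the returned channel resolves immediately.
	<-a.AllocateBlockMemory(p, 32)

	// Would push the peer past its 32-byte cap, so it parks as a pending
	// allocation until memory is released.
	pending := a.AllocateBlockMemory(p, 16)

	stats := a.Stats()
	fmt.Println(stats.TotalAllocatedAllPeers)         // 32
	fmt.Println(stats.TotalPendingAllocations)        // 16
	fmt.Println(stats.NumPeersWithPendingAllocations) // 1

	// Releasing the first allocation unblocks the queued one.
	if err := a.ReleaseBlockMemory(p, 32); err != nil {
		fmt.Println(err)
	}
	<-pending
}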
allocator/allocator_test.go: 17 additions & 0 deletions
@@ -341,6 +341,23 @@ func TestAllocator(t *testing.T) {
 				pendingResults = append(pendingResults, next.pendingResult)
 			}
 			require.Equal(t, step.expectedPending, pendingResults)
+			expectedTotalPending := uint64(0)
+			expectedPeersPending := map[peer.ID]struct{}{}
+			for _, pendingResult := range step.expectedPending {
+				expectedTotalPending += pendingResult.amount
+				expectedPeersPending[pendingResult.p] = struct{}{}
+			}
+			expectedNumPeersPending := uint64(len(expectedPeersPending))
+			expectedTotalAllocated := uint64(0)
+			for _, peerTotal := range step.totals {
+				expectedTotalAllocated += peerTotal
+			}
+			stats := allocator.Stats()
+			require.Equal(t, data.total, stats.MaxAllowedAllocatedTotal)
+			require.Equal(t, data.maxPerPeer, stats.MaxAllowedAllocatedPerPeer)
+			require.Equal(t, expectedNumPeersPending, stats.NumPeersWithPendingAllocations)
+			require.Equal(t, expectedTotalAllocated, stats.TotalAllocatedAllPeers)
+			require.Equal(t, expectedTotalPending, stats.TotalPendingAllocations)
 		}
 	})
 }
graphsync.go: 44 additions & 0 deletions
@@ -287,6 +287,47 @@ type OnRequestorCancelledListener func(p peer.ID, request RequestData)
 // UnregisterHookFunc is a function call to unregister a hook that was previously registered
 type UnregisterHookFunc func()
 
+// RequestStats offers statistics about request processing
+type RequestStats struct {
+	// TotalPeers is the number of peers that have active or pending requests
+	TotalPeers uint64
+	// Active is the total number of active requests being processed
+	Active uint64
+	// Pending is the total number of requests that are waiting to be processed
+	Pending uint64
+}
+
+// ResponseStats offers statistics about memory allocations for responses
+type ResponseStats struct {
+	// MaxAllowedAllocatedTotal is the preconfigured limit on allocations
+	// for all peers
+	MaxAllowedAllocatedTotal uint64
+	// MaxAllowedAllocatedPerPeer is the preconfigured limit on allocations
+	// for an individual peer
+	MaxAllowedAllocatedPerPeer uint64
+	// TotalAllocatedAllPeers indicates the amount of memory allocated for blocks
+	// across all peers
+	TotalAllocatedAllPeers uint64
+	// TotalPendingAllocations indicates the amount of requested memory still awaiting release
+	TotalPendingAllocations uint64
+	// NumPeersWithPendingAllocations indicates the number of peers that
+	// have either maxed out their individual memory allocations or have
+	// pending allocations because the total limit has been reached
+	NumPeersWithPendingAllocations uint64
+}
+
+// Stats describes statistics about the GraphSync implementation's
+// current state
+type Stats struct {
+	// Stats for the graphsync requestor
+	OutgoingRequests  RequestStats
+	IncomingResponses ResponseStats
+
+	// Stats for the graphsync responder
+	IncomingRequests  RequestStats
+	OutgoingResponses ResponseStats
+}
+
 // GraphExchange is a protocol that can exchange IPLD graphs based on a selector
 type GraphExchange interface {
 	// Request initiates a new GraphSync request to the given peer using the given selector spec.
@@ -354,4 +395,7 @@ type GraphExchange interface {
 
 	// CancelRequest cancels an in progress request
 	CancelRequest(context.Context, RequestID) error
+
+	// Stats produces insight on the current state of a graphsync exchange
+	Stats() Stats
 }
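A single Stats() call now exposes both sides of the exchange. A hedged usage sketch (logStats and its package are illustrative, not part of this PR; the fields are those defined above):

package monitoring

import (
	"log"

	graphsync "github.com/ipfs/go-graphsync"
)

// logStats works against any graphsync.GraphExchange implementation.
func logStats(gs graphsync.GraphExchange) {
	stats := gs.Stats()
	log.Printf("requestor: peers=%d active=%d pending=%d",
		stats.OutgoingRequests.TotalPeers,
		stats.OutgoingRequests.Active,
		stats.OutgoingRequests.Pending)
	log.Printf("responder memory: allocated=%d/%d pendingBytes=%d peersWaiting=%d",
		stats.OutgoingResponses.TotalAllocatedAllPeers,
		stats.OutgoingResponses.MaxAllowedAllocatedTotal,
		stats.OutgoingResponses.TotalPendingAllocations,
		stats.OutgoingResponses.NumPeersWithPendingAllocations)
}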
impl/graphsync.go: 22 additions & 0 deletions
@@ -380,6 +380,28 @@ func (gs *GraphSync) CancelRequest(ctx context.Context, requestID graphsync.RequestID) error {
 	return gs.requestManager.CancelRequest(ctx, requestID)
 }
 
+// Stats produces insight on the current state of a graphsync exchange
+func (gs *GraphSync) Stats() graphsync.Stats {
+	outgoingRequestStats := gs.requestQueue.Stats()
+	incomingResponseStats := gs.requestAllocator.Stats()
+
+	ptqstats := gs.peerTaskQueue.Stats()
+	incomingRequestStats := graphsync.RequestStats{
+		TotalPeers: uint64(ptqstats.NumPeers),
+		Active:     uint64(ptqstats.NumActive),
+		Pending:    uint64(ptqstats.NumPending),
+	}
+	outgoingResponseStats := gs.responseAllocator.Stats()
+
+	return graphsync.Stats{
+		OutgoingRequests:  outgoingRequestStats,
+		IncomingResponses: incomingResponseStats,
+
+		IncomingRequests:  incomingRequestStats,
+		OutgoingResponses: outgoingResponseStats,
+	}
+}
+
 type graphSyncReceiver GraphSync
 
 func (gsr *graphSyncReceiver) graphSync() *GraphSync {
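The peertaskqueue-to-RequestStats conversion above is repeated verbatim in taskqueue/taskqueue.go below; a shared helper could factor it out. A hypothetical sketch (toRequestStats is not in this PR, and it takes plain ints since the exact peertaskqueue stats type is not shown in the diff):

package impl

import graphsync "github.com/ipfs/go-graphsync"

// toRequestStats converts raw peer-task-queue counters into the public
// graphsync.RequestStats shape used by both sides of the exchange.
func toRequestStats(numPeers, numActive, numPending int) graphsync.RequestStats {
	return graphsync.RequestStats{
		TotalPeers: uint64(numPeers),
		Active:     uint64(numActive),
		Pending:    uint64(numPending),
	}
}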
taskqueue/taskqueue.go: 12 additions & 0 deletions
@@ -4,6 +4,7 @@ import (
 	"context"
 	"time"
 
+	"github.com/ipfs/go-graphsync"
 	"github.com/ipfs/go-peertaskqueue"
 	"github.com/ipfs/go-peertaskqueue/peertask"
 	peer "github.com/libp2p/go-libp2p-core/peer"
@@ -19,6 +20,7 @@ type Executor interface {
 type TaskQueue interface {
 	PushTask(p peer.ID, task peertask.Task)
 	TaskDone(p peer.ID, task *peertask.Task)
+	Stats() graphsync.RequestStats
 }
 
 // TaskQueue is a wrapper around peertaskqueue.PeerTaskQueue that manages running workers
@@ -57,6 +59,16 @@ func (tq *WorkerTaskQueue) TaskDone(p peer.ID, task *peertask.Task) {
 	tq.peerTaskQueue.TasksDone(p, task)
 }
 
+// Stats returns statistics about a task queue
+func (tq *WorkerTaskQueue) Stats() graphsync.RequestStats {
+	ptqstats := tq.peerTaskQueue.Stats()
+	return graphsync.RequestStats{
+		TotalPeers: uint64(ptqstats.NumPeers),
+		Active:     uint64(ptqstats.NumActive),
+		Pending:    uint64(ptqstats.NumPending),
+	}
+}
+
 // Startup runs the given number of task workers with the given executor
 func (tq *WorkerTaskQueue) Startup(workerCount uint64, executor Executor) {
 	for i := uint64(0); i < workerCount; i++ {
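Because Stats() now sits on the TaskQueue interface, callers can poll queue depth without depending on the concrete WorkerTaskQueue. A hypothetical monitor sketch (watch and its package are illustrative, assuming the interface imports as github.com/ipfs/go-graphsync/taskqueue):

package monitoring

import (
	"context"
	"fmt"
	"time"

	"github.com/ipfs/go-graphsync/taskqueue"
)

// watch polls any TaskQueue and prints its depth until ctx is cancelled.
func watch(ctx context.Context, tq taskqueue.TaskQueue, every time.Duration) {
	ticker := time.NewTicker(every)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			s := tq.Stats()
			fmt.Printf("peers=%d active=%d pending=%d\n", s.TotalPeers, s.Active, s.Pending)
		}
	}
}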