Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race on peer state gather #303

Merged
merged 3 commits into from
Dec 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions requestmanager/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,14 +395,17 @@ func (rm *RequestManager) pause(id graphsync.RequestID) error {
}

func (rm *RequestManager) peerStats(p peer.ID) peerstate.PeerState {
requestStates := make(graphsync.RequestStates)
for id, ipr := range rm.inProgressRequestStatuses {
if ipr.p == p {
requestStates[id] = graphsync.RequestState(ipr.state)
var peerState peerstate.PeerState
rm.requestQueue.WithPeerTopics(p, func(peerTopics *peertracker.PeerTrackerTopics) {
requestStates := make(graphsync.RequestStates)
for id, ipr := range rm.inProgressRequestStatuses {
if ipr.p == p {
requestStates[id] = graphsync.RequestState(ipr.state)
}
}
}
peerTopics := rm.requestQueue.PeerTopics(p)
return peerstate.PeerState{RequestStates: requestStates, TaskQueueState: fromPeerTopics(peerTopics)}
peerState = peerstate.PeerState{RequestStates: requestStates, TaskQueueState: fromPeerTopics(peerTopics)}
})
return peerState
}

func fromPeerTopics(pt *peertracker.PeerTrackerTopics) peerstate.TaskQueueState {
Expand Down
4 changes: 2 additions & 2 deletions responsemanager/responsemanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1306,8 +1306,8 @@ func (ntq nullTaskQueue) PushTask(p peer.ID, task peertask.Task) {
func (ntq nullTaskQueue) TaskDone(p peer.ID, task *peertask.Task) {}
func (ntq nullTaskQueue) Remove(t peertask.Topic, p peer.ID) {}
func (ntq nullTaskQueue) Stats() graphsync.RequestStats { return graphsync.RequestStats{} }
func (ntq nullTaskQueue) PeerTopics(p peer.ID) *peertracker.PeerTrackerTopics {
return &peertracker.PeerTrackerTopics{Pending: ntq.tasksQueued[p]}
func (ntq nullTaskQueue) WithPeerTopics(p peer.ID, f func(*peertracker.PeerTrackerTopics)) {
f(&peertracker.PeerTrackerTopics{Pending: ntq.tasksQueued[p]})
}

var _ taskqueue.TaskQueue = nullTaskQueue{}
17 changes: 10 additions & 7 deletions responsemanager/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,14 +293,17 @@ func (rm *ResponseManager) pauseRequest(p peer.ID, requestID graphsync.RequestID
}

func (rm *ResponseManager) peerState(p peer.ID) peerstate.PeerState {
requestStates := make(graphsync.RequestStates)
for key, ipr := range rm.inProgressResponses {
if key.p == p {
requestStates[key.requestID] = ipr.state
var peerState peerstate.PeerState
rm.responseQueue.WithPeerTopics(p, func(peerTopics *peertracker.PeerTrackerTopics) {
requestStates := make(graphsync.RequestStates)
for key, ipr := range rm.inProgressResponses {
if key.p == p {
requestStates[key.requestID] = ipr.state
}
}
}
peerTopics := rm.responseQueue.PeerTopics(p)
return peerstate.PeerState{RequestStates: requestStates, TaskQueueState: fromPeerTopics(peerTopics)}
peerState = peerstate.PeerState{RequestStates: requestStates, TaskQueueState: fromPeerTopics(peerTopics)}
})
return peerState
}

func fromPeerTopics(pt *peertracker.PeerTrackerTopics) peerstate.TaskQueueState {
Expand Down
22 changes: 21 additions & 1 deletion taskqueue/taskqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@ type TaskQueue interface {
TaskDone(p peer.ID, task *peertask.Task)
Remove(t peertask.Topic, p peer.ID)
Stats() graphsync.RequestStats
PeerTopics(p peer.ID) *peertracker.PeerTrackerTopics
WithPeerTopics(p peer.ID, f func(*peertracker.PeerTrackerTopics))
}

// WorkerTaskQueue is a wrapper around peertaskqueue.PeerTaskQueue that manages running workers
// that pop tasks and execute them
type WorkerTaskQueue struct {
lockTopics sync.Mutex
*peertaskqueue.PeerTaskQueue
ctx context.Context
cancelFn func()
Expand All @@ -55,7 +56,9 @@ func NewTaskQueue(ctx context.Context) *WorkerTaskQueue {

// PushTask pushes a new task on to the queue
func (tq *WorkerTaskQueue) PushTask(p peer.ID, task peertask.Task) {
tq.lockTopics.Lock()
tq.PeerTaskQueue.PushTasks(p, task)
tq.lockTopics.Unlock()
select {
case tq.workSignal <- struct{}{}:
default:
Expand All @@ -64,19 +67,30 @@ func (tq *WorkerTaskQueue) PushTask(p peer.ID, task peertask.Task) {

// TaskDone marks a task as completed so further tasks can be executed
func (tq *WorkerTaskQueue) TaskDone(p peer.ID, task *peertask.Task) {
tq.lockTopics.Lock()
tq.PeerTaskQueue.TasksDone(p, task)
tq.lockTopics.Unlock()
}

// Stats returns statistics about a task queue
func (tq *WorkerTaskQueue) Stats() graphsync.RequestStats {
tq.lockTopics.Lock()
ptqstats := tq.PeerTaskQueue.Stats()
tq.lockTopics.Unlock()
return graphsync.RequestStats{
TotalPeers: uint64(ptqstats.NumPeers),
Active: uint64(ptqstats.NumActive),
Pending: uint64(ptqstats.NumPending),
}
}

func (tq *WorkerTaskQueue) WithPeerTopics(p peer.ID, withPeerTopics func(*peertracker.PeerTrackerTopics)) {
tq.lockTopics.Lock()
peerTopics := tq.PeerTaskQueue.PeerTopics(p)
withPeerTopics(peerTopics)
tq.lockTopics.Unlock()
}

// Startup runs the given number of task workers with the given executor
func (tq *WorkerTaskQueue) Startup(workerCount uint64, executor Executor) {
for i := uint64(0); i < workerCount; i++ {
Expand All @@ -100,16 +114,22 @@ func (tq *WorkerTaskQueue) WaitForNoActiveTasks() {
func (tq *WorkerTaskQueue) worker(executor Executor) {
targetWork := 1
for {
tq.lockTopics.Lock()
pid, tasks, _ := tq.PeerTaskQueue.PopTasks(targetWork)
tq.lockTopics.Unlock()
for len(tasks) == 0 {
select {
case <-tq.ctx.Done():
return
case <-tq.workSignal:
tq.lockTopics.Lock()
pid, tasks, _ = tq.PeerTaskQueue.PopTasks(targetWork)
tq.lockTopics.Unlock()
case <-tq.ticker.C:
tq.lockTopics.Lock()
tq.PeerTaskQueue.ThawRound()
pid, tasks, _ = tq.PeerTaskQueue.PopTasks(targetWork)
tq.lockTopics.Unlock()
}
}
for _, task := range tasks {
Expand Down