diff --git a/requestmanager/server.go b/requestmanager/server.go index da7fd0f0..568a52c8 100644 --- a/requestmanager/server.go +++ b/requestmanager/server.go @@ -395,14 +395,17 @@ func (rm *RequestManager) pause(id graphsync.RequestID) error { } func (rm *RequestManager) peerStats(p peer.ID) peerstate.PeerState { - requestStates := make(graphsync.RequestStates) - for id, ipr := range rm.inProgressRequestStatuses { - if ipr.p == p { - requestStates[id] = graphsync.RequestState(ipr.state) + var peerState peerstate.PeerState + rm.requestQueue.WithPeerTopics(p, func(peerTopics *peertracker.PeerTrackerTopics) { + requestStates := make(graphsync.RequestStates) + for id, ipr := range rm.inProgressRequestStatuses { + if ipr.p == p { + requestStates[id] = graphsync.RequestState(ipr.state) + } } - } - peerTopics := rm.requestQueue.PeerTopics(p) - return peerstate.PeerState{RequestStates: requestStates, TaskQueueState: fromPeerTopics(peerTopics)} + peerState = peerstate.PeerState{RequestStates: requestStates, TaskQueueState: fromPeerTopics(peerTopics)} + }) + return peerState } func fromPeerTopics(pt *peertracker.PeerTrackerTopics) peerstate.TaskQueueState { diff --git a/responsemanager/responsemanager_test.go b/responsemanager/responsemanager_test.go index 993583c9..47f843f8 100644 --- a/responsemanager/responsemanager_test.go +++ b/responsemanager/responsemanager_test.go @@ -1306,8 +1306,8 @@ func (ntq nullTaskQueue) PushTask(p peer.ID, task peertask.Task) { func (ntq nullTaskQueue) TaskDone(p peer.ID, task *peertask.Task) {} func (ntq nullTaskQueue) Remove(t peertask.Topic, p peer.ID) {} func (ntq nullTaskQueue) Stats() graphsync.RequestStats { return graphsync.RequestStats{} } -func (ntq nullTaskQueue) PeerTopics(p peer.ID) *peertracker.PeerTrackerTopics { - return &peertracker.PeerTrackerTopics{Pending: ntq.tasksQueued[p]} +func (ntq nullTaskQueue) WithPeerTopics(p peer.ID, f func(*peertracker.PeerTrackerTopics)) { + f(&peertracker.PeerTrackerTopics{Pending: ntq.tasksQueued[p]}) } var _ taskqueue.TaskQueue = nullTaskQueue{} diff --git a/responsemanager/server.go b/responsemanager/server.go index c0f109a9..79a3d8fd 100644 --- a/responsemanager/server.go +++ b/responsemanager/server.go @@ -293,14 +293,17 @@ func (rm *ResponseManager) pauseRequest(p peer.ID, requestID graphsync.RequestID } func (rm *ResponseManager) peerState(p peer.ID) peerstate.PeerState { - requestStates := make(graphsync.RequestStates) - for key, ipr := range rm.inProgressResponses { - if key.p == p { - requestStates[key.requestID] = ipr.state + var peerState peerstate.PeerState + rm.responseQueue.WithPeerTopics(p, func(peerTopics *peertracker.PeerTrackerTopics) { + requestStates := make(graphsync.RequestStates) + for key, ipr := range rm.inProgressResponses { + if key.p == p { + requestStates[key.requestID] = ipr.state + } } - } - peerTopics := rm.responseQueue.PeerTopics(p) - return peerstate.PeerState{RequestStates: requestStates, TaskQueueState: fromPeerTopics(peerTopics)} + peerState = peerstate.PeerState{RequestStates: requestStates, TaskQueueState: fromPeerTopics(peerTopics)} + }) + return peerState } func fromPeerTopics(pt *peertracker.PeerTrackerTopics) peerstate.TaskQueueState { diff --git a/taskqueue/taskqueue.go b/taskqueue/taskqueue.go index 7a667a29..e78113ac 100644 --- a/taskqueue/taskqueue.go +++ b/taskqueue/taskqueue.go @@ -25,12 +25,13 @@ type TaskQueue interface { TaskDone(p peer.ID, task *peertask.Task) Remove(t peertask.Topic, p peer.ID) Stats() graphsync.RequestStats - PeerTopics(p peer.ID) *peertracker.PeerTrackerTopics + WithPeerTopics(p peer.ID, f func(*peertracker.PeerTrackerTopics)) } // WorkerTaskQueue is a wrapper around peertaskqueue.PeerTaskQueue that manages running workers // that pop tasks and execute them type WorkerTaskQueue struct { + lockTopics sync.Mutex *peertaskqueue.PeerTaskQueue ctx context.Context cancelFn func() @@ -55,7 +56,9 @@ func NewTaskQueue(ctx context.Context) *WorkerTaskQueue { // PushTask pushes a new task on to the queue func (tq *WorkerTaskQueue) PushTask(p peer.ID, task peertask.Task) { + tq.lockTopics.Lock() tq.PeerTaskQueue.PushTasks(p, task) + tq.lockTopics.Unlock() select { case tq.workSignal <- struct{}{}: default: @@ -64,12 +67,16 @@ func (tq *WorkerTaskQueue) PushTask(p peer.ID, task peertask.Task) { // TaskDone marks a task as completed so further tasks can be executed func (tq *WorkerTaskQueue) TaskDone(p peer.ID, task *peertask.Task) { + tq.lockTopics.Lock() tq.PeerTaskQueue.TasksDone(p, task) + tq.lockTopics.Unlock() } // Stats returns statistics about a task queue func (tq *WorkerTaskQueue) Stats() graphsync.RequestStats { + tq.lockTopics.Lock() ptqstats := tq.PeerTaskQueue.Stats() + tq.lockTopics.Unlock() return graphsync.RequestStats{ TotalPeers: uint64(ptqstats.NumPeers), Active: uint64(ptqstats.NumActive), @@ -77,6 +84,13 @@ func (tq *WorkerTaskQueue) Stats() graphsync.RequestStats { } } +func (tq *WorkerTaskQueue) WithPeerTopics(p peer.ID, withPeerTopics func(*peertracker.PeerTrackerTopics)) { + tq.lockTopics.Lock() + peerTopics := tq.PeerTaskQueue.PeerTopics(p) + withPeerTopics(peerTopics) + tq.lockTopics.Unlock() +} + // Startup runs the given number of task workers with the given executor func (tq *WorkerTaskQueue) Startup(workerCount uint64, executor Executor) { for i := uint64(0); i < workerCount; i++ { @@ -100,16 +114,22 @@ func (tq *WorkerTaskQueue) WaitForNoActiveTasks() { func (tq *WorkerTaskQueue) worker(executor Executor) { targetWork := 1 for { + tq.lockTopics.Lock() pid, tasks, _ := tq.PeerTaskQueue.PopTasks(targetWork) + tq.lockTopics.Unlock() for len(tasks) == 0 { select { case <-tq.ctx.Done(): return case <-tq.workSignal: + tq.lockTopics.Lock() pid, tasks, _ = tq.PeerTaskQueue.PopTasks(targetWork) + tq.lockTopics.Unlock() case <-tq.ticker.C: + tq.lockTopics.Lock() tq.PeerTaskQueue.ThawRound() pid, tasks, _ = tq.PeerTaskQueue.PopTasks(targetWork) + tq.lockTopics.Unlock() } } for _, task := range tasks {