diff --git a/responsemanager/client.go b/responsemanager/client.go index dec66152..62f1dca9 100644 --- a/responsemanager/client.go +++ b/responsemanager/client.go @@ -224,7 +224,12 @@ func (rm *ResponseManager) GetUpdates(p peer.ID, requestID graphsync.RequestID, // FinishTask marks a task from the task queue as done func (rm *ResponseManager) FinishTask(task *peertask.Task, err error) { - rm.send(&finishTaskRequest{task, err}, nil) + done := make(chan struct{}, 1) + rm.send(&finishTaskRequest{task, err, done}, nil) + select { + case <-rm.ctx.Done(): + case <-done: + } } // CloseWithNetworkError closes a request due to a network error diff --git a/responsemanager/messages.go b/responsemanager/messages.go index 65340724..0dad608d 100644 --- a/responsemanager/messages.go +++ b/responsemanager/messages.go @@ -85,10 +85,15 @@ func (rur *responseUpdateRequest) handle(rm *ResponseManager) { type finishTaskRequest struct { task *peertask.Task err error + done chan struct{} } func (ftr *finishTaskRequest) handle(rm *ResponseManager) { rm.finishTask(ftr.task, ftr.err) + select { + case <-rm.ctx.Done(): + case ftr.done <- struct{}{}: + } } type startTaskRequest struct { diff --git a/responsemanager/responsemanager_test.go b/responsemanager/responsemanager_test.go index b05582d4..fd2e6b21 100644 --- a/responsemanager/responsemanager_test.go +++ b/responsemanager/responsemanager_test.go @@ -1111,6 +1111,7 @@ func (td *testData) newQueryExecutor(manager queryexecutor.Manager) *queryexecut func (td *testData) assertPausedRequest() { var pausedRequest pausedRequest testutil.AssertReceive(td.ctx, td.t, td.pausedRequests, &pausedRequest, "should pause request") + td.taskqueue.WaitForNoActiveTasks() } func (td *testData) getAllBlocks() []blocks.Block { @@ -1147,6 +1148,7 @@ func (td *testData) assertCompleteRequestWith(expectedCode graphsync.ResponseSta var status graphsync.ResponseStatusCode testutil.AssertReceive(td.ctx, td.t, td.completedResponseStatuses, &status, "should receive status") require.Equal(td.t, expectedCode, status) + td.taskqueue.WaitForNoActiveTasks() } func (td *testData) assertOnlyCompleteProcessingWith(expectedCode graphsync.ResponseStatusCode) {