Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion responsemanager/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,12 @@ func (rm *ResponseManager) GetUpdates(p peer.ID, requestID graphsync.RequestID,

// FinishTask marks a task from the task queue as done
func (rm *ResponseManager) FinishTask(task *peertask.Task, err error) {
rm.send(&finishTaskRequest{task, err}, nil)
done := make(chan struct{}, 1)
rm.send(&finishTaskRequest{task, err, done}, nil)
select {
case <-rm.ctx.Done():
case <-done:
}
}

// CloseWithNetworkError closes a request due to a network error
Expand Down
5 changes: 5 additions & 0 deletions responsemanager/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,15 @@ func (rur *responseUpdateRequest) handle(rm *ResponseManager) {
type finishTaskRequest struct {
task *peertask.Task
err error
done chan struct{}
}

func (ftr *finishTaskRequest) handle(rm *ResponseManager) {
rm.finishTask(ftr.task, ftr.err)
select {
case <-rm.ctx.Done():
case ftr.done <- struct{}{}:
}
}

type startTaskRequest struct {
Expand Down
2 changes: 2 additions & 0 deletions responsemanager/responsemanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1111,6 +1111,7 @@ func (td *testData) newQueryExecutor(manager queryexecutor.Manager) *queryexecut
func (td *testData) assertPausedRequest() {
var pausedRequest pausedRequest
testutil.AssertReceive(td.ctx, td.t, td.pausedRequests, &pausedRequest, "should pause request")
td.taskqueue.WaitForNoActiveTasks()
}

func (td *testData) getAllBlocks() []blocks.Block {
Expand Down Expand Up @@ -1147,6 +1148,7 @@ func (td *testData) assertCompleteRequestWith(expectedCode graphsync.ResponseSta
var status graphsync.ResponseStatusCode
testutil.AssertReceive(td.ctx, td.t, td.completedResponseStatuses, &status, "should receive status")
require.Equal(td.t, expectedCode, status)
td.taskqueue.WaitForNoActiveTasks()
}

func (td *testData) assertOnlyCompleteProcessingWith(expectedCode graphsync.ResponseStatusCode) {
Expand Down