From 6587f43037ebd694815b30928e08d3d0fc0243eb Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Mon, 29 Nov 2021 17:53:50 -0800 Subject: [PATCH 1/2] fix(responsemanager): fix flaky tests --- responsemanager/client.go | 7 ++++++- responsemanager/messages.go | 5 +++++ responsemanager/responsemanager_test.go | 3 +++ 3 files changed, 14 insertions(+), 1 deletion(-) diff --git a/responsemanager/client.go b/responsemanager/client.go index dec66152..62f1dca9 100644 --- a/responsemanager/client.go +++ b/responsemanager/client.go @@ -224,7 +224,12 @@ func (rm *ResponseManager) GetUpdates(p peer.ID, requestID graphsync.RequestID, // FinishTask marks a task from the task queue as done func (rm *ResponseManager) FinishTask(task *peertask.Task, err error) { - rm.send(&finishTaskRequest{task, err}, nil) + done := make(chan struct{}, 1) + rm.send(&finishTaskRequest{task, err, done}, nil) + select { + case <-rm.ctx.Done(): + case <-done: + } } // CloseWithNetworkError closes a request due to a network error diff --git a/responsemanager/messages.go b/responsemanager/messages.go index 65340724..0dad608d 100644 --- a/responsemanager/messages.go +++ b/responsemanager/messages.go @@ -85,10 +85,15 @@ func (rur *responseUpdateRequest) handle(rm *ResponseManager) { type finishTaskRequest struct { task *peertask.Task err error + done chan struct{} } func (ftr *finishTaskRequest) handle(rm *ResponseManager) { rm.finishTask(ftr.task, ftr.err) + select { + case <-rm.ctx.Done(): + case ftr.done <- struct{}{}: + } } type startTaskRequest struct { diff --git a/responsemanager/responsemanager_test.go b/responsemanager/responsemanager_test.go index b05582d4..0b22ef42 100644 --- a/responsemanager/responsemanager_test.go +++ b/responsemanager/responsemanager_test.go @@ -291,6 +291,7 @@ func TestValidationAndExtensions(t *testing.T) { // request fails with base loader reading from block store that's missing data responseManager.ProcessRequests(td.ctx, td.p, td.requests) td.assertCompleteRequestWith(graphsync.RequestFailedContentNotFound) + td.taskqueue.WaitForNoActiveTasks() err := td.peristenceOptions.Register("chainstore", td.persistence) require.NoError(t, err) @@ -632,6 +633,8 @@ func TestValidationAndExtensions(t *testing.T) { td.verifyNResponses(blockCount) td.assertPausedRequest() + td.taskqueue.WaitForNoActiveTasks() + // send update responseManager.ProcessRequests(td.ctx, td.p, td.updateRequests) From 4bcafb9e1652300b70169830dea19d1889d0a2e1 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Mon, 29 Nov 2021 18:43:43 -0800 Subject: [PATCH 2/2] fix(responsemanager): make fix more global --- responsemanager/responsemanager_test.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/responsemanager/responsemanager_test.go b/responsemanager/responsemanager_test.go index 0b22ef42..fd2e6b21 100644 --- a/responsemanager/responsemanager_test.go +++ b/responsemanager/responsemanager_test.go @@ -291,7 +291,6 @@ func TestValidationAndExtensions(t *testing.T) { // request fails with base loader reading from block store that's missing data responseManager.ProcessRequests(td.ctx, td.p, td.requests) td.assertCompleteRequestWith(graphsync.RequestFailedContentNotFound) - td.taskqueue.WaitForNoActiveTasks() err := td.peristenceOptions.Register("chainstore", td.persistence) require.NoError(t, err) @@ -633,8 +632,6 @@ func TestValidationAndExtensions(t *testing.T) { td.verifyNResponses(blockCount) td.assertPausedRequest() - td.taskqueue.WaitForNoActiveTasks() - // send update responseManager.ProcessRequests(td.ctx, td.p, td.updateRequests) @@ -1114,6 +1111,7 @@ func (td *testData) newQueryExecutor(manager queryexecutor.Manager) *queryexecut func (td *testData) assertPausedRequest() { var pausedRequest pausedRequest testutil.AssertReceive(td.ctx, td.t, td.pausedRequests, &pausedRequest, "should pause request") + td.taskqueue.WaitForNoActiveTasks() } func (td *testData) getAllBlocks() []blocks.Block { @@ -1150,6 +1148,7 @@ func (td *testData) assertCompleteRequestWith(expectedCode graphsync.ResponseSta var status graphsync.ResponseStatusCode testutil.AssertReceive(td.ctx, td.t, td.completedResponseStatuses, &status, "should receive status") require.Equal(td.t, expectedCode, status) + td.taskqueue.WaitForNoActiveTasks() } func (td *testData) assertOnlyCompleteProcessingWith(expectedCode graphsync.ResponseStatusCode) {