Skip to content

Commit

Permalink
feat(asyncloader): fix flakiness in test
Browse files Browse the repository at this point in the history
  • Loading branch information
hannahhoward committed Dec 18, 2021
1 parent da91f74 commit 8d87afc
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 7 deletions.
22 changes: 16 additions & 6 deletions impl/graphsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@ func TestGraphsyncRoundTripIgnoreCids(t *testing.T) {
"request(0)->executeTask(0)",
"request(0)->terminateRequest(0)",
},
testutil.RepeatTraceStrings("responseMessage({})->loaderProcess(0)->cacheProcess(0)", responseCount)...),
responseMessageTraces(t, tracing, responseCount)...),
testutil.RepeatTraceStrings("request(0)->verifyBlock({})", 50)..., // half of the full chain
), tracing.TracesToStrings())
}
Expand Down Expand Up @@ -627,7 +627,7 @@ func TestGraphsyncRoundTripIgnoreNBlocks(t *testing.T) {
"request(0)->executeTask(0)",
"request(0)->terminateRequest(0)",
},
testutil.RepeatTraceStrings("responseMessage({})->loaderProcess(0)->cacheProcess(0)", responseCount)...),
responseMessageTraces(t, tracing, responseCount)...),
testutil.RepeatTraceStrings("request(0)->verifyBlock({})", 50)..., // half of the full chain
), tracing.TracesToStrings())
}
Expand Down Expand Up @@ -879,7 +879,7 @@ func TestPauseResumeViaUpdate(t *testing.T) {
"request(0)->executeTask(0)",
"request(0)->terminateRequest(0)",
},
testutil.RepeatTraceStrings("responseMessage({})->loaderProcess(0)->cacheProcess(0)", responseCount)...),
responseMessageTraces(t, tracing, responseCount)...),
testutil.RepeatTraceStrings("request(0)->verifyBlock({})", blockChainLength)...,
), tracing.TracesToStrings())
// make sure the attributes are what we expect
Expand Down Expand Up @@ -970,7 +970,7 @@ func TestPauseResumeViaUpdateOnBlockHook(t *testing.T) {
"request(0)->executeTask(0)",
"request(0)->terminateRequest(0)",
},
testutil.RepeatTraceStrings("responseMessage({})->loaderProcess(0)->cacheProcess(0)", responseCount)...),
responseMessageTraces(t, tracing, responseCount)...),
testutil.RepeatTraceStrings("request(0)->verifyBlock({})", blockChainLength)...,
), tracing.TracesToStrings())
// make sure the attributes are what we expect
Expand Down Expand Up @@ -1350,7 +1350,7 @@ func TestRoundTripLargeBlocksSlowNetwork(t *testing.T) {
"request(0)->executeTask(0)",
"request(0)->terminateRequest(0)",
},
testutil.RepeatTraceStrings("responseMessage({})->loaderProcess(0)->cacheProcess(0)", responseCount)...),
responseMessageTraces(t, tracing, responseCount)...),
testutil.RepeatTraceStrings("request(0)->verifyBlock({})", blockChainLength)...,
), tracing.TracesToStrings())
}
Expand Down Expand Up @@ -1586,7 +1586,7 @@ func TestGraphsyncBlockListeners(t *testing.T) {
"request(0)->executeTask(0)",
"request(0)->terminateRequest(0)",
},
testutil.RepeatTraceStrings("responseMessage({})->loaderProcess(0)->cacheProcess(0)", responseCount)...),
responseMessageTraces(t, tracing, responseCount)...),
testutil.RepeatTraceStrings("request(0)->verifyBlock({})", 100)...,
), tracing.TracesToStrings())
}
Expand Down Expand Up @@ -1724,3 +1724,13 @@ func (r *receiver) Connected(p peer.ID) {

func (r *receiver) Disconnected(p peer.ID) {
}

func responseMessageTraces(t *testing.T, tracing *testutil.Collector, responseCount int) []string {
traces := testutil.RepeatTraceStrings("responseMessage({})->loaderProcess(0)->cacheProcess(0)", responseCount-1)
finalStub := tracing.FindSpanByTraceString(fmt.Sprintf("responseMessage(%d)->loaderProcess(0)", responseCount-1))
require.NotNil(t, finalStub)
if len(testutil.AttributeValueInTraceSpan(t, *finalStub, "requestIDs").AsInt64Slice()) == 0 {
return append(traces, fmt.Sprintf("responseMessage(%d)->loaderProcess(0)", responseCount-1))
}
return append(traces, fmt.Sprintf("responseMessage(%d)->loaderProcess(0)->cacheProcess(0)", responseCount-1))
}
10 changes: 9 additions & 1 deletion requestmanager/asyncloader/asyncloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"github.com/ipld/go-ipld-prime"
peer "github.com/libp2p/go-libp2p-core/peer"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

"github.com/ipfs/go-graphsync"
"github.com/ipfs/go-graphsync/metadata"
Expand Down Expand Up @@ -109,7 +111,13 @@ func (al *AsyncLoader) ProcessResponse(
responses map[graphsync.RequestID]metadata.Metadata,
blks []blocks.Block) {

ctx, span := otel.Tracer("graphsync").Start(ctx, "loaderProcess")
requestIds := make([]int, 0, len(responses))
for requestID := range responses {
requestIds = append(requestIds, int(requestID))
}
ctx, span := otel.Tracer("graphsync").Start(ctx, "loaderProcess", trace.WithAttributes(
attribute.IntSlice("requestIDs", requestIds),
))
defer span.End()

al.stateLk.Lock()
Expand Down

0 comments on commit 8d87afc

Please sign in to comment.