Skip to content

Commit

Permalink
feat(responsemanager): trace full messages via links to responses (#325)
Browse files Browse the repository at this point in the history
* feat(responsemanager): trace full messages via links to responses

Fixes: #318

* chore(responsemanager): rename processRequests internals for consistency

* fix(responsemanager): make TestCancellationQueryInProgress less strict
  • Loading branch information
rvagg committed Jan 14, 2022
1 parent 789b34a commit fe14173
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 31 deletions.
28 changes: 21 additions & 7 deletions impl/graphsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,9 +206,10 @@ func TestRejectRequestsByDefault(t *testing.T) {
"request(0)->executeTask(0)",
"request(0)->terminateRequest(0)",
"processResponses(0)->loaderProcess(0)->cacheProcess(0)",
"response(0)->transaction(0)->execute(0)->buildMessage(0)",
"processRequests(0)->transaction(0)->execute(0)->buildMessage(0)",
"message(0)->sendMessage(0)",
"message(1)->sendMessage(0)",
"response(0)",
}, tracing.TracesToStrings())
// has ContextCancelError exception recorded in the right place
tracing.SingleExceptionEvent(t, "request(0)->executeTask(0)", "ContextCancelError", ipldutil.ContextCancelError{}.Error(), false)
Expand Down Expand Up @@ -561,7 +562,7 @@ func TestGraphsyncRoundTripIgnoreCids(t *testing.T) {
testutil.RepeatTraceStrings("request(0)->verifyBlock({})", 50)...), // half of the full chain
testutil.RepeatTraceStrings("response(0)->executeTask(0)->processBlock({})->loadBlock(0)", blockChainLength)...),
testutil.RepeatTraceStrings("response(0)->executeTask(0)->processBlock({})->sendBlock(0)->processBlockHooks(0)", blockChainLength)...),
testutil.RepeatTraceStrings("response(0)->transaction({})->execute(0)->buildMessage(0)", blockChainLength+2)...,
testutil.RepeatTraceStrings("processRequests(0)->transaction({})->execute(0)->buildMessage(0)", blockChainLength+2)...,
), tracing.TracesToStrings())
}

Expand Down Expand Up @@ -635,7 +636,7 @@ func TestGraphsyncRoundTripIgnoreNBlocks(t *testing.T) {
testutil.RepeatTraceStrings("request(0)->verifyBlock({})", 50)...),
testutil.RepeatTraceStrings("response(0)->executeTask(0)->processBlock({})->loadBlock(0)", blockChainLength)...),
testutil.RepeatTraceStrings("response(0)->executeTask(0)->processBlock({})->sendBlock(0)->processBlockHooks(0)", blockChainLength)...),
testutil.RepeatTraceStrings("response(0)->transaction({})->execute(0)->buildMessage(0)", blockChainLength+2)...,
testutil.RepeatTraceStrings("processRequests(0)->transaction({})->execute(0)->buildMessage(0)", blockChainLength+2)...,
), tracing.TracesToStrings())
}

Expand Down Expand Up @@ -887,6 +888,7 @@ func TestPauseResumeViaUpdate(t *testing.T) {
"request(0)->newRequest(0)",
"request(0)->executeTask(0)",
"request(0)->terminateRequest(0)",
"processRequests(1)",
},
processResponsesTraces(t, tracing, responseCount)...),
testutil.RepeatTraceStrings("message({})->sendMessage(0)", responseCount+2)...),
Expand All @@ -895,13 +897,23 @@ func TestPauseResumeViaUpdate(t *testing.T) {
testutil.RepeatTraceStrings("response(0)->executeTask(0)->processBlock({})->sendBlock(0)->processBlockHooks(0)", 50)...), // half of the full chain
testutil.RepeatTraceStrings("response(0)->executeTask(1)->processBlock({})->loadBlock(0)", 50)...),
testutil.RepeatTraceStrings("response(0)->executeTask(1)->processBlock({})->sendBlock(0)->processBlockHooks(0)", 50)...), // half of the full chain
testutil.RepeatTraceStrings("response(0)->transaction({})->execute(0)->buildMessage(0)", blockChainLength+3)...,
testutil.RepeatTraceStrings("processRequests(0)->transaction({})->execute(0)->buildMessage(0)", blockChainLength+3)...,
), tracing.TracesToStrings())
// make sure the attributes are what we expect
processUpdateSpan := tracing.FindSpanByTraceString("response(0)->processUpdate(0)")
require.Equal(t, []string{string(td.extensionName)}, testutil.AttributeValueInTraceSpan(t, *processUpdateSpan, "extensions").AsStringSlice())
// pause recorded
tracing.SingleExceptionEvent(t, "response(0)->executeTask(0)", "github.com/ipfs/go-graphsync/responsemanager/hooks.ErrPaused", hooks.ErrPaused{}.Error(), false)

message0Span := tracing.FindSpanByTraceString("processRequests(0)")
message1Span := tracing.FindSpanByTraceString("processRequests(1)")
responseSpan := tracing.FindSpanByTraceString("response(0)")
// response(0) originates in processRequests(0)
require.Len(t, responseSpan.Links, 1)
require.Equal(t, responseSpan.Links[0].SpanContext.SpanID(), message0Span.SpanContext.SpanID())
// response(0)->processUpdate(0) occurs thanks to processRequests(1)
require.Len(t, processUpdateSpan.Links, 1)
require.Equal(t, processUpdateSpan.Links[0].SpanContext.SpanID(), message1Span.SpanContext.SpanID())
}

func TestPauseResumeViaUpdateOnBlockHook(t *testing.T) {
Expand Down Expand Up @@ -980,6 +992,7 @@ func TestPauseResumeViaUpdateOnBlockHook(t *testing.T) {
"request(0)->newRequest(0)",
"request(0)->executeTask(0)",
"request(0)->terminateRequest(0)",
"processRequests(1)",
},
processResponsesTraces(t, tracing, responseCount)...),
testutil.RepeatTraceStrings("message({})->sendMessage(0)", responseCount+2)...),
Expand All @@ -988,7 +1001,7 @@ func TestPauseResumeViaUpdateOnBlockHook(t *testing.T) {
testutil.RepeatTraceStrings("response(0)->executeTask(0)->processBlock({})->sendBlock(0)->processBlockHooks(0)", 50)...), // half of the full chain
testutil.RepeatTraceStrings("response(0)->executeTask(1)->processBlock({})->loadBlock(0)", 50)...),
testutil.RepeatTraceStrings("response(0)->executeTask(1)->processBlock({})->sendBlock(0)->processBlockHooks(0)", 50)...), // half of the full chain
testutil.RepeatTraceStrings("response(0)->transaction({})->execute(0)->buildMessage(0)", blockChainLength+3)...,
testutil.RepeatTraceStrings("processRequests(0)->transaction({})->execute(0)->buildMessage(0)", blockChainLength+3)...,
), tracing.TracesToStrings())
// make sure the attributes are what we expect
processUpdateSpan := tracing.FindSpanByTraceString("response(0)->processUpdate(0)")
Expand Down Expand Up @@ -1074,6 +1087,7 @@ func TestNetworkDisconnect(t *testing.T) {
tracing := collectTracing(t)

traceStrings := tracing.TracesToStrings()
require.Contains(t, traceStrings, "processRequests(0)->transaction(0)->execute(0)->buildMessage(0)")
require.Contains(t, traceStrings, "response(0)->executeTask(0)->processBlock(0)->loadBlock(0)")
require.Contains(t, traceStrings, "response(0)->executeTask(0)->processBlock(0)->sendBlock(0)->processBlockHooks(0)")
require.Contains(t, traceStrings, "response(0)->abortRequest(0)")
Expand Down Expand Up @@ -1370,7 +1384,7 @@ func TestRoundTripLargeBlocksSlowNetwork(t *testing.T) {
testutil.RepeatTraceStrings("request(0)->verifyBlock({})", blockChainLength)...),
testutil.RepeatTraceStrings("response(0)->executeTask(0)->processBlock({})->loadBlock(0)", blockChainLength)...),
testutil.RepeatTraceStrings("response(0)->executeTask(0)->processBlock({})->sendBlock(0)->processBlockHooks(0)", blockChainLength)...),
testutil.RepeatTraceStrings("response(0)->transaction({})->execute(0)->buildMessage(0)", blockChainLength+2)...,
testutil.RepeatTraceStrings("processRequests(0)->transaction({})->execute(0)->buildMessage(0)", blockChainLength+2)...,
), tracing.TracesToStrings())
}

Expand Down Expand Up @@ -1610,7 +1624,7 @@ func TestGraphsyncBlockListeners(t *testing.T) {
testutil.RepeatTraceStrings("request(0)->verifyBlock({})", blockChainLength)...),
testutil.RepeatTraceStrings("response(0)->executeTask(0)->processBlock({})->loadBlock(0)", blockChainLength)...),
testutil.RepeatTraceStrings("response(0)->executeTask(0)->processBlock({})->sendBlock(0)->processBlockHooks(0)", blockChainLength)...),
testutil.RepeatTraceStrings("response(0)->transaction({})->execute(0)->buildMessage(0)", blockChainLength+2)...,
testutil.RepeatTraceStrings("processRequests(0)->transaction({})->execute(0)->buildMessage(0)", blockChainLength+2)...,
), tracing.TracesToStrings())
}

Expand Down
2 changes: 1 addition & 1 deletion responsemanager/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func New(ctx context.Context,

// ProcessRequests processes incoming requests for the given peer
func (rm *ResponseManager) ProcessRequests(ctx context.Context, p peer.ID, requests []gsmsg.GraphSyncRequest) {
rm.send(&processRequestMessage{p, requests}, ctx.Done())
rm.send(&processRequestsMessage{p, requests}, ctx.Done())
}

// UnpauseResponse unpauses a response that was previously paused
Expand Down
6 changes: 3 additions & 3 deletions responsemanager/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ import (
"github.com/ipfs/go-graphsync/responsemanager/queryexecutor"
)

type processRequestMessage struct {
type processRequestsMessage struct {
p peer.ID
requests []gsmsg.GraphSyncRequest
}

func (prm *processRequestMessage) handle(rm *ResponseManager) {
func (prm *processRequestsMessage) handle(rm *ResponseManager) {
rm.processRequests(prm.p, prm.requests)
}

Expand All @@ -41,7 +41,7 @@ type errorRequestMessage struct {
}

func (erm *errorRequestMessage) handle(rm *ResponseManager) {
err := rm.abortRequest(erm.p, erm.requestID, erm.err)
err := rm.abortRequest(rm.ctx, erm.p, erm.requestID, erm.err)
select {
case <-rm.ctx.Done():
case erm.response <- err:
Expand Down
25 changes: 23 additions & 2 deletions responsemanager/responsemanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,15 @@ func TestIncomingQuery(t *testing.T) {
td.connManager.RefuteProtected(t, td.p)

tracing := td.collectTracing(t)
require.ElementsMatch(t, append(
testutil.RepeatTraceStrings("TestIncomingQuery(0)->response(0)->executeTask(0)->processBlock({})->loadBlock(0)", td.blockChainLength),
require.ElementsMatch(t, append(append(
[]string{"processRequests(0)"},
testutil.RepeatTraceStrings("TestIncomingQuery(0)->response(0)->executeTask(0)->processBlock({})->loadBlock(0)", td.blockChainLength)...),
testutil.RepeatTraceStrings("TestIncomingQuery(0)->response(0)->executeTask(0)->processBlock({})->sendBlock(0)->processBlockHooks(0)", td.blockChainLength)..., // half of the full chain
), tracing.TracesToStrings())
messageSpan := tracing.FindSpanByTraceString("processRequests(0)")
responseSpan := tracing.FindSpanByTraceString("TestIncomingQuery(0)->response(0)")
require.Len(t, responseSpan.Links, 1)
require.Equal(t, responseSpan.Links[0].SpanContext.SpanID(), messageSpan.SpanContext.SpanID())
}

func TestCancellationQueryInProgress(t *testing.T) {
Expand Down Expand Up @@ -129,6 +134,22 @@ func TestCancellationQueryInProgress(t *testing.T) {
td.connManager.RefuteProtected(t, td.p)

td.assertRequestCleared()

tracing := td.collectTracing(t)
traceStrings := tracing.TracesToStrings()
require.Contains(t, traceStrings, "processRequests(0)")
require.Contains(t, traceStrings, "response(0)->abortRequest(0)")
require.Contains(t, traceStrings, "processRequests(1)")
message0Span := tracing.FindSpanByTraceString("processRequests(0)")
message1Span := tracing.FindSpanByTraceString("processRequests(1)")
responseSpan := tracing.FindSpanByTraceString("response(0)")
abortRequestSpan := tracing.FindSpanByTraceString("response(0)->abortRequest(0)")
// response(0) originates in processRequests(0)
require.Len(t, responseSpan.Links, 1)
require.Equal(t, responseSpan.Links[0].SpanContext.SpanID(), message0Span.SpanContext.SpanID())
// response(0)->abortRequest(0) occurs thanks to processRequests(1)
require.Len(t, abortRequestSpan.Links, 1)
require.Equal(t, abortRequestSpan.Links[0].SpanContext.SpanID(), message1Span.SpanContext.SpanID())
}

func TestCancellationViaCommand(t *testing.T) {
Expand Down
57 changes: 39 additions & 18 deletions responsemanager/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,22 @@ func (rm *ResponseManager) terminateRequest(key responseKey) {
ipr.span.End()
}

func (rm *ResponseManager) processUpdate(key responseKey, update gsmsg.GraphSyncRequest) {
func (rm *ResponseManager) processUpdate(ctx context.Context, key responseKey, update gsmsg.GraphSyncRequest) {
response, ok := rm.inProgressResponses[key]
if !ok || response.state == graphsync.CompletingSend {
log.Warnf("received update for non existent request, peer %s, request ID %d", key.p.Pretty(), key.requestID)
return
}

_, span := otel.Tracer("graphsync").Start(trace.ContextWithSpan(rm.ctx, response.span), "processUpdate", trace.WithAttributes(
attribute.Int("id", int(update.ID())),
attribute.StringSlice("extensions", update.ExtensionNames()),
))
_, span := otel.Tracer("graphsync").Start(
trace.ContextWithSpan(ctx, response.span),
"processUpdate",
trace.WithLinks(trace.LinkFromContext(ctx)),
trace.WithAttributes(
attribute.Int("id", int(update.ID())),
attribute.StringSlice("extensions", update.ExtensionNames()),
))

defer span.End()

if response.state != graphsync.Paused {
Expand Down Expand Up @@ -125,15 +130,18 @@ func (rm *ResponseManager) unpauseRequest(p peer.ID, requestID graphsync.Request
return nil
}

func (rm *ResponseManager) abortRequest(p peer.ID, requestID graphsync.RequestID, err error) error {
func (rm *ResponseManager) abortRequest(ctx context.Context, p peer.ID, requestID graphsync.RequestID, err error) error {
key := responseKey{p, requestID}
rm.responseQueue.Remove(key, key.p)
response, ok := rm.inProgressResponses[key]
if !ok || response.state == graphsync.CompletingSend {
return errors.New("could not find request")
}

_, span := otel.Tracer("graphsync").Start(trace.ContextWithSpan(rm.ctx, response.span), "abortRequest")
_, span := otel.Tracer("graphsync").Start(trace.ContextWithSpan(ctx, response.span),
"abortRequest",
trace.WithLinks(trace.LinkFromContext(ctx)),
)
defer span.End()
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
Expand Down Expand Up @@ -166,25 +174,38 @@ func (rm *ResponseManager) abortRequest(p peer.ID, requestID graphsync.RequestID
}

func (rm *ResponseManager) processRequests(p peer.ID, requests []gsmsg.GraphSyncRequest) {
ctx, messageSpan := otel.Tracer("graphsync").Start(
rm.ctx,
"processRequests",
trace.WithAttributes(attribute.String("peerID", p.Pretty())),
)
defer messageSpan.End()

for _, request := range requests {
key := responseKey{p: p, requestID: request.ID()}
if request.IsCancel() {
_ = rm.abortRequest(p, request.ID(), ipldutil.ContextCancelError{})
_ = rm.abortRequest(ctx, p, request.ID(), ipldutil.ContextCancelError{})
continue
}
if request.IsUpdate() {
rm.processUpdate(key, request)
rm.processUpdate(ctx, key, request)
continue
}
rm.connManager.Protect(p, request.ID().Tag())
ctx := rm.requestQueuedHooks.ProcessRequestQueuedHooks(p, request, rm.ctx)
ctx, responseSpan := otel.Tracer("graphsync").Start(ctx, "response", trace.WithAttributes(
attribute.Int("id", int(request.ID())),
attribute.Int("priority", int(request.Priority())),
attribute.String("root", request.Root().String()),
attribute.StringSlice("extensions", request.ExtensionNames()),
))
ctx, cancelFn := context.WithCancel(ctx)
// don't use `ctx` which has the "message" trace, but rm.ctx for a fresh trace which allows
// for a request hook to join this particular response up to an existing external trace
rctx := rm.requestQueuedHooks.ProcessRequestQueuedHooks(p, request, rm.ctx)
rctx, responseSpan := otel.Tracer("graphsync").Start(
rctx,
"response",
trace.WithLinks(trace.LinkFromContext(ctx)),
trace.WithAttributes(
attribute.Int("id", int(request.ID())),
attribute.Int("priority", int(request.Priority())),
attribute.String("root", request.Root().String()),
attribute.StringSlice("extensions", request.ExtensionNames()),
))
rctx, cancelFn := context.WithCancel(rctx)
sub := &subscriber{
p: key.p,
request: request,
Expand All @@ -202,7 +223,7 @@ func (rm *ResponseManager) processRequests(p peer.ID, requests []gsmsg.GraphSync

rm.inProgressResponses[key] =
&inProgressResponseStatus{
ctx: ctx,
ctx: rctx,
span: responseSpan,
cancelFn: cancelFn,
request: request,
Expand Down

0 comments on commit fe14173

Please sign in to comment.