Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(responsemanager): trace full messages via links to responses #325

Merged
merged 3 commits into from
Jan 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 21 additions & 7 deletions impl/graphsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,9 +206,10 @@ func TestRejectRequestsByDefault(t *testing.T) {
"request(0)->executeTask(0)",
"request(0)->terminateRequest(0)",
"processResponses(0)->loaderProcess(0)->cacheProcess(0)",
"response(0)->transaction(0)->execute(0)->buildMessage(0)",
"processRequests(0)->transaction(0)->execute(0)->buildMessage(0)",
"message(0)->sendMessage(0)",
"message(1)->sendMessage(0)",
"response(0)",
}, tracing.TracesToStrings())
// has ContextCancelError exception recorded in the right place
tracing.SingleExceptionEvent(t, "request(0)->executeTask(0)", "ContextCancelError", ipldutil.ContextCancelError{}.Error(), false)
Expand Down Expand Up @@ -561,7 +562,7 @@ func TestGraphsyncRoundTripIgnoreCids(t *testing.T) {
testutil.RepeatTraceStrings("request(0)->verifyBlock({})", 50)...), // half of the full chain
testutil.RepeatTraceStrings("response(0)->executeTask(0)->processBlock({})->loadBlock(0)", blockChainLength)...),
testutil.RepeatTraceStrings("response(0)->executeTask(0)->processBlock({})->sendBlock(0)->processBlockHooks(0)", blockChainLength)...),
testutil.RepeatTraceStrings("response(0)->transaction({})->execute(0)->buildMessage(0)", blockChainLength+2)...,
testutil.RepeatTraceStrings("processRequests(0)->transaction({})->execute(0)->buildMessage(0)", blockChainLength+2)...,
), tracing.TracesToStrings())
}

Expand Down Expand Up @@ -635,7 +636,7 @@ func TestGraphsyncRoundTripIgnoreNBlocks(t *testing.T) {
testutil.RepeatTraceStrings("request(0)->verifyBlock({})", 50)...),
testutil.RepeatTraceStrings("response(0)->executeTask(0)->processBlock({})->loadBlock(0)", blockChainLength)...),
testutil.RepeatTraceStrings("response(0)->executeTask(0)->processBlock({})->sendBlock(0)->processBlockHooks(0)", blockChainLength)...),
testutil.RepeatTraceStrings("response(0)->transaction({})->execute(0)->buildMessage(0)", blockChainLength+2)...,
testutil.RepeatTraceStrings("processRequests(0)->transaction({})->execute(0)->buildMessage(0)", blockChainLength+2)...,
), tracing.TracesToStrings())
}

Expand Down Expand Up @@ -887,6 +888,7 @@ func TestPauseResumeViaUpdate(t *testing.T) {
"request(0)->newRequest(0)",
"request(0)->executeTask(0)",
"request(0)->terminateRequest(0)",
"processRequests(1)",
},
processResponsesTraces(t, tracing, responseCount)...),
testutil.RepeatTraceStrings("message({})->sendMessage(0)", responseCount+2)...),
Expand All @@ -895,13 +897,23 @@ func TestPauseResumeViaUpdate(t *testing.T) {
testutil.RepeatTraceStrings("response(0)->executeTask(0)->processBlock({})->sendBlock(0)->processBlockHooks(0)", 50)...), // half of the full chain
testutil.RepeatTraceStrings("response(0)->executeTask(1)->processBlock({})->loadBlock(0)", 50)...),
testutil.RepeatTraceStrings("response(0)->executeTask(1)->processBlock({})->sendBlock(0)->processBlockHooks(0)", 50)...), // half of the full chain
testutil.RepeatTraceStrings("response(0)->transaction({})->execute(0)->buildMessage(0)", blockChainLength+3)...,
testutil.RepeatTraceStrings("processRequests(0)->transaction({})->execute(0)->buildMessage(0)", blockChainLength+3)...,
), tracing.TracesToStrings())
// make sure the attributes are what we expect
processUpdateSpan := tracing.FindSpanByTraceString("response(0)->processUpdate(0)")
require.Equal(t, []string{string(td.extensionName)}, testutil.AttributeValueInTraceSpan(t, *processUpdateSpan, "extensions").AsStringSlice())
// pause recorded
tracing.SingleExceptionEvent(t, "response(0)->executeTask(0)", "github.com/ipfs/go-graphsync/responsemanager/hooks.ErrPaused", hooks.ErrPaused{}.Error(), false)

message0Span := tracing.FindSpanByTraceString("processRequests(0)")
message1Span := tracing.FindSpanByTraceString("processRequests(1)")
responseSpan := tracing.FindSpanByTraceString("response(0)")
// response(0) originates in processRequests(0)
require.Len(t, responseSpan.Links, 1)
require.Equal(t, responseSpan.Links[0].SpanContext.SpanID(), message0Span.SpanContext.SpanID())
// response(0)->processUpdate(0) occurs thanks to processRequests(1)
require.Len(t, processUpdateSpan.Links, 1)
require.Equal(t, processUpdateSpan.Links[0].SpanContext.SpanID(), message1Span.SpanContext.SpanID())
}

func TestPauseResumeViaUpdateOnBlockHook(t *testing.T) {
Expand Down Expand Up @@ -980,6 +992,7 @@ func TestPauseResumeViaUpdateOnBlockHook(t *testing.T) {
"request(0)->newRequest(0)",
"request(0)->executeTask(0)",
"request(0)->terminateRequest(0)",
"processRequests(1)",
},
processResponsesTraces(t, tracing, responseCount)...),
testutil.RepeatTraceStrings("message({})->sendMessage(0)", responseCount+2)...),
Expand All @@ -988,7 +1001,7 @@ func TestPauseResumeViaUpdateOnBlockHook(t *testing.T) {
testutil.RepeatTraceStrings("response(0)->executeTask(0)->processBlock({})->sendBlock(0)->processBlockHooks(0)", 50)...), // half of the full chain
testutil.RepeatTraceStrings("response(0)->executeTask(1)->processBlock({})->loadBlock(0)", 50)...),
testutil.RepeatTraceStrings("response(0)->executeTask(1)->processBlock({})->sendBlock(0)->processBlockHooks(0)", 50)...), // half of the full chain
testutil.RepeatTraceStrings("response(0)->transaction({})->execute(0)->buildMessage(0)", blockChainLength+3)...,
testutil.RepeatTraceStrings("processRequests(0)->transaction({})->execute(0)->buildMessage(0)", blockChainLength+3)...,
), tracing.TracesToStrings())
// make sure the attributes are what we expect
processUpdateSpan := tracing.FindSpanByTraceString("response(0)->processUpdate(0)")
Expand Down Expand Up @@ -1074,6 +1087,7 @@ func TestNetworkDisconnect(t *testing.T) {
tracing := collectTracing(t)

traceStrings := tracing.TracesToStrings()
require.Contains(t, traceStrings, "processRequests(0)->transaction(0)->execute(0)->buildMessage(0)")
require.Contains(t, traceStrings, "response(0)->executeTask(0)->processBlock(0)->loadBlock(0)")
require.Contains(t, traceStrings, "response(0)->executeTask(0)->processBlock(0)->sendBlock(0)->processBlockHooks(0)")
require.Contains(t, traceStrings, "response(0)->abortRequest(0)")
Expand Down Expand Up @@ -1370,7 +1384,7 @@ func TestRoundTripLargeBlocksSlowNetwork(t *testing.T) {
testutil.RepeatTraceStrings("request(0)->verifyBlock({})", blockChainLength)...),
testutil.RepeatTraceStrings("response(0)->executeTask(0)->processBlock({})->loadBlock(0)", blockChainLength)...),
testutil.RepeatTraceStrings("response(0)->executeTask(0)->processBlock({})->sendBlock(0)->processBlockHooks(0)", blockChainLength)...),
testutil.RepeatTraceStrings("response(0)->transaction({})->execute(0)->buildMessage(0)", blockChainLength+2)...,
testutil.RepeatTraceStrings("processRequests(0)->transaction({})->execute(0)->buildMessage(0)", blockChainLength+2)...,
), tracing.TracesToStrings())
}

Expand Down Expand Up @@ -1610,7 +1624,7 @@ func TestGraphsyncBlockListeners(t *testing.T) {
testutil.RepeatTraceStrings("request(0)->verifyBlock({})", blockChainLength)...),
testutil.RepeatTraceStrings("response(0)->executeTask(0)->processBlock({})->loadBlock(0)", blockChainLength)...),
testutil.RepeatTraceStrings("response(0)->executeTask(0)->processBlock({})->sendBlock(0)->processBlockHooks(0)", blockChainLength)...),
testutil.RepeatTraceStrings("response(0)->transaction({})->execute(0)->buildMessage(0)", blockChainLength+2)...,
testutil.RepeatTraceStrings("processRequests(0)->transaction({})->execute(0)->buildMessage(0)", blockChainLength+2)...,
), tracing.TracesToStrings())
}

Expand Down
2 changes: 1 addition & 1 deletion responsemanager/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func New(ctx context.Context,

// ProcessRequests processes incoming requests for the given peer
func (rm *ResponseManager) ProcessRequests(ctx context.Context, p peer.ID, requests []gsmsg.GraphSyncRequest) {
rm.send(&processRequestMessage{p, requests}, ctx.Done())
rm.send(&processRequestsMessage{p, requests}, ctx.Done())
}

// UnpauseResponse unpauses a response that was previously paused
Expand Down
6 changes: 3 additions & 3 deletions responsemanager/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ import (
"github.com/ipfs/go-graphsync/responsemanager/queryexecutor"
)

type processRequestMessage struct {
type processRequestsMessage struct {
p peer.ID
requests []gsmsg.GraphSyncRequest
}

func (prm *processRequestMessage) handle(rm *ResponseManager) {
func (prm *processRequestsMessage) handle(rm *ResponseManager) {
rm.processRequests(prm.p, prm.requests)
}

Expand All @@ -41,7 +41,7 @@ type errorRequestMessage struct {
}

func (erm *errorRequestMessage) handle(rm *ResponseManager) {
err := rm.abortRequest(erm.p, erm.requestID, erm.err)
err := rm.abortRequest(rm.ctx, erm.p, erm.requestID, erm.err)
select {
case <-rm.ctx.Done():
case erm.response <- err:
Expand Down
25 changes: 23 additions & 2 deletions responsemanager/responsemanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,15 @@ func TestIncomingQuery(t *testing.T) {
td.connManager.RefuteProtected(t, td.p)

tracing := td.collectTracing(t)
require.ElementsMatch(t, append(
testutil.RepeatTraceStrings("TestIncomingQuery(0)->response(0)->executeTask(0)->processBlock({})->loadBlock(0)", td.blockChainLength),
require.ElementsMatch(t, append(append(
[]string{"processRequests(0)"},
testutil.RepeatTraceStrings("TestIncomingQuery(0)->response(0)->executeTask(0)->processBlock({})->loadBlock(0)", td.blockChainLength)...),
testutil.RepeatTraceStrings("TestIncomingQuery(0)->response(0)->executeTask(0)->processBlock({})->sendBlock(0)->processBlockHooks(0)", td.blockChainLength)..., // half of the full chain
), tracing.TracesToStrings())
messageSpan := tracing.FindSpanByTraceString("processRequests(0)")
responseSpan := tracing.FindSpanByTraceString("TestIncomingQuery(0)->response(0)")
require.Len(t, responseSpan.Links, 1)
require.Equal(t, responseSpan.Links[0].SpanContext.SpanID(), messageSpan.SpanContext.SpanID())
}

func TestCancellationQueryInProgress(t *testing.T) {
Expand Down Expand Up @@ -129,6 +134,22 @@ func TestCancellationQueryInProgress(t *testing.T) {
td.connManager.RefuteProtected(t, td.p)

td.assertRequestCleared()

tracing := td.collectTracing(t)
traceStrings := tracing.TracesToStrings()
require.Contains(t, traceStrings, "processRequests(0)")
require.Contains(t, traceStrings, "response(0)->abortRequest(0)")
require.Contains(t, traceStrings, "processRequests(1)")
message0Span := tracing.FindSpanByTraceString("processRequests(0)")
message1Span := tracing.FindSpanByTraceString("processRequests(1)")
responseSpan := tracing.FindSpanByTraceString("response(0)")
abortRequestSpan := tracing.FindSpanByTraceString("response(0)->abortRequest(0)")
// response(0) originates in processRequests(0)
require.Len(t, responseSpan.Links, 1)
require.Equal(t, responseSpan.Links[0].SpanContext.SpanID(), message0Span.SpanContext.SpanID())
// response(0)->abortRequest(0) occurs thanks to processRequests(1)
require.Len(t, abortRequestSpan.Links, 1)
require.Equal(t, abortRequestSpan.Links[0].SpanContext.SpanID(), message1Span.SpanContext.SpanID())
}

func TestCancellationViaCommand(t *testing.T) {
Expand Down
57 changes: 39 additions & 18 deletions responsemanager/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,22 @@ func (rm *ResponseManager) terminateRequest(key responseKey) {
ipr.span.End()
}

func (rm *ResponseManager) processUpdate(key responseKey, update gsmsg.GraphSyncRequest) {
func (rm *ResponseManager) processUpdate(ctx context.Context, key responseKey, update gsmsg.GraphSyncRequest) {
response, ok := rm.inProgressResponses[key]
if !ok || response.state == graphsync.CompletingSend {
log.Warnf("received update for non existent request, peer %s, request ID %d", key.p.Pretty(), key.requestID)
return
}

_, span := otel.Tracer("graphsync").Start(trace.ContextWithSpan(rm.ctx, response.span), "processUpdate", trace.WithAttributes(
attribute.Int("id", int(update.ID())),
attribute.StringSlice("extensions", update.ExtensionNames()),
))
_, span := otel.Tracer("graphsync").Start(
trace.ContextWithSpan(ctx, response.span),
"processUpdate",
trace.WithLinks(trace.LinkFromContext(ctx)),
trace.WithAttributes(
attribute.Int("id", int(update.ID())),
attribute.StringSlice("extensions", update.ExtensionNames()),
))

defer span.End()

if response.state != graphsync.Paused {
Expand Down Expand Up @@ -125,15 +130,18 @@ func (rm *ResponseManager) unpauseRequest(p peer.ID, requestID graphsync.Request
return nil
}

func (rm *ResponseManager) abortRequest(p peer.ID, requestID graphsync.RequestID, err error) error {
func (rm *ResponseManager) abortRequest(ctx context.Context, p peer.ID, requestID graphsync.RequestID, err error) error {
key := responseKey{p, requestID}
rm.responseQueue.Remove(key, key.p)
response, ok := rm.inProgressResponses[key]
if !ok || response.state == graphsync.CompletingSend {
return errors.New("could not find request")
}

_, span := otel.Tracer("graphsync").Start(trace.ContextWithSpan(rm.ctx, response.span), "abortRequest")
_, span := otel.Tracer("graphsync").Start(trace.ContextWithSpan(ctx, response.span),
"abortRequest",
trace.WithLinks(trace.LinkFromContext(ctx)),
)
defer span.End()
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
Expand Down Expand Up @@ -166,25 +174,38 @@ func (rm *ResponseManager) abortRequest(p peer.ID, requestID graphsync.RequestID
}

func (rm *ResponseManager) processRequests(p peer.ID, requests []gsmsg.GraphSyncRequest) {
ctx, messageSpan := otel.Tracer("graphsync").Start(
rm.ctx,
"processRequests",
trace.WithAttributes(attribute.String("peerID", p.Pretty())),
)
defer messageSpan.End()

for _, request := range requests {
key := responseKey{p: p, requestID: request.ID()}
if request.IsCancel() {
_ = rm.abortRequest(p, request.ID(), ipldutil.ContextCancelError{})
_ = rm.abortRequest(ctx, p, request.ID(), ipldutil.ContextCancelError{})
continue
}
if request.IsUpdate() {
rm.processUpdate(key, request)
rm.processUpdate(ctx, key, request)
continue
}
rm.connManager.Protect(p, request.ID().Tag())
ctx := rm.requestQueuedHooks.ProcessRequestQueuedHooks(p, request, rm.ctx)
ctx, responseSpan := otel.Tracer("graphsync").Start(ctx, "response", trace.WithAttributes(
attribute.Int("id", int(request.ID())),
attribute.Int("priority", int(request.Priority())),
attribute.String("root", request.Root().String()),
attribute.StringSlice("extensions", request.ExtensionNames()),
))
ctx, cancelFn := context.WithCancel(ctx)
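// Start the response span from rm.ctx rather than `ctx`: `ctx` carries the "processRequests"
// message span, whereas rm.ctx gives the response a fresh trace, which allows a request hook
// to join this particular response up to an existing external trace. The message span is
// attached to the response span as a link instead of as its parent.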
rctx := rm.requestQueuedHooks.ProcessRequestQueuedHooks(p, request, rm.ctx)
rctx, responseSpan := otel.Tracer("graphsync").Start(
rctx,
"response",
trace.WithLinks(trace.LinkFromContext(ctx)),
trace.WithAttributes(
attribute.Int("id", int(request.ID())),
attribute.Int("priority", int(request.Priority())),
attribute.String("root", request.Root().String()),
attribute.StringSlice("extensions", request.ExtensionNames()),
))
rctx, cancelFn := context.WithCancel(rctx)
sub := &subscriber{
p: key.p,
request: request,
Expand All @@ -202,7 +223,7 @@ func (rm *ResponseManager) processRequests(p peer.ID, requests []gsmsg.GraphSync

rm.inProgressResponses[key] =
&inProgressResponseStatus{
ctx: ctx,
ctx: rctx,
span: responseSpan,
cancelFn: cancelFn,
request: request,
Expand Down