Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Response message tracing #327

Merged
merged 9 commits into from
Jan 7, 2022
168 changes: 105 additions & 63 deletions impl/graphsync_test.go

Large diffs are not rendered by default.

10 changes: 9 additions & 1 deletion messagequeue/builder.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package messagequeue

import (
"context"
"io"

"github.com/ipfs/go-graphsync"
Expand All @@ -11,6 +12,7 @@ import (
// Builder wraps a message builder with additional functions related to metadata
// and notifications in the message queue
type Builder struct {
ctx context.Context
*gsmsg.Builder
topic Topic
responseStreams map[graphsync.RequestID]io.Closer
Expand All @@ -19,8 +21,9 @@ type Builder struct {
}

// NewBuilder sets up a new builder for the given topic
func NewBuilder(topic Topic) *Builder {
func NewBuilder(ctx context.Context, topic Topic) *Builder {
return &Builder{
ctx: ctx,
Builder: gsmsg.NewBuilder(),
topic: topic,
responseStreams: make(map[graphsync.RequestID]io.Closer),
Expand All @@ -29,6 +32,10 @@ func NewBuilder(topic Topic) *Builder {
}
}

func (b *Builder) Context() context.Context {
return b.ctx
}

// SetResponseStream sets the given response stream to close should the message fail to send
func (b *Builder) SetResponseStream(requestID graphsync.RequestID, stream io.Closer) {
b.responseStreams[requestID] = stream
Expand Down Expand Up @@ -82,6 +89,7 @@ func (b *Builder) build(publisher notifications.Publisher) (gsmsg.GraphSyncMessa
BlockData: b.blockData,
ResponseCodes: message.ResponseCodes(),
},
ctx: b.ctx,
topic: b.topic,
msgSize: b.BlockSize(),
responseStreams: b.responseStreams,
Expand Down
25 changes: 23 additions & 2 deletions messagequeue/messagequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ import (
"github.com/ipfs/go-graphsync"
logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p-core/peer"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"

gsmsg "github.com/ipfs/go-graphsync/message"
gsnet "github.com/ipfs/go-graphsync/network"
Expand Down Expand Up @@ -112,7 +116,10 @@ func (mq *MessageQueue) buildMessage(size uint64, buildMessageFn func(*Builder))
if shouldBeginNewResponse(mq.builders, size) {
topic := mq.nextBuilderTopic
mq.nextBuilderTopic++
mq.builders = append(mq.builders, NewBuilder(topic))
ctx, _ := otel.Tracer("graphsync").Start(mq.ctx, "message", trace.WithAttributes(
attribute.Int64("topic", int64(topic)),
))
mq.builders = append(mq.builders, NewBuilder(ctx, topic))
}
builder := mq.builders[len(mq.builders)-1]
buildMessageFn(builder)
Expand Down Expand Up @@ -156,7 +163,12 @@ func (mq *MessageQueue) runQueue() {
for {
_, metadata, err := mq.extractOutgoingMessage()
if err == nil {
mq.publishError(metadata, fmt.Errorf("message queue shutdown"))
span := trace.SpanFromContext(metadata.ctx)
err := fmt.Errorf("message queue shutdown")
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
span.End()
mq.publishError(metadata, err)
mq.eventPublisher.Close(metadata.topic)
} else {
break
Expand Down Expand Up @@ -211,12 +223,20 @@ func (mq *MessageQueue) extractOutgoingMessage() (gsmsg.GraphSyncMessage, intern

func (mq *MessageQueue) sendMessage() {
message, metadata, err := mq.extractOutgoingMessage()

if err != nil {
if err != errEmptyMessage {
log.Errorf("Unable to assemble GraphSync message: %s", err.Error())
}
return
}
span := trace.SpanFromContext(metadata.ctx)
defer span.End()
_, sendSpan := otel.Tracer("graphsync").Start(metadata.ctx, "sendMessage", trace.WithAttributes(
attribute.Int64("topic", int64(metadata.topic)),
attribute.Int64("size", int64(metadata.msgSize)),
))
defer sendSpan.End()
mq.publishQueued(metadata)
defer mq.eventPublisher.Close(metadata.topic)

Expand Down Expand Up @@ -337,6 +357,7 @@ func openSender(ctx context.Context, network MessageNetwork, p peer.ID, sendTime
}

type internalMetadata struct {
ctx context.Context
public Metadata
topic Topic
msgSize uint64
Expand Down
7 changes: 7 additions & 0 deletions messagequeue/messagequeue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ func TestProcessingNotification(t *testing.T) {

func TestDedupingMessages(t *testing.T) {
ctx := context.Background()
ctx, collectTracing := testutil.SetupTracing(ctx)
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()

Expand Down Expand Up @@ -251,6 +252,12 @@ func TestDedupingMessages(t *testing.T) {
t.Fatal("incorrect request added to message")
}
}

tracing := collectTracing(t)
require.ElementsMatch(t, []string{
"message(0)->sendMessage(0)",
"message(1)->sendMessage(0)",
}, tracing.TracesToStrings())
}

func TestSendsVeryLargeBlocksResponses(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion peermanager/peermessagemanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type fakePeer struct {
}

func (fp *fakePeer) AllocateAndBuildMessage(blkSize uint64, buildMessage func(b *messagequeue.Builder)) {
builder := messagequeue.NewBuilder(messagequeue.Topic(0))
builder := messagequeue.NewBuilder(context.TODO(), messagequeue.Topic(0))
buildMessage(builder)
message, err := builder.Build()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion requestmanager/requestmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1015,7 +1015,7 @@ type fakePeerHandler struct {

func (fph *fakePeerHandler) AllocateAndBuildMessage(p peer.ID, blkSize uint64,
requestBuilder func(b *messagequeue.Builder)) {
builder := messagequeue.NewBuilder(messagequeue.Topic(0))
builder := messagequeue.NewBuilder(context.TODO(), messagequeue.Topic(0))
requestBuilder(builder)
message, err := builder.Build()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion responsemanager/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ type NetworkErrorListeners interface {

// ResponseAssembler is an interface that returns sender interfaces for peer responses.
type ResponseAssembler interface {
NewStream(p peer.ID, requestID graphsync.RequestID, subscriber notifications.Subscriber) responseassembler.ResponseStream
NewStream(ctx context.Context, p peer.ID, requestID graphsync.RequestID, subscriber notifications.Subscriber) responseassembler.ResponseStream
}

type responseManagerMessage interface {
Expand Down
44 changes: 29 additions & 15 deletions responsemanager/queryexecutor/queryexecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/ipld/go-ipld-prime/traversal"
"github.com/libp2p/go-libp2p-core/peer"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"

Expand Down Expand Up @@ -82,7 +83,7 @@ func New(ctx context.Context,
// and uses the ResponseAssembler to build and send a response, while also triggering any of
// the QueryExecutor's BlockHooks. Traversal continues until complete, or a signal or hook
// suggests we should stop or pause.
func (qe *QueryExecutor) ExecuteTask(ctx context.Context, pid peer.ID, task *peertask.Task) bool {
func (qe *QueryExecutor) ExecuteTask(_ context.Context, pid peer.ID, task *peertask.Task) bool {
// StartTask lets us block until this task is at the top of the execution stack
responseTaskChan := make(chan ResponseTask)
var rt ResponseTask
Expand All @@ -97,11 +98,11 @@ func (qe *QueryExecutor) ExecuteTask(ctx context.Context, pid peer.ID, task *pee
return false
}

_, span := otel.Tracer("graphsync").Start(trace.ContextWithSpan(qe.ctx, rt.Span), "executeTask")
ctx, span := otel.Tracer("graphsync").Start(trace.ContextWithSpan(qe.ctx, rt.Span), "executeTask")
defer span.End()

log.Debugw("beginning response execution", "id", rt.Request.ID(), "peer", pid.String(), "root_cid", rt.Request.Root().String())
err := qe.executeQuery(pid, rt)
err := qe.executeQuery(ctx, pid, rt)
if err != nil {
span.RecordError(err)
if _, isPaused := err.(hooks.ErrPaused); !isPaused {
Expand All @@ -114,10 +115,10 @@ func (qe *QueryExecutor) ExecuteTask(ctx context.Context, pid peer.ID, task *pee
}

func (qe *QueryExecutor) executeQuery(
p peer.ID, rt ResponseTask) error {
ctx context.Context, p peer.ID, rt ResponseTask) error {

// Execute the traversal operation, continue until we have reason to stop (error, pause, complete)
err := qe.runTraversal(p, rt)
err := qe.runTraversal(ctx, p, rt)

_, isPaused := err.(hooks.ErrPaused)
if isPaused {
Expand Down Expand Up @@ -180,7 +181,7 @@ func (qe *QueryExecutor) checkForUpdates(
}
}

func (qe *QueryExecutor) runTraversal(p peer.ID, taskData ResponseTask) error {
func (qe *QueryExecutor) runTraversal(ctx context.Context, p peer.ID, taskData ResponseTask) error {
for {
traverser := taskData.Traverser
isComplete, err := traverser.IsComplete()
Expand All @@ -195,26 +196,35 @@ func (qe *QueryExecutor) runTraversal(p peer.ID, taskData ResponseTask) error {
}
return err
}
lnk, data, err := qe.nextBlock(taskData)
lnk, lnkCtx := taskData.Traverser.CurrentRequest()
ctx, span := otel.Tracer("graphsync").Start(ctx, "processBlock", trace.WithAttributes(
rvagg marked this conversation as resolved.
Show resolved Hide resolved
attribute.String("cid", lnk.String()),
))
data, err := qe.loadBlock(ctx, taskData, lnk, lnkCtx)
if err != nil {
span.End()
return err
}
err = qe.sendResponse(p, taskData, lnk, data)
err = qe.sendResponse(ctx, p, taskData, lnk, data)
if err != nil {
span.End()
return err
}
span.End()
}
}

func (qe *QueryExecutor) nextBlock(taskData ResponseTask) (ipld.Link, []byte, error) {
lnk, lnkCtx := taskData.Traverser.CurrentRequest()
func (qe *QueryExecutor) loadBlock(ctx context.Context, taskData ResponseTask, lnk ipld.Link, lnkCtx ipld.LinkContext) ([]byte, error) {
_, span := otel.Tracer("graphsync").Start(ctx, "loadBlock")
defer span.End()

log.Debugf("will load link=%s", lnk)
result, err := taskData.Loader(lnkCtx, lnk)

if err != nil {
log.Errorf("failed to load link=%s, nBlocksRead=%d, err=%s", lnk, taskData.Traverser.NBlocksTraversed(), err)
taskData.Traverser.Error(traversal.SkipMe{})
return lnk, nil, nil
return nil, nil
}

blockBuffer, ok := result.(*bytes.Buffer)
Expand All @@ -224,22 +234,24 @@ func (qe *QueryExecutor) nextBlock(taskData ResponseTask) (ipld.Link, []byte, er
if err != nil {
log.Errorf("failed to write to buffer, link=%s, nBlocksRead=%d, err=%s", lnk, taskData.Traverser.NBlocksTraversed(), err)
taskData.Traverser.Error(err)
return lnk, nil, err
return nil, err
}
}
data := blockBuffer.Bytes()
err = taskData.Traverser.Advance(blockBuffer)
if err != nil {
log.Errorf("failed to advance traversal, link=%s, nBlocksRead=%d, err=%s", lnk, taskData.Traverser.NBlocksTraversed(), err)
return lnk, data, err
return data, err
}
log.Debugf("successfully loaded link=%s, nBlocksRead=%d", lnk, taskData.Traverser.NBlocksTraversed())
return lnk, data, nil
return data, nil
}

func (qe *QueryExecutor) sendResponse(p peer.ID, taskData ResponseTask, link ipld.Link, data []byte) error {
func (qe *QueryExecutor) sendResponse(ctx context.Context, p peer.ID, taskData ResponseTask, link ipld.Link, data []byte) error {
// Execute a transaction for this block, including any other queued operations
return taskData.ResponseStream.Transaction(func(rb responseassembler.ResponseBuilder) error {
ctx, span := otel.Tracer("graphsync").Start(ctx, "sendBlock", trace.WithLinks(trace.LinkFromContext(rb.Context())))
defer span.End()
// Ensure that any updates that have occurred till now are integrated into the response
err := qe.checkForUpdates(p, taskData, rb)
// On any error other than a pause, we bail, if it's a pause then we continue processing _this_ block
Expand All @@ -248,7 +260,9 @@ func (qe *QueryExecutor) sendResponse(p peer.ID, taskData ResponseTask, link ipl
}
blockData := rb.SendResponse(link, data)
if blockData.BlockSize() > 0 {
_, span := otel.Tracer("graphsync").Start(ctx, "processBlockHooks")
result := qe.blockHooks.ProcessBlockHooks(p, taskData.Request, blockData)
span.End()
for _, extension := range result.Extensions {
rb.SendExtensionData(extension)
}
Expand Down
4 changes: 4 additions & 0 deletions responsemanager/queryexecutor/queryexecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,10 @@ func (rb fauxResponseBuilder) PauseRequest() {
}
}

func (rb fauxResponseBuilder) Context() context.Context {
return context.TODO()
}

var _ responseassembler.ResponseBuilder = &fauxResponseBuilder{}

type blockData struct {
Expand Down
7 changes: 7 additions & 0 deletions responsemanager/responseassembler/responseBuilder.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package responseassembler

import (
"context"

blocks "github.com/ipfs/go-block-format"
logging "github.com/ipfs/go-log/v2"
"github.com/ipld/go-ipld-prime"
Expand All @@ -18,6 +20,7 @@ type responseOperation interface {
}

type responseBuilder struct {
ctx context.Context
requestID graphsync.RequestID
operations []responseOperation
linkTracker *peerLinkTracker
Expand Down Expand Up @@ -47,6 +50,10 @@ func (rb *responseBuilder) PauseRequest() {
rb.operations = append(rb.operations, statusOperation{rb.requestID, graphsync.RequestPaused})
}

func (rb *responseBuilder) Context() context.Context {
return rb.ctx
}

func (rb *responseBuilder) setupBlockOperation(
link ipld.Link, data []byte) blockOperation {
hasBlock := data != nil
Expand Down
Loading