feat(requestmanager): add tracing for response messages & block processing #322

Merged
merged 6 commits on Dec 22, 2021
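
This change threads a context.Context through response handling and wraps each stage in an OpenTelemetry span: processResponseMessage starts a responseMessage span, AsyncLoader.ProcessResponse a loaderProcess span, and ResponseCache.ProcessResponse a cacheProcess span. Blocks added to the unverified block store now carry a trace link back to the cacheProcess span that ingested them, which the later verifyBlock span attaches when the block is verified during traversal.
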
281 changes: 196 additions & 85 deletions impl/graphsync_test.go

Large diffs are not rendered by default.

20 changes: 17 additions & 3 deletions requestmanager/asyncloader/asyncloader.go
@@ -10,6 +10,9 @@ import (
blocks "github.com/ipfs/go-block-format"
"github.com/ipld/go-ipld-prime"
peer "github.com/libp2p/go-libp2p-core/peer"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

"github.com/ipfs/go-graphsync"
"github.com/ipfs/go-graphsync/metadata"
@@ -103,8 +106,20 @@ func (al *AsyncLoader) StartRequest(requestID graphsync.RequestID, persistenceOp

// ProcessResponse ingests new responses and completes asynchronous loads as
// necessary
func (al *AsyncLoader) ProcessResponse(responses map[graphsync.RequestID]metadata.Metadata,
func (al *AsyncLoader) ProcessResponse(
ctx context.Context,
responses map[graphsync.RequestID]metadata.Metadata,
blks []blocks.Block) {

requestIds := make([]int, 0, len(responses))
for requestID := range responses {
requestIds = append(requestIds, int(requestID))
}
ctx, span := otel.Tracer("graphsync").Start(ctx, "loaderProcess", trace.WithAttributes(
attribute.IntSlice("requestIDs", requestIds),
))
defer span.End()

al.stateLk.Lock()
defer al.stateLk.Unlock()
byQueue := make(map[string][]graphsync.RequestID)
@@ -119,7 +134,7 @@ func (al *AsyncLoader) ProcessResponse(responses map[graphsync.RequestID]metadat
for _, requestID := range requestIDs {
queueResponses[requestID] = responses[requestID]
}
responseCache.ProcessResponse(queueResponses, blks)
responseCache.ProcessResponse(ctx, queueResponses, blks)
loadAttemptQueue.RetryLoads()
}
}
@@ -178,7 +193,6 @@ func (al *AsyncLoader) getResponseCache(queue string) *responsecache.ResponseCac
}

func setupAttemptQueue(lsys ipld.LinkSystem) (*responsecache.ResponseCache, *loadattemptqueue.LoadAttemptQueue) {

unverifiedBlockStore := unverifiedblockstore.New(lsys.StorageWriteOpener)
responseCache := responsecache.New(unverifiedBlockStore)
loadAttemptQueue := loadattemptqueue.New(func(p peer.ID, requestID graphsync.RequestID, link ipld.Link, linkContext ipld.LinkContext) types.AsyncLoadResult {
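
The new loaderProcess span (like the other spans added in this PR) is created via otel.Tracer("graphsync"), which resolves against the globally registered tracer provider. As a minimal sketch of how an application embedding go-graphsync could surface these spans — assuming the OpenTelemetry SDK's stdout exporter, which is not something this PR installs itself:

```go
package main

import (
	"context"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

func main() {
	// Export spans to stdout; any SpanExporter would do.
	exp, err := stdouttrace.New(stdouttrace.WithPrettyPrint())
	if err != nil {
		panic(err)
	}
	tp := sdktrace.NewTracerProvider(sdktrace.WithBatcher(exp))
	defer func() { _ = tp.Shutdown(context.Background()) }()

	// Register globally so otel.Tracer("graphsync") calls inside
	// go-graphsync resolve to this provider.
	otel.SetTracerProvider(tp)

	// ... construct graphsync and issue requests here; loaderProcess,
	// cacheProcess and verifyBlock spans will then reach the exporter.
}
```
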
14 changes: 7 additions & 7 deletions requestmanager/asyncloader/asyncloader_test.go
@@ -48,7 +48,7 @@ func TestAsyncLoadInitialLoadSucceedsResponsePresent(t *testing.T) {
},
}
p := testutil.GeneratePeers(1)[0]
asyncLoader.ProcessResponse(responses, blocks)
asyncLoader.ProcessResponse(context.Background(), responses, blocks)
resultChan := asyncLoader.AsyncLoad(p, requestID, link, ipld.LinkContext{})

assertSuccessResponse(ctx, t, resultChan)
@@ -72,7 +72,7 @@ func TestAsyncLoadInitialLoadFails(t *testing.T) {
},
}
p := testutil.GeneratePeers(1)[0]
asyncLoader.ProcessResponse(responses, nil)
asyncLoader.ProcessResponse(context.Background(), responses, nil)

resultChan := asyncLoader.AsyncLoad(p, requestID, link, ipld.LinkContext{})
assertFailResponse(ctx, t, resultChan)
@@ -116,7 +116,7 @@ func TestAsyncLoadInitialLoadIndeterminateThenSucceeds(t *testing.T) {
},
},
}
asyncLoader.ProcessResponse(responses, blocks)
asyncLoader.ProcessResponse(context.Background(), responses, blocks)
assertSuccessResponse(ctx, t, resultChan)
st.AssertLocalLoads(t, 1)
st.AssertBlockStored(t, block)
@@ -144,7 +144,7 @@ func TestAsyncLoadInitialLoadIndeterminateThenFails(t *testing.T) {
},
},
}
asyncLoader.ProcessResponse(responses, nil)
asyncLoader.ProcessResponse(context.Background(), responses, nil)
assertFailResponse(ctx, t, resultChan)
st.AssertLocalLoads(t, 1)
})
@@ -182,7 +182,7 @@ func TestAsyncLoadTwiceLoadsLocallySecondTime(t *testing.T) {
},
}
p := testutil.GeneratePeers(1)[0]
asyncLoader.ProcessResponse(responses, blocks)
asyncLoader.ProcessResponse(context.Background(), responses, blocks)
resultChan := asyncLoader.AsyncLoad(p, requestID, link, ipld.LinkContext{})

assertSuccessResponse(ctx, t, resultChan)
@@ -282,7 +282,7 @@ func TestRequestSplittingSameBlockTwoStores(t *testing.T) {
},
},
}
asyncLoader.ProcessResponse(responses, blocks)
asyncLoader.ProcessResponse(context.Background(), responses, blocks)

assertSuccessResponse(ctx, t, resultChan1)
assertSuccessResponse(ctx, t, resultChan2)
@@ -317,7 +317,7 @@ func TestRequestSplittingSameBlockOnlyOneResponse(t *testing.T) {
},
},
}
asyncLoader.ProcessResponse(responses, blocks)
asyncLoader.ProcessResponse(context.Background(), responses, blocks)
asyncLoader.CompleteResponsesFor(requestID1)

assertFailResponse(ctx, t, resultChan1)
19 changes: 16 additions & 3 deletions requestmanager/asyncloader/responsecache/responsecache.go
@@ -1,12 +1,16 @@
package responsecache

import (
"context"
"sync"

blocks "github.com/ipfs/go-block-format"
logging "github.com/ipfs/go-log/v2"
"github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

"github.com/ipfs/go-graphsync"
"github.com/ipfs/go-graphsync/linktracker"
@@ -21,7 +25,7 @@ type UnverifiedBlockStore interface {
PruneBlocks(func(ipld.Link, uint64) bool)
PruneBlock(ipld.Link)
VerifyBlock(ipld.Link, ipld.LinkContext) ([]byte, error)
AddUnverifiedBlock(ipld.Link, []byte)
AddUnverifiedBlock(trace.Link, ipld.Link, []byte)
}

// ResponseCache maintains a store of unverified blocks and response
@@ -67,13 +71,22 @@ func (rc *ResponseCache) AttemptLoad(requestID graphsync.RequestID, link ipld.Li

// ProcessResponse processes incoming response data, adding unverified blocks,
// and tracking link metadata from a remote peer
func (rc *ResponseCache) ProcessResponse(responses map[graphsync.RequestID]metadata.Metadata,
func (rc *ResponseCache) ProcessResponse(
ctx context.Context,
responses map[graphsync.RequestID]metadata.Metadata,
blks []blocks.Block) {

ctx, span := otel.Tracer("graphsync").Start(ctx, "cacheProcess", trace.WithAttributes(
attribute.Int("blockCount", len(blks)),
))
traceLink := trace.LinkFromContext(ctx)
defer span.End()

rc.responseCacheLk.Lock()

for _, block := range blks {
log.Debugf("Received block from network: %s", block.Cid().String())
rc.unverifiedBlockStore.AddUnverifiedBlock(cidlink.Link{Cid: block.Cid()}, block.RawData())
rc.unverifiedBlockStore.AddUnverifiedBlock(traceLink, cidlink.Link{Cid: block.Cid()}, block.RawData())
}

for requestID, md := range responses {
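
The interesting piece here is trace.LinkFromContext: block verification happens later, on a different code path and under the requesting traversal's context, so cacheProcess cannot simply be the parent of the eventual verifyBlock span. Instead a span link is captured at ingestion time and attached when the block is verified. A standalone sketch of that pattern (the names and the map-based hand-off are illustrative, not the graphsync API):

```go
package main

import (
	"context"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/trace"
)

// ingest records work under one span and stores a link to it.
func ingest(ctx context.Context, links map[string]trace.Link) {
	ctx, span := otel.Tracer("example").Start(ctx, "cacheProcess")
	defer span.End()

	// The link survives after ctx and span are gone.
	links["block-1"] = trace.LinkFromContext(ctx)
}

// verify runs later, under an unrelated context, and attaches the link
// so a tracing backend can connect verification back to ingestion.
func verify(ctx context.Context, links map[string]trace.Link) {
	_, span := otel.Tracer("example").Start(ctx, "verifyBlock",
		trace.WithLinks(links["block-1"]))
	defer span.End()
}

func main() {
	links := map[string]trace.Link{}
	ingest(context.Background(), links)
	verify(context.Background(), links)
}
```
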
requestmanager/asyncloader/responsecache/responsecache_test.go
@@ -1,6 +1,7 @@
package responsecache

import (
"context"
"fmt"
"math/rand"
"testing"
@@ -9,6 +10,7 @@ import (
ipld "github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/trace"

"github.com/ipfs/go-graphsync"
"github.com/ipfs/go-graphsync/metadata"
@@ -19,7 +21,7 @@ type fakeUnverifiedBlockStore struct {
inMemoryBlocks map[ipld.Link][]byte
}

func (ubs *fakeUnverifiedBlockStore) AddUnverifiedBlock(lnk ipld.Link, data []byte) {
func (ubs *fakeUnverifiedBlockStore) AddUnverifiedBlock(_ trace.Link, lnk ipld.Link, data []byte) {
ubs.inMemoryBlocks[lnk] = data
}

@@ -100,7 +102,7 @@ func TestResponseCacheManagingLinks(t *testing.T) {
}
responseCache := New(fubs)

responseCache.ProcessResponse(responses, blks)
responseCache.ProcessResponse(context.Background(), responses, blks)

require.Len(t, fubs.blocks(), len(blks)-1, "should prune block with no references")
testutil.RefuteContainsBlock(t, fubs.blocks(), blks[2])
requestmanager/asyncloader/unverifiedblockstore/unverifiedblockstore.go
@@ -1,10 +1,14 @@
package unverifiedblockstore

import (
"context"
"fmt"

logging "github.com/ipfs/go-log/v2"
ipld "github.com/ipld/go-ipld-prime"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

var log = logging.Logger("gs-unverifiedbs")
@@ -16,24 +20,29 @@ type settableWriter interface {
// UnverifiedBlockStore holds an in memory cache of received blocks from the network
// that have not been verified to be part of a traversal
type UnverifiedBlockStore struct {
inMemoryBlocks map[ipld.Link][]byte
inMemoryBlocks map[ipld.Link]tracedBlock
storer ipld.BlockWriteOpener
dataSize uint64
}

type tracedBlock struct {
block []byte
traceLink trace.Link
}

// New initializes a new unverified store with the given storer function for writing
// to permanent storage if the block is verified
func New(storer ipld.BlockWriteOpener) *UnverifiedBlockStore {
return &UnverifiedBlockStore{
inMemoryBlocks: make(map[ipld.Link][]byte),
inMemoryBlocks: make(map[ipld.Link]tracedBlock),
storer: storer,
}
}

// AddUnverifiedBlock adds a new unverified block to the in memory cache as it
// comes in as part of a traversal.
func (ubs *UnverifiedBlockStore) AddUnverifiedBlock(lnk ipld.Link, data []byte) {
ubs.inMemoryBlocks[lnk] = data
func (ubs *UnverifiedBlockStore) AddUnverifiedBlock(traceLink trace.Link, lnk ipld.Link, data []byte) {
ubs.inMemoryBlocks[lnk] = tracedBlock{data, traceLink}
ubs.dataSize = ubs.dataSize + uint64(len(data))
log.Debugw("added in-memory block", "total_queued_bytes", ubs.dataSize)
}
@@ -42,9 +51,9 @@ func (ubs *UnverifiedBlockStore) AddUnverifiedBlock(lnk ipld.Link, data []byte)
// if the passed in function returns true for the given link
func (ubs *UnverifiedBlockStore) PruneBlocks(shouldPrune func(ipld.Link, uint64) bool) {
for link, data := range ubs.inMemoryBlocks {
if shouldPrune(link, uint64(len(data))) {
if shouldPrune(link, uint64(len(data.block))) {
delete(ubs.inMemoryBlocks, link)
ubs.dataSize = ubs.dataSize - uint64(len(data))
ubs.dataSize = ubs.dataSize - uint64(len(data.block))
}
}
log.Debugw("finished pruning in-memory blocks", "total_queued_bytes", ubs.dataSize)
Expand All @@ -53,7 +62,7 @@ func (ubs *UnverifiedBlockStore) PruneBlocks(shouldPrune func(ipld.Link, uint64)
// PruneBlock deletes an individual block from the store
func (ubs *UnverifiedBlockStore) PruneBlock(link ipld.Link) {
delete(ubs.inMemoryBlocks, link)
ubs.dataSize = ubs.dataSize - uint64(len(ubs.inMemoryBlocks[link]))
ubs.dataSize = ubs.dataSize - uint64(len(ubs.inMemoryBlocks[link].block))
log.Debugw("pruned in-memory block", "total_queued_bytes", ubs.dataSize)
}

@@ -64,18 +73,30 @@ func (ubs *UnverifiedBlockStore) VerifyBlock(lnk ipld.Link, linkContext ipld.Lin
if !ok {
return nil, fmt.Errorf("block not found")
}

ctx := linkContext.Ctx
if ctx == nil {
ctx = context.Background()
}
_, span := otel.Tracer("graphsync").Start(
ctx,
"verifyBlock",
trace.WithLinks(data.traceLink),
trace.WithAttributes(attribute.String("cid", lnk.String())))
defer span.End()

delete(ubs.inMemoryBlocks, lnk)
ubs.dataSize = ubs.dataSize - uint64(len(data))
ubs.dataSize = ubs.dataSize - uint64(len(data.block))
log.Debugw("verified block", "total_queued_bytes", ubs.dataSize)

buffer, committer, err := ubs.storer(linkContext)
if err != nil {
return nil, err
}
if settable, ok := buffer.(settableWriter); ok {
err = settable.SetBytes(data)
err = settable.SetBytes(data.block)
} else {
_, err = buffer.Write(data)
_, err = buffer.Write(data.block)
}
if err != nil {
return nil, err
Expand All @@ -84,5 +105,5 @@ func (ubs *UnverifiedBlockStore) VerifyBlock(lnk ipld.Link, linkContext ipld.Lin
if err != nil {
return nil, err
}
return data, nil
return data.block, nil
}
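
For reference, a small sketch of driving the updated store directly with the new AddUnverifiedBlock signature. The hand-rolled storer and the use of testutil.GenerateBlocksOfSize are assumptions made for illustration, not part of this diff:

```go
package main

import (
	"bytes"
	"context"
	"fmt"
	"io"

	ipld "github.com/ipld/go-ipld-prime"
	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
	"go.opentelemetry.io/otel/trace"

	"github.com/ipfs/go-graphsync/requestmanager/asyncloader/unverifiedblockstore"
	"github.com/ipfs/go-graphsync/testutil"
)

func main() {
	// A toy BlockWriteOpener that commits verified blocks into a map.
	committed := map[ipld.Link][]byte{}
	storer := func(_ ipld.LinkContext) (io.Writer, ipld.BlockWriteCommitter, error) {
		buf := new(bytes.Buffer)
		return buf, func(lnk ipld.Link) error {
			committed[lnk] = buf.Bytes()
			return nil
		}, nil
	}

	ubs := unverifiedblockstore.New(storer)
	block := testutil.GenerateBlocksOfSize(1, 100)[0]
	lnk := cidlink.Link{Cid: block.Cid()}

	// An empty trace.Link is fine when there is no ingestion span to reference.
	ubs.AddUnverifiedBlock(trace.Link{}, lnk, block.RawData())

	// VerifyBlock starts a verifyBlock span (parented on LinkContext.Ctx) and
	// moves the block into permanent storage via the storer.
	data, err := ubs.VerifyBlock(lnk, ipld.LinkContext{Ctx: context.Background()})
	fmt.Println(len(data), err, len(committed))
}
```
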
requestmanager/asyncloader/unverifiedblockstore/unverifiedblockstore_test.go
@@ -8,6 +8,7 @@ import (
"github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/trace"

"github.com/ipfs/go-graphsync/testutil"
)
@@ -25,7 +26,7 @@ func TestVerifyBlockPresent(t *testing.T) {
require.Nil(t, data)
require.Error(t, err, "block should not be verifiable till it's added as an unverifiable block")

unverifiedBlockStore.AddUnverifiedBlock(cidlink.Link{Cid: block.Cid()}, block.RawData())
unverifiedBlockStore.AddUnverifiedBlock(trace.Link{}, cidlink.Link{Cid: block.Cid()}, block.RawData())
reader, err = lsys.StorageReadOpener(ipld.LinkContext{}, cidlink.Link{Cid: block.Cid()})
require.Nil(t, reader)
require.Error(t, err, "block should not be loadable till it's verified")
2 changes: 1 addition & 1 deletion requestmanager/client.go
@@ -74,7 +74,7 @@ type PeerHandler interface {
// results as new responses are processed
type AsyncLoader interface {
StartRequest(graphsync.RequestID, string) error
ProcessResponse(responses map[graphsync.RequestID]metadata.Metadata,
ProcessResponse(ctx context.Context, responses map[graphsync.RequestID]metadata.Metadata,
blks []blocks.Block)
AsyncLoad(p peer.ID, requestID graphsync.RequestID, link ipld.Link, linkContext ipld.LinkContext) <-chan types.AsyncLoadResult
CompleteResponsesFor(requestID graphsync.RequestID)
13 changes: 11 additions & 2 deletions requestmanager/server.go
@@ -130,7 +130,7 @@ func (rm *RequestManager) requestTask(requestID graphsync.RequestID) executor.Re
// the traverser has its own context because we want to fail on block boundaries, in the executor,
// and make sure all blocks included up to the termination message
// are processed and passed in the response channel
ctx, cancel := context.WithCancel(rm.ctx)
ctx, cancel := context.WithCancel(trace.ContextWithSpan(rm.ctx, ipr.span))
ipr.traverserCancel = cancel
ipr.traverser = ipldutil.TraversalBuilder{
Root: cidlink.Link{Cid: ipr.request.Root()},
@@ -264,11 +264,20 @@ func (rm *RequestManager) cancelOnError(requestID graphsync.RequestID, ipr *inPr

func (rm *RequestManager) processResponseMessage(p peer.ID, responses []gsmsg.GraphSyncResponse, blks []blocks.Block) {
log.Debugf("beging rocessing message for peer %s", p)
requestIds := make([]int, 0, len(responses))
for _, r := range responses {
requestIds = append(requestIds, int(r.RequestID()))
}
ctx, span := otel.Tracer("graphsync").Start(rm.ctx, "responseMessage", trace.WithAttributes(
attribute.String("peerID", p.Pretty()),
attribute.IntSlice("requestIDs", requestIds),
))
defer span.End()
filteredResponses := rm.processExtensions(responses, p)
filteredResponses = rm.filterResponsesForPeer(filteredResponses, p)
rm.updateLastResponses(filteredResponses)
responseMetadata := metadataForResponses(filteredResponses)
rm.asyncLoader.ProcessResponse(responseMetadata, blks)
rm.asyncLoader.ProcessResponse(ctx, responseMetadata, blks)
rm.processTerminations(filteredResponses)
log.Debugf("end processing message for peer %s", p)
}
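
Taken together, a single incoming message now yields a responseMessage span that parents loaderProcess, which parents cacheProcess, while the traverser context (via trace.ContextWithSpan above) makes later verifyBlock spans children of the request span with a link back to the cacheProcess that ingested the block. A rough way to observe the new span names in a test, using the OpenTelemetry SDK's in-memory recorder rather than the project's own test helpers (the graphsync round-trip itself is elided):

```go
package main

import (
	"fmt"

	"go.opentelemetry.io/otel"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
	"go.opentelemetry.io/otel/sdk/trace/tracetest"
)

func main() {
	// Record ended spans in memory.
	sr := tracetest.NewSpanRecorder()
	tp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr))
	otel.SetTracerProvider(tp)

	runExchange() // placeholder: drive a graphsync request/response here

	// Check that the spans added in this PR were emitted.
	seen := map[string]bool{}
	for _, s := range sr.Ended() {
		seen[s.Name()] = true
	}
	for _, name := range []string{"responseMessage", "loaderProcess", "cacheProcess", "verifyBlock"} {
		fmt.Println(name, seen[name])
	}
}

func runExchange() {}
```
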
2 changes: 1 addition & 1 deletion requestmanager/testloader/asyncloader.go
@@ -61,7 +61,7 @@ func (fal *FakeAsyncLoader) StartRequest(requestID graphsync.RequestID, name str
}

// ProcessResponse just records values passed to verify expectations later
func (fal *FakeAsyncLoader) ProcessResponse(responses map[graphsync.RequestID]metadata.Metadata,
func (fal *FakeAsyncLoader) ProcessResponse(_ context.Context, responses map[graphsync.RequestID]metadata.Metadata,
blks []blocks.Block) {
fal.responses <- responses
fal.blks <- blks