diff --git a/benchmarks/benchmark_test.go b/benchmarks/benchmark_test.go index 95875c24..9e64eded 100644 --- a/benchmarks/benchmark_test.go +++ b/benchmarks/benchmark_test.go @@ -66,6 +66,9 @@ func BenchmarkRoundtripSuccess(b *testing.B) { b.Run("test-p2p-stress-1-1GB-memory-pressure", func(b *testing.B) { p2pStrestTest(ctx, b, 1, allFilesUniformSize(1*(1<<30), 1<<20, 1024, true), tdm, []graphsync.Option{graphsync.MaxMemoryResponder(1 << 27)}, true) }) + b.Run("test-p2p-stress-1-1GB-memory-pressure-missing-blocks", func(b *testing.B) { + p2pStrestTest(ctx, b, 1, allFilesMissingTopLevelBlock(1*(1<<30), 1<<20, 1024, true), tdm, []graphsync.Option{graphsync.MaxMemoryResponder(1 << 27)}, true) + }) b.Run("test-p2p-stress-1-1GB-memory-pressure-no-raw-nodes", func(b *testing.B) { p2pStrestTest(ctx, b, 1, allFilesUniformSize(1*(1<<30), 1<<20, 1024, false), tdm, []graphsync.Option{graphsync.MaxMemoryResponder(1 << 27)}, true) }) @@ -173,10 +176,10 @@ func p2pStrestTest(ctx context.Context, b *testing.B, numfiles int, df distFunc, wg.Add(1) go func(j int) { defer wg.Done() - for _ = range responseChan { + for range responseChan { } for err := range errChan { - b.Fatalf("received error on request: %s", err.Error()) + b.Logf("Error during network traversal: %s", err.Error()) } }(j) } @@ -302,6 +305,23 @@ func allFilesUniformSize(size uint64, unixfsChunkSize uint64, unixfsLinksPerLeve } } +func allFilesMissingTopLevelBlock(size uint64, unixfsChunkSize uint64, unixfsLinksPerLevel int, useRawNodes bool) distFunc { + return func(ctx context.Context, b *testing.B, provs []testinstance.Instance) []cid.Cid { + cids := make([]cid.Cid, 0, len(provs)) + for _, prov := range provs { + c := loadRandomUnixFxFile(ctx, b, prov.BlockStore, size, unixfsChunkSize, unixfsLinksPerLevel, useRawNodes) + ds := merkledag.NewDAGService(blockservice.New(prov.BlockStore, offline.Exchange(prov.BlockStore))) + lnks, err := ds.GetLinks(ctx, c) + require.NoError(b, err) + randLink := lnks[rand.Intn(len(lnks))] + err = ds.Remove(ctx, randLink.Cid) + require.NoError(b, err) + cids = append(cids, c) + } + return cids + } +} + type tempDirMaker struct { tdm string tempDirSeq int32 diff --git a/impl/graphsync.go b/impl/graphsync.go index ce16c734..5f912061 100644 --- a/impl/graphsync.go +++ b/impl/graphsync.go @@ -63,10 +63,12 @@ type GraphSync struct { } type graphsyncConfigOptions struct { - totalMaxMemory uint64 - maxMemoryPerPeer uint64 - maxInProgressRequests uint64 - registerDefaultValidator bool + totalMaxMemoryResponder uint64 + maxMemoryPerPeerResponder uint64 + totalMaxMemoryRequestor uint64 + maxMemoryPerPeerRequestor uint64 + maxInProgressRequests uint64 + registerDefaultValidator bool } // Option defines the functional option type that can be used to configure @@ -85,7 +87,7 @@ func RejectAllRequestsByDefault() Option { // may consume queueing up messages for a response in total func MaxMemoryResponder(totalMaxMemory uint64) Option { return func(gs *graphsyncConfigOptions) { - gs.totalMaxMemory = totalMaxMemory + gs.totalMaxMemoryResponder = totalMaxMemory } } @@ -93,7 +95,23 @@ func MaxMemoryResponder(totalMaxMemory uint64) Option { // may consume queueing up messages for a response func MaxMemoryPerPeerResponder(maxMemoryPerPeer uint64) Option { return func(gs *graphsyncConfigOptions) { - gs.maxMemoryPerPeer = maxMemoryPerPeer + gs.maxMemoryPerPeerResponder = maxMemoryPerPeer + } +} + +// MaxMemoryRequestor defines the maximum amount of memory the responder +// may consume queueing up messages for a response in total +func MaxMemoryRequestor(totalMaxMemory uint64) Option { + return func(gs *graphsyncConfigOptions) { + gs.totalMaxMemoryRequestor = totalMaxMemory + } +} + +// MaxMemoryPerPeerRequestor defines the maximum amount of memory a peer +// may consume queueing up messages for a response +func MaxMemoryPerPeerRequestor(maxMemoryPerPeer uint64) Option { + return func(gs *graphsyncConfigOptions) { + gs.maxMemoryPerPeerRequestor = maxMemoryPerPeer } } @@ -112,10 +130,12 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork, ctx, cancel := context.WithCancel(parent) gsConfig := &graphsyncConfigOptions{ - totalMaxMemory: defaultTotalMaxMemory, - maxMemoryPerPeer: defaultMaxMemoryPerPeer, - maxInProgressRequests: defaultMaxInProgressRequests, - registerDefaultValidator: true, + totalMaxMemoryResponder: defaultTotalMaxMemory, + maxMemoryPerPeerResponder: defaultMaxMemoryPerPeer, + totalMaxMemoryRequestor: defaultTotalMaxMemory, + maxMemoryPerPeerRequestor: defaultMaxMemoryPerPeer, + maxInProgressRequests: defaultMaxInProgressRequests, + registerDefaultValidator: true, } for _, option := range options { option(gsConfig) @@ -136,12 +156,14 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork, if gsConfig.registerDefaultValidator { incomingRequestHooks.Register(selectorvalidator.SelectorValidator(maxRecursionDepth)) } - allocator := allocator.NewAllocator(gsConfig.totalMaxMemory, gsConfig.maxMemoryPerPeer) + responseAllocator := allocator.NewAllocator(gsConfig.totalMaxMemoryResponder, gsConfig.maxMemoryPerPeerResponder) createMessageQueue := func(ctx context.Context, p peer.ID) peermanager.PeerQueue { - return messagequeue.New(ctx, p, network, allocator) + return messagequeue.New(ctx, p, network, responseAllocator) } peerManager := peermanager.NewMessageManager(ctx, createMessageQueue) - asyncLoader := asyncloader.New(ctx, loader, storer) + requestAllocator := allocator.NewAllocator(gsConfig.totalMaxMemoryRequestor, gsConfig.maxMemoryPerPeerRequestor) + + asyncLoader := asyncloader.New(ctx, loader, storer, requestAllocator) requestManager := requestmanager.New(ctx, asyncLoader, outgoingRequestHooks, incomingResponseHooks, incomingBlockHooks, networkErrorListeners) responseAssembler := responseassembler.New(ctx, peerManager) peerTaskQueue := peertaskqueue.New() @@ -171,7 +193,7 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork, persistenceOptions: persistenceOptions, ctx: ctx, cancel: cancel, - allocator: allocator, + allocator: responseAllocator, } asyncLoader.Startup() diff --git a/requestmanager/asyncloader/asyncloader.go b/requestmanager/asyncloader/asyncloader.go index c0111b5d..afe79999 100644 --- a/requestmanager/asyncloader/asyncloader.go +++ b/requestmanager/asyncloader/asyncloader.go @@ -7,6 +7,7 @@ import ( blocks "github.com/ipfs/go-block-format" "github.com/ipld/go-ipld-prime" + peer "github.com/libp2p/go-libp2p-peer" "github.com/ipfs/go-graphsync" "github.com/ipfs/go-graphsync/metadata" @@ -25,6 +26,12 @@ type alternateQueue struct { loadAttemptQueue *loadattemptqueue.LoadAttemptQueue } +// Allocator indicates a mechanism for tracking memory used by a given peer +type Allocator interface { + AllocateBlockMemory(p peer.ID, amount uint64) <-chan error + ReleaseBlockMemory(p peer.ID, amount uint64) error +} + // AsyncLoader manages loading links asynchronously in as new responses // come in from the network type AsyncLoader struct { @@ -40,12 +47,13 @@ type AsyncLoader struct { alternateQueues map[string]alternateQueue responseCache *responsecache.ResponseCache loadAttemptQueue *loadattemptqueue.LoadAttemptQueue + allocator Allocator } // New initializes a new link loading manager for asynchronous loads from the given context // and local store loading and storing function -func New(ctx context.Context, loader ipld.Loader, storer ipld.Storer) *AsyncLoader { - responseCache, loadAttemptQueue := setupAttemptQueue(loader, storer) +func New(ctx context.Context, loader ipld.Loader, storer ipld.Storer, allocator Allocator) *AsyncLoader { + responseCache, loadAttemptQueue := setupAttemptQueue(loader, storer, allocator) ctx, cancel := context.WithCancel(ctx) return &AsyncLoader{ ctx: ctx, @@ -59,6 +67,7 @@ func New(ctx context.Context, loader ipld.Loader, storer ipld.Storer) *AsyncLoad alternateQueues: make(map[string]alternateQueue), responseCache: responseCache, loadAttemptQueue: loadAttemptQueue, + allocator: allocator, } } @@ -103,8 +112,16 @@ func (al *AsyncLoader) StartRequest(requestID graphsync.RequestID, persistenceOp // ProcessResponse injests new responses and completes asynchronous loads as // neccesary -func (al *AsyncLoader) ProcessResponse(responses map[graphsync.RequestID]metadata.Metadata, +func (al *AsyncLoader) ProcessResponse(p peer.ID, responses map[graphsync.RequestID]metadata.Metadata, blks []blocks.Block) { + totalMemoryAllocated := uint64(0) + for _, blk := range blks { + totalMemoryAllocated += uint64(len(blk.RawData())) + } + select { + case <-al.allocator.AllocateBlockMemory(p, totalMemoryAllocated): + case <-al.ctx.Done(): + } select { case <-al.ctx.Done(): case al.incomingMessages <- &newResponsesAvailableMessage{responses, blks}: @@ -113,10 +130,10 @@ func (al *AsyncLoader) ProcessResponse(responses map[graphsync.RequestID]metadat // AsyncLoad asynchronously loads the given link for the given request ID. It returns a channel for data and a channel // for errors -- only one message will be sent over either. -func (al *AsyncLoader) AsyncLoad(requestID graphsync.RequestID, link ipld.Link) <-chan types.AsyncLoadResult { +func (al *AsyncLoader) AsyncLoad(p peer.ID, requestID graphsync.RequestID, link ipld.Link) <-chan types.AsyncLoadResult { resultChan := make(chan types.AsyncLoadResult, 1) response := make(chan error, 1) - lr := loadattemptqueue.NewLoadRequest(requestID, link, resultChan) + lr := loadattemptqueue.NewLoadRequest(p, requestID, link, resultChan) _ = al.sendSyncMessage(&loadRequestMessage{response, requestID, lr}, response) return resultChan } @@ -258,7 +275,7 @@ func (rpom *registerPersistenceOptionMessage) register(al *AsyncLoader) error { if existing { return errors.New("already registerd a persistence option with this name") } - responseCache, loadAttemptQueue := setupAttemptQueue(rpom.loader, rpom.storer) + responseCache, loadAttemptQueue := setupAttemptQueue(rpom.loader, rpom.storer, al.allocator) al.alternateQueues[rpom.name] = alternateQueue{responseCache, loadAttemptQueue} return nil } @@ -347,13 +364,16 @@ func (crm *cleanupRequestMessage) handle(al *AsyncLoader) { al.responseCache.FinishRequest(crm.requestID) } -func setupAttemptQueue(loader ipld.Loader, storer ipld.Storer) (*responsecache.ResponseCache, *loadattemptqueue.LoadAttemptQueue) { +func setupAttemptQueue(loader ipld.Loader, storer ipld.Storer, allocator Allocator) (*responsecache.ResponseCache, *loadattemptqueue.LoadAttemptQueue) { unverifiedBlockStore := unverifiedblockstore.New(storer) responseCache := responsecache.New(unverifiedBlockStore) - loadAttemptQueue := loadattemptqueue.New(func(requestID graphsync.RequestID, link ipld.Link) types.AsyncLoadResult { + loadAttemptQueue := loadattemptqueue.New(func(p peer.ID, requestID graphsync.RequestID, link ipld.Link) types.AsyncLoadResult { // load from response cache data, err := responseCache.AttemptLoad(requestID, link) + if data != nil { + allocator.ReleaseBlockMemory(p, uint64(len(data))) + } if data == nil && err == nil { // fall back to local store stream, loadErr := loader(link, ipld.LinkContext{}) diff --git a/requestmanager/asyncloader/asyncloader_test.go b/requestmanager/asyncloader/asyncloader_test.go index 8d3a41bc..13c482fe 100644 --- a/requestmanager/asyncloader/asyncloader_test.go +++ b/requestmanager/asyncloader/asyncloader_test.go @@ -13,6 +13,7 @@ import ( "github.com/stretchr/testify/require" "github.com/ipfs/go-graphsync" + "github.com/ipfs/go-graphsync/allocator" "github.com/ipfs/go-graphsync/metadata" "github.com/ipfs/go-graphsync/requestmanager/types" "github.com/ipfs/go-graphsync/testutil" @@ -24,7 +25,8 @@ func TestAsyncLoadInitialLoadSucceedsLocallyPresent(t *testing.T) { link := st.Store(t, block) withLoader(st, func(ctx context.Context, asyncLoader *AsyncLoader) { requestID := graphsync.RequestID(rand.Int31()) - resultChan := asyncLoader.AsyncLoad(requestID, link) + p := testutil.GeneratePeers(1)[0] + resultChan := asyncLoader.AsyncLoad(p, requestID, link) assertSuccessResponse(ctx, t, resultChan) st.AssertLocalLoads(t, 1) }) @@ -46,8 +48,9 @@ func TestAsyncLoadInitialLoadSucceedsResponsePresent(t *testing.T) { }, }, } - asyncLoader.ProcessResponse(responses, blocks) - resultChan := asyncLoader.AsyncLoad(requestID, link) + p := testutil.GeneratePeers(1)[0] + asyncLoader.ProcessResponse(p, responses, blocks) + resultChan := asyncLoader.AsyncLoad(p, requestID, link) assertSuccessResponse(ctx, t, resultChan) st.AssertLocalLoads(t, 0) @@ -69,9 +72,10 @@ func TestAsyncLoadInitialLoadFails(t *testing.T) { }, }, } - asyncLoader.ProcessResponse(responses, nil) + p := testutil.GeneratePeers(1)[0] + asyncLoader.ProcessResponse(p, responses, nil) - resultChan := asyncLoader.AsyncLoad(requestID, link) + resultChan := asyncLoader.AsyncLoad(p, requestID, link) assertFailResponse(ctx, t, resultChan) st.AssertLocalLoads(t, 0) }) @@ -82,7 +86,8 @@ func TestAsyncLoadInitialLoadIndeterminateWhenRequestNotInProgress(t *testing.T) withLoader(st, func(ctx context.Context, asyncLoader *AsyncLoader) { link := testutil.NewTestLink() requestID := graphsync.RequestID(rand.Int31()) - resultChan := asyncLoader.AsyncLoad(requestID, link) + p := testutil.GeneratePeers(1)[0] + resultChan := asyncLoader.AsyncLoad(p, requestID, link) assertFailResponse(ctx, t, resultChan) st.AssertLocalLoads(t, 1) }) @@ -99,7 +104,8 @@ func TestAsyncLoadInitialLoadIndeterminateThenSucceeds(t *testing.T) { requestID := graphsync.RequestID(rand.Int31()) err := asyncLoader.StartRequest(requestID, "") require.NoError(t, err) - resultChan := asyncLoader.AsyncLoad(requestID, link) + p := testutil.GeneratePeers(1)[0] + resultChan := asyncLoader.AsyncLoad(p, requestID, link) st.AssertAttemptLoadWithoutResult(ctx, t, resultChan) @@ -111,7 +117,7 @@ func TestAsyncLoadInitialLoadIndeterminateThenSucceeds(t *testing.T) { }, }, } - asyncLoader.ProcessResponse(responses, blocks) + asyncLoader.ProcessResponse(p, responses, blocks) assertSuccessResponse(ctx, t, resultChan) st.AssertLocalLoads(t, 1) st.AssertBlockStored(t, block) @@ -126,7 +132,8 @@ func TestAsyncLoadInitialLoadIndeterminateThenFails(t *testing.T) { requestID := graphsync.RequestID(rand.Int31()) err := asyncLoader.StartRequest(requestID, "") require.NoError(t, err) - resultChan := asyncLoader.AsyncLoad(requestID, link) + p := testutil.GeneratePeers(1)[0] + resultChan := asyncLoader.AsyncLoad(p, requestID, link) st.AssertAttemptLoadWithoutResult(ctx, t, resultChan) @@ -138,7 +145,7 @@ func TestAsyncLoadInitialLoadIndeterminateThenFails(t *testing.T) { }, }, } - asyncLoader.ProcessResponse(responses, nil) + asyncLoader.ProcessResponse(p, responses, nil) assertFailResponse(ctx, t, resultChan) st.AssertLocalLoads(t, 1) }) @@ -151,7 +158,8 @@ func TestAsyncLoadInitialLoadIndeterminateThenRequestFinishes(t *testing.T) { requestID := graphsync.RequestID(rand.Int31()) err := asyncLoader.StartRequest(requestID, "") require.NoError(t, err) - resultChan := asyncLoader.AsyncLoad(requestID, link) + p := testutil.GeneratePeers(1)[0] + resultChan := asyncLoader.AsyncLoad(p, requestID, link) st.AssertAttemptLoadWithoutResult(ctx, t, resultChan) asyncLoader.CompleteResponsesFor(requestID) assertFailResponse(ctx, t, resultChan) @@ -174,13 +182,14 @@ func TestAsyncLoadTwiceLoadsLocallySecondTime(t *testing.T) { }, }, } - asyncLoader.ProcessResponse(responses, blocks) - resultChan := asyncLoader.AsyncLoad(requestID, link) + p := testutil.GeneratePeers(1)[0] + asyncLoader.ProcessResponse(p, responses, blocks) + resultChan := asyncLoader.AsyncLoad(p, requestID, link) assertSuccessResponse(ctx, t, resultChan) st.AssertLocalLoads(t, 0) - resultChan = asyncLoader.AsyncLoad(requestID, link) + resultChan = asyncLoader.AsyncLoad(p, requestID, link) assertSuccessResponse(ctx, t, resultChan) st.AssertLocalLoads(t, 1) @@ -204,7 +213,8 @@ func TestRegisterUnregister(t *testing.T) { requestID2 := graphsync.RequestID(rand.Int31()) err = asyncLoader.StartRequest(requestID2, "other") require.NoError(t, err) - resultChan1 := asyncLoader.AsyncLoad(requestID2, link1) + p := testutil.GeneratePeers(1)[0] + resultChan1 := asyncLoader.AsyncLoad(p, requestID2, link1) assertSuccessResponse(ctx, t, resultChan1) err = asyncLoader.UnregisterPersistenceOption("other") require.EqualError(t, err, "cannot unregister while requests are in progress") @@ -227,11 +237,13 @@ func TestRequestSplittingLoadLocallyFromBlockstore(t *testing.T) { err := asyncLoader.RegisterPersistenceOption("other", otherSt.loader, otherSt.storer) require.NoError(t, err) requestID1 := graphsync.RequestID(rand.Int31()) - resultChan1 := asyncLoader.AsyncLoad(requestID1, link) + p := testutil.GeneratePeers(1)[0] + + resultChan1 := asyncLoader.AsyncLoad(p, requestID1, link) requestID2 := graphsync.RequestID(rand.Int31()) err = asyncLoader.StartRequest(requestID2, "other") require.NoError(t, err) - resultChan2 := asyncLoader.AsyncLoad(requestID2, link) + resultChan2 := asyncLoader.AsyncLoad(p, requestID2, link) assertFailResponse(ctx, t, resultChan1) assertSuccessResponse(ctx, t, resultChan2) @@ -254,8 +266,9 @@ func TestRequestSplittingSameBlockTwoStores(t *testing.T) { require.NoError(t, err) err = asyncLoader.StartRequest(requestID2, "other") require.NoError(t, err) - resultChan1 := asyncLoader.AsyncLoad(requestID1, link) - resultChan2 := asyncLoader.AsyncLoad(requestID2, link) + p := testutil.GeneratePeers(1)[0] + resultChan1 := asyncLoader.AsyncLoad(p, requestID1, link) + resultChan2 := asyncLoader.AsyncLoad(p, requestID2, link) responses := map[graphsync.RequestID]metadata.Metadata{ requestID1: metadata.Metadata{ metadata.Item{ @@ -270,7 +283,7 @@ func TestRequestSplittingSameBlockTwoStores(t *testing.T) { }, }, } - asyncLoader.ProcessResponse(responses, blocks) + asyncLoader.ProcessResponse(p, responses, blocks) assertSuccessResponse(ctx, t, resultChan1) assertSuccessResponse(ctx, t, resultChan2) @@ -294,8 +307,9 @@ func TestRequestSplittingSameBlockOnlyOneResponse(t *testing.T) { require.NoError(t, err) err = asyncLoader.StartRequest(requestID2, "other") require.NoError(t, err) - resultChan1 := asyncLoader.AsyncLoad(requestID1, link) - resultChan2 := asyncLoader.AsyncLoad(requestID2, link) + p := testutil.GeneratePeers(1)[0] + resultChan1 := asyncLoader.AsyncLoad(p, requestID1, link) + resultChan2 := asyncLoader.AsyncLoad(p, requestID2, link) responses := map[graphsync.RequestID]metadata.Metadata{ requestID2: metadata.Metadata{ metadata.Item{ @@ -304,7 +318,7 @@ func TestRequestSplittingSameBlockOnlyOneResponse(t *testing.T) { }, }, } - asyncLoader.ProcessResponse(responses, blocks) + asyncLoader.ProcessResponse(p, responses, blocks) asyncLoader.CompleteResponsesFor(requestID1) assertFailResponse(ctx, t, resultChan1) @@ -370,7 +384,8 @@ func withLoader(st *store, exec func(ctx context.Context, asyncLoader *AsyncLoad ctx := context.Background() ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() - asyncLoader := New(ctx, st.loader, st.storer) + allocator := allocator.NewAllocator(256*(1<<20), 16*(1<<20)) + asyncLoader := New(ctx, st.loader, st.storer, allocator) asyncLoader.Startup() exec(ctx, asyncLoader) } diff --git a/requestmanager/asyncloader/loadattemptqueue/loadattemptqueue.go b/requestmanager/asyncloader/loadattemptqueue/loadattemptqueue.go index 092cd0a3..c6c318b5 100644 --- a/requestmanager/asyncloader/loadattemptqueue/loadattemptqueue.go +++ b/requestmanager/asyncloader/loadattemptqueue/loadattemptqueue.go @@ -4,6 +4,7 @@ import ( "errors" "github.com/ipld/go-ipld-prime" + "github.com/libp2p/go-libp2p-core/peer" "github.com/ipfs/go-graphsync" "github.com/ipfs/go-graphsync/requestmanager/types" @@ -12,6 +13,7 @@ import ( // LoadRequest is a request to load the given link for the given request id, // with results returned to the given channel type LoadRequest struct { + p peer.ID requestID graphsync.RequestID link ipld.Link resultChan chan types.AsyncLoadResult @@ -19,15 +21,17 @@ type LoadRequest struct { // NewLoadRequest returns a new LoadRequest for the given request id, link, // and results channel -func NewLoadRequest(requestID graphsync.RequestID, +func NewLoadRequest( + p peer.ID, + requestID graphsync.RequestID, link ipld.Link, resultChan chan types.AsyncLoadResult) LoadRequest { - return LoadRequest{requestID, link, resultChan} + return LoadRequest{p, requestID, link, resultChan} } // LoadAttempter attempts to load a link to an array of bytes // and returns an async load result -type LoadAttempter func(graphsync.RequestID, ipld.Link) types.AsyncLoadResult +type LoadAttempter func(peer.ID, graphsync.RequestID, ipld.Link) types.AsyncLoadResult // LoadAttemptQueue attempts to load using the load attempter, and then can // place requests on a retry queue @@ -46,7 +50,7 @@ func New(loadAttempter LoadAttempter) *LoadAttemptQueue { // AttemptLoad attempts to loads the given load request, and if retry is true // it saves the loadrequest for retrying later func (laq *LoadAttemptQueue) AttemptLoad(lr LoadRequest, retry bool) { - response := laq.loadAttempter(lr.requestID, lr.link) + response := laq.loadAttempter(lr.p, lr.requestID, lr.link) if response.Err != nil || response.Data != nil { lr.resultChan <- response close(lr.resultChan) diff --git a/requestmanager/asyncloader/loadattemptqueue/loadattemptqueue_test.go b/requestmanager/asyncloader/loadattemptqueue/loadattemptqueue_test.go index 5f26a6fe..ccf076ea 100644 --- a/requestmanager/asyncloader/loadattemptqueue/loadattemptqueue_test.go +++ b/requestmanager/asyncloader/loadattemptqueue/loadattemptqueue_test.go @@ -8,6 +8,7 @@ import ( "time" ipld "github.com/ipld/go-ipld-prime" + "github.com/libp2p/go-libp2p-core/peer" "github.com/stretchr/testify/require" "github.com/ipfs/go-graphsync" @@ -20,7 +21,7 @@ func TestAsyncLoadInitialLoadSucceeds(t *testing.T) { ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() callCount := 0 - loadAttempter := func(graphsync.RequestID, ipld.Link) types.AsyncLoadResult { + loadAttempter := func(peer.ID, graphsync.RequestID, ipld.Link) types.AsyncLoadResult { callCount++ return types.AsyncLoadResult{ Data: testutil.RandomBytes(100), @@ -30,9 +31,10 @@ func TestAsyncLoadInitialLoadSucceeds(t *testing.T) { link := testutil.NewTestLink() requestID := graphsync.RequestID(rand.Int31()) + p := testutil.GeneratePeers(1)[0] resultChan := make(chan types.AsyncLoadResult, 1) - lr := NewLoadRequest(requestID, link, resultChan) + lr := NewLoadRequest(p, requestID, link, resultChan) loadAttemptQueue.AttemptLoad(lr, false) var result types.AsyncLoadResult @@ -48,7 +50,7 @@ func TestAsyncLoadInitialLoadFails(t *testing.T) { ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() callCount := 0 - loadAttempter := func(graphsync.RequestID, ipld.Link) types.AsyncLoadResult { + loadAttempter := func(peer.ID, graphsync.RequestID, ipld.Link) types.AsyncLoadResult { callCount++ return types.AsyncLoadResult{ Err: fmt.Errorf("something went wrong"), @@ -59,7 +61,9 @@ func TestAsyncLoadInitialLoadFails(t *testing.T) { link := testutil.NewTestLink() requestID := graphsync.RequestID(rand.Int31()) resultChan := make(chan types.AsyncLoadResult, 1) - lr := NewLoadRequest(requestID, link, resultChan) + p := testutil.GeneratePeers(1)[0] + + lr := NewLoadRequest(p, requestID, link, resultChan) loadAttemptQueue.AttemptLoad(lr, false) var result types.AsyncLoadResult @@ -74,7 +78,7 @@ func TestAsyncLoadInitialLoadIndeterminateRetryFalse(t *testing.T) { ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() callCount := 0 - loadAttempter := func(graphsync.RequestID, ipld.Link) types.AsyncLoadResult { + loadAttempter := func(peer.ID, graphsync.RequestID, ipld.Link) types.AsyncLoadResult { var result []byte if callCount > 0 { result = testutil.RandomBytes(100) @@ -89,8 +93,10 @@ func TestAsyncLoadInitialLoadIndeterminateRetryFalse(t *testing.T) { link := testutil.NewTestLink() requestID := graphsync.RequestID(rand.Int31()) + p := testutil.GeneratePeers(1)[0] + resultChan := make(chan types.AsyncLoadResult, 1) - lr := NewLoadRequest(requestID, link, resultChan) + lr := NewLoadRequest(p, requestID, link, resultChan) loadAttemptQueue.AttemptLoad(lr, false) var result types.AsyncLoadResult @@ -106,7 +112,7 @@ func TestAsyncLoadInitialLoadIndeterminateRetryTrueThenRetriedSuccess(t *testing defer cancel() callCount := 0 called := make(chan struct{}, 2) - loadAttempter := func(graphsync.RequestID, ipld.Link) types.AsyncLoadResult { + loadAttempter := func(peer.ID, graphsync.RequestID, ipld.Link) types.AsyncLoadResult { var result []byte called <- struct{}{} if callCount > 0 { @@ -122,7 +128,8 @@ func TestAsyncLoadInitialLoadIndeterminateRetryTrueThenRetriedSuccess(t *testing link := testutil.NewTestLink() requestID := graphsync.RequestID(rand.Int31()) resultChan := make(chan types.AsyncLoadResult, 1) - lr := NewLoadRequest(requestID, link, resultChan) + p := testutil.GeneratePeers(1)[0] + lr := NewLoadRequest(p, requestID, link, resultChan) loadAttemptQueue.AttemptLoad(lr, true) testutil.AssertDoesReceiveFirst(t, called, "should attempt load with no result", resultChan, ctx.Done()) @@ -141,7 +148,7 @@ func TestAsyncLoadInitialLoadIndeterminateThenRequestFinishes(t *testing.T) { defer cancel() callCount := 0 called := make(chan struct{}, 2) - loadAttempter := func(graphsync.RequestID, ipld.Link) types.AsyncLoadResult { + loadAttempter := func(peer.ID, graphsync.RequestID, ipld.Link) types.AsyncLoadResult { var result []byte called <- struct{}{} if callCount > 0 { @@ -157,7 +164,8 @@ func TestAsyncLoadInitialLoadIndeterminateThenRequestFinishes(t *testing.T) { link := testutil.NewTestLink() requestID := graphsync.RequestID(rand.Int31()) resultChan := make(chan types.AsyncLoadResult, 1) - lr := NewLoadRequest(requestID, link, resultChan) + p := testutil.GeneratePeers(1)[0] + lr := NewLoadRequest(p, requestID, link, resultChan) loadAttemptQueue.AttemptLoad(lr, true) testutil.AssertDoesReceiveFirst(t, called, "should attempt load with no result", resultChan, ctx.Done()) diff --git a/requestmanager/asyncloader/unverifiedblockstore/unverifiedblockstore.go b/requestmanager/asyncloader/unverifiedblockstore/unverifiedblockstore.go index 7e94fc5f..74fade97 100644 --- a/requestmanager/asyncloader/unverifiedblockstore/unverifiedblockstore.go +++ b/requestmanager/asyncloader/unverifiedblockstore/unverifiedblockstore.go @@ -2,6 +2,7 @@ package unverifiedblockstore import ( "fmt" + logging "github.com/ipfs/go-log/v2" ipld "github.com/ipld/go-ipld-prime" ) @@ -17,7 +18,7 @@ type settableWriter interface { type UnverifiedBlockStore struct { inMemoryBlocks map[ipld.Link][]byte storer ipld.Storer - dataSize uint64 + dataSize uint64 } // New initializes a new unverified store with the given storer function for writing @@ -40,7 +41,7 @@ func (ubs *UnverifiedBlockStore) AddUnverifiedBlock(lnk ipld.Link, data []byte) // PruneBlocks removes blocks from the unverified store without committing them, // if the passed in function returns true for the given link func (ubs *UnverifiedBlockStore) PruneBlocks(shouldPrune func(ipld.Link) bool) { - for link,data := range ubs.inMemoryBlocks { + for link, data := range ubs.inMemoryBlocks { if shouldPrune(link) { delete(ubs.inMemoryBlocks, link) ubs.dataSize = ubs.dataSize - uint64(len(data)) @@ -53,6 +54,7 @@ func (ubs *UnverifiedBlockStore) PruneBlocks(shouldPrune func(ipld.Link) bool) { func (ubs *UnverifiedBlockStore) PruneBlock(link ipld.Link) { delete(ubs.inMemoryBlocks, link) ubs.dataSize = ubs.dataSize - uint64(len(ubs.inMemoryBlocks[link])) + log.Debugw("pruned in-memory block", "total_queued_bytes", ubs.dataSize) } // VerifyBlock verifies the data for the given link as being part of a traversal, @@ -64,6 +66,7 @@ func (ubs *UnverifiedBlockStore) VerifyBlock(lnk ipld.Link) ([]byte, error) { } delete(ubs.inMemoryBlocks, lnk) ubs.dataSize = ubs.dataSize - uint64(len(data)) + log.Debugw("verified block", "total_queued_bytes", ubs.dataSize) buffer, committer, err := ubs.storer(ipld.LinkContext{}) if err != nil { diff --git a/requestmanager/executor/executor.go b/requestmanager/executor/executor.go index b442b290..303ec368 100644 --- a/requestmanager/executor/executor.go +++ b/requestmanager/executor/executor.go @@ -22,7 +22,7 @@ import ( // AsyncLoadFn is a function which given a request id and an ipld.Link, returns // a channel which will eventually return data for the link or an err -type AsyncLoadFn func(graphsync.RequestID, ipld.Link) <-chan types.AsyncLoadResult +type AsyncLoadFn func(peer.ID, graphsync.RequestID, ipld.Link) <-chan types.AsyncLoadResult // ExecutionEnv are request parameters that last between requests type ExecutionEnv struct { @@ -111,7 +111,7 @@ func (re *requestExecutor) traverse() error { return err } lnk, _ := traverser.CurrentRequest() - resultChan := re.env.Loader(re.request.ID(), lnk) + resultChan := re.env.Loader(re.p, re.request.ID(), lnk) var result types.AsyncLoadResult select { case result = <-resultChan: diff --git a/requestmanager/executor/executor_test.go b/requestmanager/executor/executor_test.go index a05ab9f8..8ce54d7c 100644 --- a/requestmanager/executor/executor_test.go +++ b/requestmanager/executor/executor_test.go @@ -267,7 +267,7 @@ func TestRequestExecutionBlockChain(t *testing.T) { configureLoader := data.configureLoader if configureLoader == nil { configureLoader = func(p peer.ID, requestID graphsync.RequestID, tbc *testutil.TestBlockChain, fal *testloader.FakeAsyncLoader, startStop [2]int) { - fal.SuccessResponseOn(requestID, tbc.Blocks(startStop[0], startStop[1])) + fal.SuccessResponseOn(p, requestID, tbc.Blocks(startStop[0], startStop[1])) } } requestCtx, requestCancel := context.WithCancel(ctx) diff --git a/requestmanager/requestmanager.go b/requestmanager/requestmanager.go index 01a38f9f..4dfcb002 100644 --- a/requestmanager/requestmanager.go +++ b/requestmanager/requestmanager.go @@ -59,9 +59,9 @@ type PeerHandler interface { // results as new responses are processed type AsyncLoader interface { StartRequest(graphsync.RequestID, string) error - ProcessResponse(responses map[graphsync.RequestID]metadata.Metadata, + ProcessResponse(p peer.ID, responses map[graphsync.RequestID]metadata.Metadata, blks []blocks.Block) - AsyncLoad(requestID graphsync.RequestID, link ipld.Link) <-chan types.AsyncLoadResult + AsyncLoad(p peer.ID, requestID graphsync.RequestID, link ipld.Link) <-chan types.AsyncLoadResult CompleteResponsesFor(requestID graphsync.RequestID) CleanupRequest(requestID graphsync.RequestID) } @@ -274,16 +274,15 @@ type processResponseMessage struct { p peer.ID responses []gsmsg.GraphSyncResponse blks []blocks.Block + response chan error } // ProcessResponses ingests the given responses from the network and // and updates the in progress requests based on those responses. func (rm *RequestManager) ProcessResponses(p peer.ID, responses []gsmsg.GraphSyncResponse, blks []blocks.Block) { - select { - case rm.messages <- &processResponseMessage{p, responses, blks}: - case <-rm.ctx.Done(): - } + response := make(chan error, 1) + rm.sendSyncMessage(&processResponseMessage{p, responses, blks, response}, response, nil) } type unpauseRequestMessage struct { @@ -478,8 +477,12 @@ func (prm *processResponseMessage) handle(rm *RequestManager) { filteredResponses = rm.filterResponsesForPeer(filteredResponses, prm.p) rm.updateLastResponses(filteredResponses) responseMetadata := metadataForResponses(filteredResponses) - rm.asyncLoader.ProcessResponse(responseMetadata, prm.blks) + rm.asyncLoader.ProcessResponse(prm.p, responseMetadata, prm.blks) rm.processTerminations(filteredResponses) + select { + case <-rm.ctx.Done(): + case prm.response <- nil: + } } func (rm *RequestManager) filterResponsesForPeer(responses []gsmsg.GraphSyncResponse, p peer.ID) []gsmsg.GraphSyncResponse { diff --git a/requestmanager/requestmanager_test.go b/requestmanager/requestmanager_test.go index 6ef5b77d..7e80270c 100644 --- a/requestmanager/requestmanager_test.go +++ b/requestmanager/requestmanager_test.go @@ -132,8 +132,8 @@ func TestNormalSimultaneousFetch(t *testing.T) { requestRecords[0].gsr.ID(): firstMetadata1, requestRecords[1].gsr.ID(): firstMetadata2, }) - td.fal.SuccessResponseOn(requestRecords[0].gsr.ID(), td.blockChain.AllBlocks()) - td.fal.SuccessResponseOn(requestRecords[1].gsr.ID(), blockChain2.Blocks(0, 3)) + td.fal.SuccessResponseOn(peers[0], requestRecords[0].gsr.ID(), td.blockChain.AllBlocks()) + td.fal.SuccessResponseOn(peers[0], requestRecords[1].gsr.ID(), blockChain2.Blocks(0, 3)) td.blockChain.VerifyWholeChain(requestCtx, returnedResponseChan1) blockChain2.VerifyResponseRange(requestCtx, returnedResponseChan2, 0, 3) @@ -155,7 +155,7 @@ func TestNormalSimultaneousFetch(t *testing.T) { requestRecords[1].gsr.ID(): moreMetadata, }) - td.fal.SuccessResponseOn(requestRecords[1].gsr.ID(), moreBlocks) + td.fal.SuccessResponseOn(peers[0], requestRecords[1].gsr.ID(), moreBlocks) blockChain2.VerifyRemainder(requestCtx, returnedResponseChan2, 3) testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan1) @@ -186,8 +186,8 @@ func TestCancelRequestInProgress(t *testing.T) { td.requestManager.ProcessResponses(peers[0], firstResponses, firstBlocks) - td.fal.SuccessResponseOn(requestRecords[0].gsr.ID(), firstBlocks) - td.fal.SuccessResponseOn(requestRecords[1].gsr.ID(), firstBlocks) + td.fal.SuccessResponseOn(peers[0], requestRecords[0].gsr.ID(), firstBlocks) + td.fal.SuccessResponseOn(peers[0], requestRecords[1].gsr.ID(), firstBlocks) td.blockChain.VerifyResponseRange(requestCtx1, returnedResponseChan1, 0, 3) cancel1() rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0] @@ -202,8 +202,8 @@ func TestCancelRequestInProgress(t *testing.T) { gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.RequestCompletedFull, moreMetadata), } td.requestManager.ProcessResponses(peers[0], moreResponses, moreBlocks) - td.fal.SuccessResponseOn(requestRecords[0].gsr.ID(), moreBlocks) - td.fal.SuccessResponseOn(requestRecords[1].gsr.ID(), moreBlocks) + td.fal.SuccessResponseOn(peers[0], requestRecords[0].gsr.ID(), moreBlocks) + td.fal.SuccessResponseOn(peers[0], requestRecords[1].gsr.ID(), moreBlocks) testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan1) td.blockChain.VerifyWholeChain(requestCtx, returnedResponseChan2) @@ -243,7 +243,7 @@ func TestCancelRequestImperativeNoMoreBlocks(t *testing.T) { gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.PartialResponse, firstMetadata), } td.requestManager.ProcessResponses(peers[0], firstResponses, firstBlocks) - td.fal.SuccessResponseOn(requestRecords[0].gsr.ID(), firstBlocks) + td.fal.SuccessResponseOn(peers[0], requestRecords[0].gsr.ID(), firstBlocks) }() timeoutCtx, timeoutCancel := context.WithTimeout(ctx, time.Second) @@ -286,7 +286,7 @@ func TestCancelManagerExitsGracefully(t *testing.T) { gsmsg.NewResponse(rr.gsr.ID(), graphsync.PartialResponse, firstMetadata), } td.requestManager.ProcessResponses(peers[0], firstResponses, firstBlocks) - td.fal.SuccessResponseOn(rr.gsr.ID(), firstBlocks) + td.fal.SuccessResponseOn(peers[0], rr.gsr.ID(), firstBlocks) td.blockChain.VerifyResponseRange(ctx, returnedResponseChan, 0, 3) managerCancel() @@ -296,7 +296,7 @@ func TestCancelManagerExitsGracefully(t *testing.T) { gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedFull, moreMetadata), } td.requestManager.ProcessResponses(peers[0], moreResponses, moreBlocks) - td.fal.SuccessResponseOn(rr.gsr.ID(), moreBlocks) + td.fal.SuccessResponseOn(peers[0], rr.gsr.ID(), moreBlocks) testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan) testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan) } @@ -333,7 +333,7 @@ func TestLocallyFulfilledFirstRequestFailsLater(t *testing.T) { rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0] // async loaded response responds immediately - td.fal.SuccessResponseOn(rr.gsr.ID(), td.blockChain.AllBlocks()) + td.fal.SuccessResponseOn(peers[0], rr.gsr.ID(), td.blockChain.AllBlocks()) td.blockChain.VerifyWholeChain(requestCtx, returnedResponseChan) @@ -364,7 +364,7 @@ func TestLocallyFulfilledFirstRequestSucceedsLater(t *testing.T) { rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0] // async loaded response responds immediately - td.fal.SuccessResponseOn(rr.gsr.ID(), td.blockChain.AllBlocks()) + td.fal.SuccessResponseOn(peers[0], rr.gsr.ID(), td.blockChain.AllBlocks()) td.blockChain.VerifyWholeChain(requestCtx, returnedResponseChan) @@ -397,7 +397,7 @@ func TestRequestReturnsMissingBlocks(t *testing.T) { } td.requestManager.ProcessResponses(peers[0], firstResponses, nil) for _, block := range td.blockChain.AllBlocks() { - td.fal.ResponseOn(rr.gsr.ID(), cidlink.Link{Cid: block.Cid()}, types.AsyncLoadResult{Data: nil, Err: fmt.Errorf("Terrible Thing")}) + td.fal.ResponseOn(peers[0], rr.gsr.ID(), cidlink.Link{Cid: block.Cid()}, types.AsyncLoadResult{Data: nil, Err: fmt.Errorf("Terrible Thing")}) } testutil.VerifyEmptyResponse(ctx, t, returnedResponseChan) errs := testutil.CollectErrors(ctx, t, returnedErrorChan) @@ -631,7 +631,7 @@ func TestBlockHooks(t *testing.T) { td.fal.VerifyLastProcessedResponses(ctx, t, map[graphsync.RequestID]metadata.Metadata{ rr.gsr.ID(): firstMetadata, }) - td.fal.SuccessResponseOn(rr.gsr.ID(), firstBlocks) + td.fal.SuccessResponseOn(peers[0], rr.gsr.ID(), firstBlocks) ur := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0] receivedUpdateData, has := ur.gsr.Extension(td.extensionName1) @@ -695,7 +695,7 @@ func TestBlockHooks(t *testing.T) { td.fal.VerifyLastProcessedResponses(ctx, t, map[graphsync.RequestID]metadata.Metadata{ rr.gsr.ID(): nextMetadata, }) - td.fal.SuccessResponseOn(rr.gsr.ID(), nextBlocks) + td.fal.SuccessResponseOn(peers[0], rr.gsr.ID(), nextBlocks) ur = readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0] receivedUpdateData, has = ur.gsr.Extension(td.extensionName1) @@ -771,8 +771,8 @@ func TestOutgoingRequestHooks(t *testing.T) { requestRecords[0].gsr.ID(): md, requestRecords[1].gsr.ID(): md, }) - td.fal.SuccessResponseOn(requestRecords[0].gsr.ID(), td.blockChain.AllBlocks()) - td.fal.SuccessResponseOn(requestRecords[1].gsr.ID(), td.blockChain.AllBlocks()) + td.fal.SuccessResponseOn(peers[0], requestRecords[0].gsr.ID(), td.blockChain.AllBlocks()) + td.fal.SuccessResponseOn(peers[0], requestRecords[1].gsr.ID(), td.blockChain.AllBlocks()) td.blockChain.VerifyWholeChainWithTypes(requestCtx, returnedResponseChan1) td.blockChain.VerifyWholeChain(requestCtx, returnedResponseChan2) @@ -824,7 +824,7 @@ func TestPauseResume(t *testing.T) { }), } td.requestManager.ProcessResponses(peers[0], responses, td.blockChain.AllBlocks()) - td.fal.SuccessResponseOn(rr.gsr.ID(), td.blockChain.AllBlocks()) + td.fal.SuccessResponseOn(peers[0], rr.gsr.ID(), td.blockChain.AllBlocks()) // attempt to unpause while request is not paused (note: hook on second block will keep it from // reaching pause point) @@ -865,7 +865,7 @@ func TestPauseResume(t *testing.T) { // process responses td.requestManager.ProcessResponses(peers[0], responses, td.blockChain.RemainderBlocks(pauseAt)) - td.fal.SuccessResponseOn(rr.gsr.ID(), td.blockChain.AllBlocks()) + td.fal.SuccessResponseOn(peers[0], rr.gsr.ID(), td.blockChain.AllBlocks()) // verify the correct results are returned, picking up after where there request was paused td.blockChain.VerifyRemainder(ctx, returnedResponseChan, pauseAt-1) @@ -910,7 +910,7 @@ func TestPauseResumeExternal(t *testing.T) { }), } td.requestManager.ProcessResponses(peers[0], responses, td.blockChain.AllBlocks()) - td.fal.SuccessResponseOn(rr.gsr.ID(), td.blockChain.AllBlocks()) + td.fal.SuccessResponseOn(peers[0], rr.gsr.ID(), td.blockChain.AllBlocks()) // verify responses sent read ONLY for blocks BEFORE the pause td.blockChain.VerifyResponseRange(ctx, returnedResponseChan, 0, pauseAt-1) // wait for the pause to occur @@ -945,7 +945,7 @@ func TestPauseResumeExternal(t *testing.T) { // process responses td.requestManager.ProcessResponses(peers[0], responses, td.blockChain.RemainderBlocks(pauseAt)) - td.fal.SuccessResponseOn(rr.gsr.ID(), td.blockChain.AllBlocks()) + td.fal.SuccessResponseOn(peers[0], rr.gsr.ID(), td.blockChain.AllBlocks()) // verify the correct results are returned, picking up after where there request was paused td.blockChain.VerifyRemainder(ctx, returnedResponseChan, pauseAt-1) diff --git a/requestmanager/testloader/asyncloader.go b/requestmanager/testloader/asyncloader.go index 34813dfd..4a9c8100 100644 --- a/requestmanager/testloader/asyncloader.go +++ b/requestmanager/testloader/asyncloader.go @@ -8,6 +8,7 @@ import ( blocks "github.com/ipfs/go-block-format" "github.com/ipld/go-ipld-prime" cidlink "github.com/ipld/go-ipld-prime/linking/cid" + peer "github.com/libp2p/go-libp2p-peer" "github.com/stretchr/testify/require" "github.com/ipfs/go-graphsync" @@ -17,6 +18,7 @@ import ( ) type requestKey struct { + p peer.ID requestID graphsync.RequestID link ipld.Link } @@ -44,8 +46,8 @@ type FakeAsyncLoader struct { func NewFakeAsyncLoader() *FakeAsyncLoader { return &FakeAsyncLoader{ responseChannels: make(map[requestKey]chan types.AsyncLoadResult), - responses: make(chan map[graphsync.RequestID]metadata.Metadata, 1), - blks: make(chan []blocks.Block, 1), + responses: make(chan map[graphsync.RequestID]metadata.Metadata, 10), + blks: make(chan []blocks.Block, 10), storesRequested: make(map[storeKey]struct{}), } } @@ -59,7 +61,7 @@ func (fal *FakeAsyncLoader) StartRequest(requestID graphsync.RequestID, name str } // ProcessResponse just records values passed to verify expectations later -func (fal *FakeAsyncLoader) ProcessResponse(responses map[graphsync.RequestID]metadata.Metadata, +func (fal *FakeAsyncLoader) ProcessResponse(p peer.ID, responses map[graphsync.RequestID]metadata.Metadata, blks []blocks.Block) { fal.responses <- responses fal.blks <- blks @@ -100,12 +102,12 @@ func (fal *FakeAsyncLoader) VerifyStoreUsed(t *testing.T, requestID graphsync.Re fal.storesRequestedLk.RUnlock() } -func (fal *FakeAsyncLoader) asyncLoad(requestID graphsync.RequestID, link ipld.Link) chan types.AsyncLoadResult { +func (fal *FakeAsyncLoader) asyncLoad(p peer.ID, requestID graphsync.RequestID, link ipld.Link) chan types.AsyncLoadResult { fal.responseChannelsLk.Lock() - responseChannel, ok := fal.responseChannels[requestKey{requestID, link}] + responseChannel, ok := fal.responseChannels[requestKey{p, requestID, link}] if !ok { responseChannel = make(chan types.AsyncLoadResult, 1) - fal.responseChannels[requestKey{requestID, link}] = responseChannel + fal.responseChannels[requestKey{p, requestID, link}] = responseChannel } fal.responseChannelsLk.Unlock() return responseChannel @@ -117,8 +119,8 @@ func (fal *FakeAsyncLoader) OnAsyncLoad(cb func(graphsync.RequestID, ipld.Link, } // AsyncLoad simulates an asynchronous load with responses stubbed by ResponseOn & SuccessResponseOn -func (fal *FakeAsyncLoader) AsyncLoad(requestID graphsync.RequestID, link ipld.Link) <-chan types.AsyncLoadResult { - res := fal.asyncLoad(requestID, link) +func (fal *FakeAsyncLoader) AsyncLoad(p peer.ID, requestID graphsync.RequestID, link ipld.Link) <-chan types.AsyncLoadResult { + res := fal.asyncLoad(p, requestID, link) if fal.cb != nil { fal.cb(requestID, link, res) } @@ -143,16 +145,16 @@ func (fal *FakeAsyncLoader) CleanupRequest(requestID graphsync.RequestID) { // ResponseOn sets the value returned when the given link is loaded for the given request. Because it's an // "asynchronous" load, this can be called AFTER the attempt to load this link -- and the client will only get // the response at that point -func (fal *FakeAsyncLoader) ResponseOn(requestID graphsync.RequestID, link ipld.Link, result types.AsyncLoadResult) { - responseChannel := fal.asyncLoad(requestID, link) +func (fal *FakeAsyncLoader) ResponseOn(p peer.ID, requestID graphsync.RequestID, link ipld.Link, result types.AsyncLoadResult) { + responseChannel := fal.asyncLoad(p, requestID, link) responseChannel <- result close(responseChannel) } // SuccessResponseOn is convenience function for setting several asynchronous responses at once as all successes // and returning the given blocks -func (fal *FakeAsyncLoader) SuccessResponseOn(requestID graphsync.RequestID, blks []blocks.Block) { +func (fal *FakeAsyncLoader) SuccessResponseOn(p peer.ID, requestID graphsync.RequestID, blks []blocks.Block) { for _, block := range blks { - fal.ResponseOn(requestID, cidlink.Link{Cid: block.Cid()}, types.AsyncLoadResult{Data: block.RawData(), Local: false, Err: nil}) + fal.ResponseOn(p, requestID, cidlink.Link{Cid: block.Cid()}, types.AsyncLoadResult{Data: block.RawData(), Local: false, Err: nil}) } }