feat!(requestmanager): remove request allocation backpressure (#272)
* feat!(requestmanager): remove request allocation backpressure

Closes: #241
Ref: 9171ce6

* fixup! feat!(requestmanager): remove request allocation backpressure

Co-authored-by: Hannah Howard <hannah@hannahhoward.net>
rvagg and hannahhoward committed Nov 30, 2021
1 parent 7bdc18b commit ed50782
Showing 5 changed files with 12 additions and 77 deletions.
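For context on what "request allocation backpressure" was: before this commit, the requestor metered incoming response blocks through a requestAllocator, so the message receiver blocked until the allocator admitted the incoming bytes (see the select removed from impl/graphsync.go below), and memory was credited back as blocks were consumed or requests cleaned up. The standalone Go sketch below illustrates only that general admit/release pattern; it is not graphsync's allocator, and every name in it is invented for illustration.

```go
package main

import (
	"context"
	"fmt"
)

// admitter is a toy illustration of the admit/release backpressure pattern
// removed by this commit: callers block until their bytes fit under a budget.
type admitter struct {
	acquire chan acquireReq
	release chan uint64
}

type acquireReq struct {
	bytes uint64
	done  chan struct{}
}

func newAdmitter(ctx context.Context, budget uint64) *admitter {
	a := &admitter{acquire: make(chan acquireReq), release: make(chan uint64)}
	go func() {
		var used uint64
		var waiting []acquireReq
		for {
			// admit queued requests, in order, while they fit under the budget
			for len(waiting) > 0 && used+waiting[0].bytes <= budget {
				used += waiting[0].bytes
				close(waiting[0].done)
				waiting = waiting[1:]
			}
			select {
			case req := <-a.acquire:
				waiting = append(waiting, req)
			case freed := <-a.release:
				used -= freed
			case <-ctx.Done():
				return
			}
		}
	}()
	return a
}

// AllocateBlockMemory mirrors the channel-returning shape the removed code
// selected on: the returned channel closes once the bytes are admitted.
func (a *admitter) AllocateBlockMemory(bytes uint64) <-chan struct{} {
	req := acquireReq{bytes: bytes, done: make(chan struct{})}
	a.acquire <- req
	return req.done
}

func (a *admitter) ReleaseBlockMemory(bytes uint64) { a.release <- bytes }

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	adm := newAdmitter(ctx, 1<<20) // 1 MiB budget
	<-adm.AllocateBlockMemory(512 << 10)
	fmt.Println("512 KiB admitted")
	adm.ReleaseBlockMemory(512 << 10)
}
```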
36 changes: 1 addition & 35 deletions impl/graphsync.go
@@ -70,14 +70,11 @@ type GraphSync struct {
ctx context.Context
cancel context.CancelFunc
responseAllocator *allocator.Allocator
requestAllocator *allocator.Allocator
}

type graphsyncConfigOptions struct {
totalMaxMemoryResponder uint64
maxMemoryPerPeerResponder uint64
totalMaxMemoryRequestor uint64
maxMemoryPerPeerRequestor uint64
maxInProgressIncomingRequests uint64
maxInProgressIncomingRequestsPerPeer uint64
maxInProgressOutgoingRequests uint64
@@ -116,22 +113,6 @@ func MaxMemoryPerPeerResponder(maxMemoryPerPeer uint64) Option {
}
}

// MaxMemoryRequestor defines the maximum amount of memory the requestor
// may consume buffering incoming responses in total
func MaxMemoryRequestor(totalMaxMemory uint64) Option {
return func(gs *graphsyncConfigOptions) {
gs.totalMaxMemoryRequestor = totalMaxMemory
}
}

// MaxMemoryPerPeerRequestor defines the maximum amount of memory the
// requestor may consume buffering responses from an individual peer
func MaxMemoryPerPeerRequestor(maxMemoryPerPeer uint64) Option {
return func(gs *graphsyncConfigOptions) {
gs.maxMemoryPerPeerRequestor = maxMemoryPerPeer
}
}

// MaxInProgressIncomingRequests changes the maximum number of
// incoming graphsync requests that are processed in parallel (default 6)
func MaxInProgressIncomingRequests(maxInProgressIncomingRequests uint64) Option {
@@ -214,8 +195,6 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
gsConfig := &graphsyncConfigOptions{
totalMaxMemoryResponder: defaultTotalMaxMemory,
maxMemoryPerPeerResponder: defaultMaxMemoryPerPeer,
totalMaxMemoryRequestor: defaultTotalMaxMemory,
maxMemoryPerPeerRequestor: defaultMaxMemoryPerPeer,
maxInProgressIncomingRequests: defaultMaxInProgressRequests,
maxInProgressOutgoingRequests: defaultMaxInProgressRequests,
registerDefaultValidator: true,
@@ -247,9 +226,8 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
return messagequeue.New(ctx, p, network, responseAllocator, gsConfig.messageSendRetries, gsConfig.sendMessageTimeout)
}
peerManager := peermanager.NewMessageManager(ctx, createMessageQueue)
requestAllocator := allocator.NewAllocator(gsConfig.totalMaxMemoryRequestor, gsConfig.maxMemoryPerPeerRequestor)

asyncLoader := asyncloader.New(ctx, linkSystem, requestAllocator)
asyncLoader := asyncloader.New(ctx, linkSystem)
requestQueue := taskqueue.NewTaskQueue(ctx)
requestManager := requestmanager.New(ctx, asyncLoader, linkSystem, outgoingRequestHooks, incomingResponseHooks, networkErrorListeners, outgoingRequestProcessingListeners, requestQueue, network.ConnectionManager(), gsConfig.maxLinksPerOutgoingRequest)
requestExecutor := executor.NewExecutor(requestManager, incomingBlockHooks, asyncLoader.AsyncLoad)
@@ -313,7 +291,6 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
ctx: ctx,
cancel: cancel,
responseAllocator: responseAllocator,
requestAllocator: requestAllocator,
}

requestManager.SetDelegate(peerManager)
@@ -453,7 +430,6 @@ func (gs *GraphSync) CancelRequest(ctx context.Context, requestID graphsync.Requ
// Stats produces insight on the current state of a graphsync exchange
func (gs *GraphSync) Stats() graphsync.Stats {
outgoingRequestStats := gs.requestQueue.Stats()
incomingResponseStats := gs.requestAllocator.Stats()

ptqstats := gs.peerTaskQueue.Stats()
incomingRequestStats := graphsync.RequestStats{
@@ -465,8 +441,6 @@ func (gs *GraphSync) Stats() graphsync.Stats {

return graphsync.Stats{
OutgoingRequests: outgoingRequestStats,
IncomingResponses: incomingResponseStats,

IncomingRequests: incomingRequestStats,
OutgoingResponses: outgoingResponseStats,
}
@@ -485,14 +459,6 @@ func (gsr *graphSyncReceiver) ReceiveMessage(
sender peer.ID,
incoming gsmsg.GraphSyncMessage) {
gsr.graphSync().responseManager.ProcessRequests(ctx, sender, incoming.Requests())
totalMemoryAllocated := uint64(0)
for _, blk := range incoming.Blocks() {
totalMemoryAllocated += uint64(len(blk.RawData()))
}
select {
case <-gsr.graphSync().requestAllocator.AllocateBlockMemory(sender, totalMemoryAllocated):
case <-gsr.ctx.Done():
}
gsr.graphSync().requestManager.ProcessResponses(sender, incoming.Responses(), incoming.Blocks())
}

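The caller-visible surface of this file's changes: the two requestor-side memory Options are gone, and Stats() no longer populates incoming-response figures from a request allocator. A schematic of how construction changes for embedders follows; the import alias gsimpl, the helper name, and the surrounding setup are assumptions, not taken from this diff. The 256 MiB and 16 MiB figures echo the defaults used in the removed test setup below.

```go
// Schematic only: gsimpl stands in for this package's import alias; the
// network and linkSystem arguments are assumed to be built elsewhere.
func newGraphSync(ctx context.Context, network gsnet.GraphSyncNetwork, lsys ipld.LinkSystem) graphsync.GraphExchange {
	return gsimpl.New(ctx, network, lsys,
		gsimpl.MaxMemoryResponder(256<<20),       // responder-side limits remain
		gsimpl.MaxMemoryPerPeerResponder(16<<20), // responder-side limits remain
		// gsimpl.MaxMemoryRequestor(256<<20),          // removed by this commit
		// gsimpl.MaxMemoryPerPeerRequestor(16<<20),    // removed by this commit
	)
}
```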
34 changes: 7 additions & 27 deletions requestmanager/asyncloader/asyncloader.go
@@ -8,7 +8,6 @@ import (
"sync"

blocks "github.com/ipfs/go-block-format"
logging "github.com/ipfs/go-log/v2"
"github.com/ipld/go-ipld-prime"
peer "github.com/libp2p/go-libp2p-core/peer"

@@ -20,24 +19,16 @@
"github.com/ipfs/go-graphsync/requestmanager/types"
)

var log = logging.Logger("gs-asyncloader")

type alternateQueue struct {
responseCache *responsecache.ResponseCache
loadAttemptQueue *loadattemptqueue.LoadAttemptQueue
}

// Allocator indicates a mechanism for tracking memory used by a given peer
type Allocator interface {
ReleaseBlockMemory(p peer.ID, amount uint64) error
}

// AsyncLoader manages loading links asynchronously as new responses
// come in from the network
type AsyncLoader struct {
ctx context.Context
cancel context.CancelFunc
allocator Allocator
ctx context.Context
cancel context.CancelFunc

// this mutex protects access to the state of the async loader, which covers all data fields below
stateLk sync.Mutex
@@ -50,8 +41,8 @@

// New initializes a new link loading manager for asynchronous loads from the given context
// and local store loading and storing function
func New(ctx context.Context, linkSystem ipld.LinkSystem, allocator Allocator) *AsyncLoader {
responseCache, loadAttemptQueue := setupAttemptQueue(linkSystem, allocator)
func New(ctx context.Context, linkSystem ipld.LinkSystem) *AsyncLoader {
responseCache, loadAttemptQueue := setupAttemptQueue(linkSystem)
ctx, cancel := context.WithCancel(ctx)
return &AsyncLoader{
ctx: ctx,
@@ -61,7 +52,6 @@ func New(ctx context.Context, linkSystem ipld.LinkSystem, allocator Allocator) *
alternateQueues: make(map[string]alternateQueue),
responseCache: responseCache,
loadAttemptQueue: loadAttemptQueue,
allocator: allocator,
}
}

@@ -73,7 +63,7 @@ func (al *AsyncLoader) RegisterPersistenceOption(name string, lsys ipld.LinkSyst
if existing {
return errors.New("already registered a persistence option with this name")
}
responseCache, loadAttemptQueue := setupAttemptQueue(lsys, al.allocator)
responseCache, loadAttemptQueue := setupAttemptQueue(lsys)
al.alternateQueues[name] = alternateQueue{responseCache, loadAttemptQueue}
return nil
}
@@ -170,13 +160,7 @@ func (al *AsyncLoader) CleanupRequest(p peer.ID, requestID graphsync.RequestID)
responseCache = al.alternateQueues[aq].responseCache
delete(al.requestQueues, requestID)
}
toFree := responseCache.FinishRequest(requestID)
if toFree > 0 {
err := al.allocator.ReleaseBlockMemory(p, toFree)
if err != nil {
log.Infow("Error deallocating requestor memory", "p", p, "toFree", toFree, "err", err)
}
}
responseCache.FinishRequest(requestID)
}

func (al *AsyncLoader) getLoadAttemptQueue(queue string) *loadattemptqueue.LoadAttemptQueue {
@@ -193,7 +177,7 @@ func (al *AsyncLoader) getResponseCache(queue string) *responsecache.ResponseCac
return al.alternateQueues[queue].responseCache
}

func setupAttemptQueue(lsys ipld.LinkSystem, allocator Allocator) (*responsecache.ResponseCache, *loadattemptqueue.LoadAttemptQueue) {
func setupAttemptQueue(lsys ipld.LinkSystem) (*responsecache.ResponseCache, *loadattemptqueue.LoadAttemptQueue) {

unverifiedBlockStore := unverifiedblockstore.New(lsys.StorageWriteOpener)
responseCache := responsecache.New(unverifiedBlockStore)
@@ -204,10 +188,6 @@ func setupAttemptQueue(lsys ipld.LinkSystem) (*responsecach
return types.AsyncLoadResult{Err: err, Local: false}
}
if data != nil {
err = allocator.ReleaseBlockMemory(p, uint64(len(data)))
if err != nil {
log.Warningf("releasing block memory: %s", err.Error())
}
return types.AsyncLoadResult{Data: data, Local: false}
}
// fall back to local store
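For downstream code that constructs an AsyncLoader directly, the allocator argument simply disappears, as the test diff below also shows. A minimal before/after sketch (variable names assumed):

```go
// Before this commit, the allocator argument carried the memory budget:
//   asyncLoader := asyncloader.New(ctx, linkSystem, requestAllocator)
// After it, loads are no longer metered, so the constructor takes two arguments:
asyncLoader := asyncloader.New(ctx, linkSystem)
```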
4 changes: 1 addition & 3 deletions requestmanager/asyncloader/asyncloader_test.go
@@ -13,7 +13,6 @@ import (
"github.com/stretchr/testify/require"

"github.com/ipfs/go-graphsync"
"github.com/ipfs/go-graphsync/allocator"
"github.com/ipfs/go-graphsync/metadata"
"github.com/ipfs/go-graphsync/requestmanager/types"
"github.com/ipfs/go-graphsync/testutil"
@@ -385,8 +384,7 @@ func withLoader(st *store, exec func(ctx context.Context, asyncLoader *AsyncLoad
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
allocator := allocator.NewAllocator(256*(1<<20), 16*(1<<20))
asyncLoader := New(ctx, st.lsys, allocator)
asyncLoader := New(ctx, st.lsys)
exec(ctx, asyncLoader)
}

10 changes: 2 additions & 8 deletions requestmanager/asyncloader/responsecache/responsecache.go
@@ -44,20 +44,14 @@ func New(unverifiedBlockStore UnverifiedBlockStore) *ResponseCache {
// FinishRequest indicates there is no more need to track blocks tied to this
// response
func (rc *ResponseCache) FinishRequest(requestID graphsync.RequestID) uint64 {
func (rc *ResponseCache) FinishRequest(requestID graphsync.RequestID) {
rc.responseCacheLk.Lock()
rc.linkTracker.FinishRequest(requestID)

toFree := uint64(0)
rc.unverifiedBlockStore.PruneBlocks(func(link ipld.Link, amt uint64) bool {
shouldPrune := rc.linkTracker.BlockRefCount(link) == 0
if shouldPrune {
toFree += amt
}
return shouldPrune
return rc.linkTracker.BlockRefCount(link) == 0
})
rc.responseCacheLk.Unlock()
return toFree
}

// AttemptLoad attempts to load the given block from the cache
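With no allocator left to credit, FinishRequest stops reporting freed bytes and the PruneBlocks callback reduces to a pure ref-count check. Callers change roughly as sketched here; the allocator variable is from the pre-commit code and appears only for contrast:

```go
// Before: the freed byte count was handed back to the requestor allocator.
//   toFree := responseCache.FinishRequest(requestID)
//   allocator.ReleaseBlockMemory(p, toFree)
// After: the call is fire-and-forget; orphaned blocks are still pruned internally.
responseCache.FinishRequest(requestID)
```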
5 changes: 1 addition & 4 deletions requestmanager/asyncloader/responsecache/responsecache_test.go
@@ -134,16 +134,13 @@ func TestResponseCacheManagingLinks(t *testing.T) {
require.NoError(t, err)
require.Nil(t, data, "no data should be returned for unknown block")

toFree := responseCache.FinishRequest(requestID1)
responseCache.FinishRequest(requestID1)
// should remove only block 0, since it now has no outstanding requests referring to it
require.Len(t, fubs.blocks(), len(blks)-4, "should prune block when it is orphaned")
testutil.RefuteContainsBlock(t, fubs.blocks(), blks[0])
require.Equal(t, toFree, uint64(len(blks[0].RawData())))

responseCache.FinishRequest(requestID2)
// should remove the last block since there are no remaining references
require.Len(t, fubs.blocks(), 0, "should prune block when it is orphaned")
testutil.RefuteContainsBlock(t, fubs.blocks(), blks[3])
require.Equal(t, toFree, uint64(len(blks[3].RawData())))

}
