Back pressure incoming responses #204
Conversation
Block reading incoming responses to avoid memory pressure buildup
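To illustrate what "back pressure" means here, a minimal toy sketch of the blocking behavior the summary describes. This is not the Allocator added by this PR (whose API only partly appears in the diffs below); it is just a self-contained example of producers blocking until buffered memory is released.

```go
package main

import (
	"fmt"
	"sync"
)

// toyAllocator sketches the back-pressure idea: callers block until enough
// memory budget is free. It is NOT this PR's allocator, only an illustration.
type toyAllocator struct {
	mu     sync.Mutex
	cond   *sync.Cond
	used   uint64
	budget uint64
}

func newToyAllocator(budget uint64) *toyAllocator {
	a := &toyAllocator{budget: budget}
	a.cond = sync.NewCond(&a.mu)
	return a
}

// allocate blocks the caller until size bytes fit within the budget.
func (a *toyAllocator) allocate(size uint64) {
	a.mu.Lock()
	defer a.mu.Unlock()
	for a.used+size > a.budget {
		a.cond.Wait()
	}
	a.used += size
}

// release returns size bytes to the budget and wakes blocked callers.
func (a *toyAllocator) release(size uint64) {
	a.mu.Lock()
	defer a.mu.Unlock()
	a.used -= size
	a.cond.Broadcast()
}

func main() {
	a := newToyAllocator(16 << 20) // 16 MiB budget
	a.allocate(8 << 20)            // fits immediately
	go a.release(8 << 20)          // later, buffered blocks are consumed and freed
	a.allocate(12 << 20)           // blocks until the release above happens
	fmt.Println("second allocation granted after release")
}
```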
```go
// load from response cache
data, err := responseCache.AttemptLoad(requestID, link)
if data != nil {
```
I am curious whether we are intentionally checking this before checking `err`. If so, would it make sense to add a comment here to document the rationale?
Suggested change:

```diff
-if data != nil {
+if err != nil {
+	return types.AsyncLoadResult{Err: err}
+}
+if data != nil {
```
Mostly minor stuff, and my comments concerning context [ab]use are probably out of scope for this PR.
```go
	blks []blocks.Block) {
	totalMemoryAllocated := uint64(0)
	for _, blk := range blks {
		totalMemoryAllocated += uint64(len(blk.RawData()))
```
nit: Avoid copying each block into `blk` by using the index instead:

```go
for i := range blks {
	totalMemoryAllocated += uint64(len(blks[i].RawData()))
}
```
just curious -- why?
```diff
-func New(ctx context.Context, loader ipld.Loader, storer ipld.Storer) *AsyncLoader {
-	responseCache, loadAttemptQueue := setupAttemptQueue(loader, storer)
+func New(ctx context.Context, loader ipld.Loader, storer ipld.Storer, allocator Allocator) *AsyncLoader {
+	responseCache, loadAttemptQueue := setupAttemptQueue(loader, storer, allocator)
 	ctx, cancel := context.WithCancel(ctx)
 	return &AsyncLoader{
 		ctx: ctx,
```
Can we stop abusing contexts? Maybe a follow-up PR?
this is a much larger issue. See comment below.
```go
	}
	select {
	case <-al.allocator.AllocateBlockMemory(p, totalMemoryAllocated):
	case <-al.ctx.Done():
```
Probably out of scope for this review, but `ctx` should be passed into the function as an argument, not kept inside the data structure. If it is necessary to wait on something to detect shutdown, then use a channel. Also, is there a real concern that `AllocateBlockMemory` will not return and that we need to stop waiting for it?
There's a difference here: `ctx` is the overall shutdown for the whole module. It's passed down from the ctx used to construct graphsync. Essentially we're using context to manage shutting down the whole module when we're done.

As for the larger concern re: abusing contexts, the coding style of graphsync is adopted from go-bitswap, which generally follows patterns from go-ipfs, which was developed pretty early on in golang's development. I think at the time the convention was "use contexts for managing shutdowns".

I totally do not love this way of managing shutdowns. In fact, I would love to do something much more structured, like a supervision tree. I've brought this up several times during my several years at PL, but it never seems to get enough traction to do anything.

Graphsync does frequently distinguish between the module context (the one here) and the request context (used elsewhere, usually passed into the function).

I'd love to have this as a larger discussion, but it's definitely outside the scope of this PR.
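For readers following along, a minimal sketch of the distinction described above; the names are made up for this sketch and are not graphsync's actual types. A module-level context captured at construction governs whole-module shutdown, while a per-request context is passed into each call.

```go
package example

import "context"

// module holds a module-lifetime context derived from the constructor's ctx.
type module struct {
	ctx    context.Context
	cancel context.CancelFunc // called once on shutdown
}

func newModule(ctx context.Context) *module {
	ctx, cancel := context.WithCancel(ctx)
	return &module{ctx: ctx, cancel: cancel}
}

func (m *module) Shutdown() { m.cancel() }

// handleRequest takes a per-request context as an argument, but still
// unblocks if the whole module is shut down first.
func (m *module) handleRequest(reqCtx context.Context, done <-chan struct{}) error {
	select {
	case <-done:
		return nil
	case <-reqCtx.Done():
		return reqCtx.Err()
	case <-m.ctx.Done():
		return m.ctx.Err()
	}
}
```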
```diff
-	responses: make(chan map[graphsync.RequestID]metadata.Metadata, 1),
-	blks:      make(chan []blocks.Block, 1),
+	responses: make(chan map[graphsync.RequestID]metadata.Metadata, 10),
+	blks:      make(chan []blocks.Block, 10),
```
Why buffer of 10? Is there a need for a specific number of slots?
oh this is just some test hackiness to avoid a channel block.
Basically, we now need to be able to absorb a bunch of responses without blocking.
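For readers unfamiliar with the distinction, a short sketch of the difference the buffer size makes; the sizes and names below are illustrative only.

```go
package example

func bufferingDemo() {
	// Capacity 1: a second send blocks until a receiver takes the first value.
	small := make(chan int, 1)
	small <- 1
	// small <- 2 // would block here without a concurrent receiver

	// Capacity 10: a burst of responses can be queued up front and drained
	// later -- the "absorb without blocking" behavior described above.
	large := make(chan int, 10)
	for i := 0; i < 10; i++ {
		large <- i // never blocks for the first ten sends
	}

	// Drain so nothing is left pending in this demo.
	for len(large) > 0 {
		<-large
	}
	<-small
}
```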
```go
	rm.processTerminations(filteredResponses)
	select {
	case <-rm.ctx.Done():
	case prm.response <- nil:
```
If this channel is guaranteed to be buffered, and only one response is written to it, then is there any chance that it would block here? If there is no chance of blocking, there is no need to wait for `rm.ctx.Done()`. Also, consider closing the channel instead if there will not be any other writers, as that also does not require handling blocking.
It's just a habit. I always assume a channel could block. I guess it's defensive programming.
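In isolation, the pattern being discussed looks like the sketch below; whether the `ctx.Done()` guard is strictly necessary depends on the buffering guarantee above, but it protects against a leaked goroutine if those assumptions ever change. The function name is made up for this sketch.

```go
package example

import "context"

// replyOrShutdown sends a result defensively: if no one ever receives (or
// the buffer is unexpectedly full), cancellation of ctx still unblocks
// this goroutine during shutdown.
func replyOrShutdown(ctx context.Context, response chan<- error, err error) {
	select {
	case response <- err:
	case <-ctx.Done():
	}
}

// Alternative from the comment above: if this is the only writer, closing
// the channel also never blocks, and a receiver observes the zero value:
//   close(response)
```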
```go
type processResponseMessage struct {
	p         peer.ID
	responses []gsmsg.GraphSyncResponse
	blks      []blocks.Block
	response  chan error
```
I think this can be constrained to write-only: `response chan<- error`
Yea, probably. But the others aren't at the moment, so I'd rather maintain the consistency for now.
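For reference, the suggested constraint is purely a compile-time restriction; a bidirectional channel still converts implicitly when the struct is populated, so the creating side would not change. The struct name below is hypothetical, used only for this sketch.

```go
package example

// processResponseMessageSketch shows only the field under discussion.
type processResponseMessageSketch struct {
	// ...other fields as in the struct quoted above...
	response chan<- error // send-only from this side; this code cannot receive on it
}

func sketch() {
	// The creator keeps the bidirectional channel and can still receive on it.
	ch := make(chan error, 1)
	msg := processResponseMessageSketch{response: ch} // implicit conversion to chan<- error
	msg.response <- nil
	<-ch
}
```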
```diff
@@ -370,7 +384,8 @@ func withLoader(st *store, exec func(ctx context.Context, asyncLoader *AsyncLoad
 	ctx := context.Background()
 	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
 	defer cancel()
-	asyncLoader := New(ctx, st.loader, st.storer)
+	allocator := allocator.NewAllocator(256*(1<<20), 16*(1<<20))
```
Even though this is test code, it would be nice to define these with const values
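Something like the sketch below, using the same numbers already inline in the diff. Whether the two arguments are a total limit and a per-peer limit is an assumption about `NewAllocator`, not something the diff states.

```go
// Named limits for the test allocator. Values match the inline literals in
// the diff; the total-vs-per-peer interpretation is assumed, not confirmed.
const (
	testTotalMemoryMax   = 256 << 20 // 256 MiB
	testPerPeerMemoryMax = 16 << 20  // 16 MiB
)

allocator := allocator.NewAllocator(testTotalMemoryMax, testPerPeerMemoryMax)
```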
@gammazero I'm recording most of your comments as useful suggestions worthy of deeper consideration, because they actually should be changed across the codebase. I'm not absorbing them now, as this fix is tied to a fairly immediate need to ship a release branch in Lotus. It is nice to get other folks looking at this code base and giving me this kind of input.
I've added a bunch of testground improvements to this PR -- these were done to verify experimentally that this actually resolves the memory backpressure problem. These are some mid-test memory dumps from the testground runs. You'll see that there's a large 1GB backup in the before and not in the after. This is the exact same backup witnessed in filecoin-project/lotus#7077
Goals
Prevent out-of-memory errors due to slow processing of requests or missing block traversals
Implementation