batch hot paths for a very short duration #1618
Conversation
Think we can simplify it a bit more while using
@nopcoder sounds nice! Feel free to give that a go 🙂
Used the same env to test your branch - behaves just the same (~45k requests/second). I agree your implementation is simpler.
pkg/batch/executor.go (Outdated)
	keys:   make(map[string][]*request),
	logger: logger,
}
go e.Run() // TODO(ozkatz): should probably be managed by the user (also, allow stopping it)
You can move this one into Run() with a defer close, so we won't need to add Stop() and/or Close() methods to handle this resource.
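A minimal sketch of that idea, assuming a stripped-down Executor (the done channel, the ticker, and the context parameter are illustrative, not the PR's actual fields or signature): the caller starts Run, and Run releases its own resources via defer, so the type needs no separate Stop()/Close().

```go
package batch

import (
	"context"
	"time"
)

type Executor struct {
	done chan struct{}
}

func NewExecutor() *Executor {
	// No goroutine is started here; the caller decides when (and whether) Run runs.
	return &Executor{done: make(chan struct{})}
}

func (e *Executor) Run(ctx context.Context) {
	ticker := time.NewTicker(time.Millisecond)
	defer ticker.Stop()  // resource owned and released by Run itself
	defer close(e.done)  // lets anyone waiting observe that Run has exited
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			// flush batched requests here
		}
	}
}
```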
// see if we have it scheduled already
if _, exists := e.keys[req.key]; !exists {
	// this is a new key, let's fire a timer for it
	go func(req *request) {
- Add a WaitGroup that counts ongoing goroutines.
- Add a Close() method to Executor that waits for the WaitGroup to be done.
- Close() should also drain pending execs and call their responseCallback(s) (see the sketch below).
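A rough sketch of those three points under assumed types (exec, response, and the pending channel here are placeholders, not the PR's actual fields):

```go
package batch

import "sync"

type response struct {
	v   interface{}
	err error
}

type exec struct {
	fn               func() (interface{}, error)
	responseCallback chan *response
}

type Executor struct {
	wg      sync.WaitGroup
	pending chan *exec
}

// spawn counts each ongoing goroutine in the WaitGroup.
func (e *Executor) spawn(x *exec) {
	e.wg.Add(1)
	go func() {
		defer e.wg.Done()
		v, err := x.fn()
		x.responseCallback <- &response{v: v, err: err}
	}()
}

// Close waits for in-flight goroutines and then drains anything still queued,
// so every caller's responseCallback gets an answer.
func (e *Executor) Close() {
	e.wg.Wait()
	close(e.pending)
	for x := range e.pending {
		v, err := x.fn()
		x.responseCallback <- &response{v: v, err: err}
	}
}
```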
// let's take all callbacks
waiters := e.keys[execKey]
delete(e.keys, execKey)
go func(key string) {
Pass waiters the same way you pass the key, just to be symmetric.
Or don't pass both and pin instead..
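For clarity, the two alternatives being discussed, sketched against assumed stand-in fields (this is not the PR's actual code):

```go
package batch

type request struct {
	fn func() (interface{}, error)
}

type Executor struct {
	keys map[string][]*request
}

func (e *Executor) flush(execKey string) {
	waiters := e.keys[execKey]
	delete(e.keys, execKey)

	// (a) symmetric: pass both the key and the waiters as goroutine arguments.
	go func(key string, ws []*request) {
		_ = key
		_ = ws // execute and notify waiters here
	}(execKey, waiters)

	// (b) pin: copy into fresh locals and let the closure capture those instead.
	key, ws := execKey, waiters
	go func() {
		_ = key
		_ = ws // execute and notify waiters here
	}()
}
```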
delete(e.keys, execKey)
go func(key string) {
	// execute and call all mapped callbacks
	v, err := waiters[0].fn()
Suggestion: you'll probably want to capture this one inside a func with recover that returns an error.
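One hedged way to do that (safeCall is a made-up helper name, not part of the PR):

```go
package batch

import "fmt"

// safeCall runs fn and converts a panic into an error, so a panicking batched
// call can't take down the executor goroutine.
func safeCall(fn func() (interface{}, error)) (v interface{}, err error) {
	defer func() {
		if p := recover(); p != nil {
			err = fmt.Errorf("batched call panicked: %v", p)
		}
	}()
	return fn()
}
```

The call above would then become something like `v, err := safeCall(waiters[0].fn)`.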
Codecov Report

@@            Coverage Diff             @@
##           master    #1618      +/-   ##
==========================================
+ Coverage   39.25%   39.41%   +0.16%
==========================================
  Files         167      168       +1
  Lines       13563    13621      +58
==========================================
+ Hits         5324     5369      +45
- Misses       7474     7487      +13
  Partials      765      765

Continue to review the full report at Codecov.
@itaiad200 please see the tests I added: I attempted to prove this method does not violate read-after-write consistency
Thanks!
The main request is the change to the test, to ensure reader1 actually starts waiting after writer1 writes.
	responseCallback chan *response
}

type Executor struct {
An Executor is a Batcher, which is a somewhat odd usage of the interface name
I'm bad at naming, I'll admit to that. Suggestions are welcome :)
delayFn := func(dur time.Duration) {
	delaysDone := atomic.AddInt32(&delays, 1)
	if delaysDone == 1 {
		close(waitWrite)
Note that the write can occur before https://github.com/treeverse/lakeFS/pull/1618/files#diff-c9e7aae146c0798d32ade9be6fa5013612246e323b7a2796dbcdc83a0151c607R82 ever happens (because the scheduler is evil). I think you may need to wait on another channel here, one that writer1 closes after it actually writes.
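A sketch of that suggestion, with assumed names (wroteOnce is invented here; waitWrite and delays mirror the snippet above, but this is not the PR's actual test code):

```go
package batch

import (
	"sync/atomic"
	"time"
)

func example() {
	wroteOnce := make(chan struct{}) // closed by writer1 once its write has landed
	waitWrite := make(chan struct{}) // readers are released only after that
	var delays int32

	delayFn := func(time.Duration) {
		if atomic.AddInt32(&delays, 1) == 1 {
			<-wroteOnce // don't release readers until writer1 has really written
			close(waitWrite)
		}
	}

	go func() { // stands in for writer1
		// ... perform the write ...
		close(wroteOnce)
	}()

	delayFn(0)
	<-waitWrite // reader1 only starts after writer1 wrote
}
```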
Looking at the access pattern for critical path operations, we mostly call PostgreSQL with the same exact queries many times.
For GetObject, StatObject, ListObjects (which probably make up the majority of data lake calls), we ALWAYS start by doing the same set of roundtrips to PG:
Our access pattern is such that many requests at a given point in time are extremely likely to not only share the same repository details, but also the same branch/commit/tag, as big data systems tend to be bursty in nature.
Caching is not an option, since it sacrifices consistency (a read after a successful write might return a stale value). So instead of keeping the result around for a while, we can keep the requests around for a while.
This is what this PR does: for a given type of request (i.e. to a specific branch/repo/tag, etc.), wait a couple of milliseconds; if other identical requests arrive in that time, do a single roundtrip and return the result once to all of them, as sketched below.
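A self-contained sketch of that pattern under assumed names (BatchFor and this tiny batcher are illustrative, not the PR's actual API): identical requests arriving within the window share a single roundtrip.

```go
package batch

import (
	"sync"
	"time"
)

type result struct {
	v   interface{}
	err error
}

type batcher struct {
	mu      sync.Mutex
	pending map[string][]chan result
	window  time.Duration
}

func newBatcher(window time.Duration) *batcher {
	return &batcher{pending: make(map[string][]chan result), window: window}
}

// BatchFor coalesces every call made with the same key during the window into
// one invocation of fn; all callers receive the same result.
func (b *batcher) BatchFor(key string, fn func() (interface{}, error)) (interface{}, error) {
	ch := make(chan result, 1)

	b.mu.Lock()
	waiters, scheduled := b.pending[key]
	b.pending[key] = append(waiters, ch)
	if !scheduled {
		// first request for this key: fire a timer that flushes the batch
		time.AfterFunc(b.window, func() {
			b.mu.Lock()
			ws := b.pending[key]
			delete(b.pending, key)
			b.mu.Unlock()

			v, err := fn() // single roundtrip for the whole batch
			for _, w := range ws {
				w <- result{v: v, err: err}
			}
		})
	}
	b.mu.Unlock()

	r := <-ch
	return r.v, r.err
}
```

With something like this, a burst of identical "get branch" lookups would all block for a couple of milliseconds and then share one query result - which is the trade-off described above: a tiny added latency per request in exchange for far fewer database roundtrips.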
Testing this on the same environment used for the sizing guide (2 x c5ad.xlarge AWS instances), I now get the following results:
I'm OK with not accepting this due to it being a premature optimization (it is!), but I feel the added complexity is relatively small and the gain is pretty big (if only to show the best possible numbers per core).