KV scan batch size control #4875
Great change, looks good.
Added some minor comments.
Requesting changes only because the DynamoDB scan doesn't choose the scan size correctly.
@@ -60,6 +60,11 @@ type ValueWithPredicate struct {
	Predicate Predicate
}

type ScanOptions struct {
	KeyStart []byte
	BatchSize int
Maybe we should add some documentation here explaining what BatchSize represents
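One possible shape for that documentation, as a sketch only. The comments assume that BatchSize is a hint rather than a hard cap on results and that zero means "use the driver default", which is consistent with how the DynamoDB hunk in this review skips a zero value. The note on an empty KeyStart is also an assumption. None of this wording is taken from the merged code.

```go
package main

import "fmt"

// ScanOptions holds optional parameters for a Store scan.
// The field comments are a suggested wording, not the merged documentation.
type ScanOptions struct {
	// KeyStart is the key to start scanning from; entries are returned
	// at or after this key. Assumption: an empty value starts from the
	// first key.
	KeyStart []byte
	// BatchSize is the recommended number of records the driver fetches
	// per round trip. Assumption: it is a hint, not a hard limit on the
	// total number of results. Zero means the driver picks its own default.
	BatchSize int
}

func main() {
	opts := ScanOptions{KeyStart: []byte("a/"), BatchSize: 1}
	fmt.Printf("start=%q batch=%d\n", opts.KeyStart, opts.BatchSize)
}
```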
pkg/kv/store.go
Outdated
@@ -79,7 +84,9 @@ type Store interface {

// Scan returns entries that can be read by key order, starting at or after the `start` position
Suggested change:
- // Scan returns entries that can be read by key order, starting at or after the `start` position
+ // Scan returns entries that can be read by key order
pkg/kv/store.go
Outdated
@@ -79,7 +84,9 @@ type Store interface {

// Scan returns entries that can be read by key order, starting at or after the `start` position
// partitionKey is optional, passing it might increase performance.
Scan(ctx context.Context, partitionKey, start []byte) (EntriesIterator, error)
// 'options' holds additional parameters to limit the number of records
// and set the prefix to scan.
does it set the prefix to scan or the start key?
true, fixing the comment.
pkg/graveler/graveler.go
Outdated
@@ -1872,7 +1873,7 @@ func (g *Graveler) addCommitNoLock(ctx context.Context, repository *RepositoryRe
}

func (g *Graveler) isStagingEmpty(ctx context.Context, repository *RepositoryRecord, branch *Branch) (bool, error) {
	itr, err := g.listStagingArea(ctx, branch)
	itr, err := g.listStagingArea(ctx, branch, 0)
Suggested change:
- itr, err := g.listStagingArea(ctx, branch, 0)
+ itr, err := g.listStagingArea(ctx, branch, 1)
Thanks - good call to optimize empty checks too
pkg/graveler/graveler.go
Outdated
@@ -1904,7 +1905,7 @@ func (g *Graveler) isSealedEmpty(ctx context.Context, repository *RepositoryReco
	if len(branch.SealedTokens) == 0 {
		return true, nil
	}
	itrs, err := g.sealedTokensIterator(ctx, branch)
	itrs, err := g.sealedTokensIterator(ctx, branch, 0)
Suggested change:
- itrs, err := g.sealedTokensIterator(ctx, branch, 0)
+ itrs, err := g.sealedTokensIterator(ctx, branch, 1)
Thanks - good call to optimize empty checks too
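To illustrate why a batch size of 1 helps an emptiness check, here is a minimal sketch with a hypothetical in-memory store that counts how many records it loads. `fakeStore`, `batchIterator`, `isEmpty`, and the fallback default of 1000 are all invented for this example and are not the Graveler or KV driver types.

```go
package main

import "fmt"

// fakeStore is a hypothetical in-memory stand-in for a KV driver.
// It counts how many records were loaded so batch sizes can be compared.
type fakeStore struct {
	keys    []string
	fetched int
}

// batchIterator loads keys from the store in chunks of batchSize.
type batchIterator struct {
	store     *fakeStore
	batchSize int
	offset    int // index in store.keys of the next record to load
	buf       []string
	current   string
}

// scan returns an iterator; a non-positive batchSize falls back to an
// assumed driver default of 1000 records per fetch.
func (s *fakeStore) scan(batchSize int) *batchIterator {
	if batchSize <= 0 {
		batchSize = 1000
	}
	return &batchIterator{store: s, batchSize: batchSize}
}

// Next advances to the next key, loading another batch when the buffer is empty.
func (it *batchIterator) Next() bool {
	if len(it.buf) == 0 {
		end := it.offset + it.batchSize
		if end > len(it.store.keys) {
			end = len(it.store.keys)
		}
		it.buf = it.store.keys[it.offset:end]
		it.store.fetched += len(it.buf)
		it.offset = end
	}
	if len(it.buf) == 0 {
		return false
	}
	it.current, it.buf = it.buf[0], it.buf[1:]
	return true
}

// isEmpty only needs to know whether a first record exists.
func isEmpty(s *fakeStore, batchSize int) bool {
	return !s.scan(batchSize).Next()
}

func main() {
	keys := make([]string, 5000)
	for i := range keys {
		keys[i] = fmt.Sprintf("key-%04d", i)
	}
	withDefault := &fakeStore{keys: keys}
	withOne := &fakeStore{keys: keys}
	fmt.Println(isEmpty(withDefault, 0), withDefault.fetched) // false 1000
	fmt.Println(isEmpty(withOne, 1), withOne.fetched)         // false 1
}
```

Both calls give the same answer, but the second loads a single record instead of a full default batch, which is the saving the suggestion is after.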
pkg/graveler/staging/manager.go
Outdated
@@ -94,8 +94,8 @@ func (m *Manager) DropKey(ctx context.Context, st graveler.StagingToken, key gra

// List TODO niro: Remove batchSize parameter post KV
Think we can remove this todo
if opts.PrefetchSize > 0 {
	opts.PrefetchValues = true
}
Nice catch
pkg/kv/dynamodb/store.go
Outdated
if s.params.ScanLimit != 0 {
	queryInput.SetLimit(s.params.ScanLimit)
}
if options.BatchSize != 0 {
	queryInput.Limit = aws.Int64(int64(options.BatchSize))
}
We should choose the minimum between ScanLimit and batchSize
Fixing
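A minimal sketch of the "minimum of the two" rule, assuming that a zero (or negative) value means "not set" for both the configured scan limit and the requested batch size. `effectiveLimit` is a hypothetical helper written for this illustration, not the fix that was actually pushed.

```go
package main

import "fmt"

// effectiveLimit returns the per-query limit to send to the backend.
// scanLimit stands in for the configured s.params.ScanLimit and batchSize
// for options.BatchSize; for both, a non-positive value means "not set".
// A result of 0 means no limit should be set on the query.
func effectiveLimit(scanLimit int64, batchSize int) int64 {
	b := int64(batchSize)
	switch {
	case scanLimit <= 0 && b <= 0:
		return 0
	case scanLimit <= 0:
		return b
	case b <= 0:
		return scanLimit
	case b < scanLimit:
		return b
	default:
		return scanLimit
	}
}

func main() {
	fmt.Println(effectiveLimit(1000, 1))   // 1
	fmt.Println(effectiveLimit(100, 5000)) // 100
	fmt.Println(effectiveLimit(0, 50))     // 50
	fmt.Println(effectiveLimit(0, 0))      // 0
}
```

With a helper like this, the driver would call `queryInput.SetLimit` once, and only when the result is non-zero, so a large caller-supplied batch size can never exceed the configured scan limit.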
Close #4712
Pass the listing size from the catalog level down to the KV driver level.
Listing works with pagination, while the KV layer works with iterators.
With this PR, the caller can pass a recommended batch size to the KV level, so the implementation can use resources more efficiently and perform faster for specific actions.