compactor changes for building per user index files in boltdb shipper #5026
Conversation
@@ -113,27 +120,33 @@ Outer:
 		return globalRetention
 	}

-func findSmallestRetentionPeriod(limits Limits) time.Duration {
+func findLatestRetentionStartTime(now model.Time, limits Limits) (model.Time, map[string]model.Time) {
 	// find the smallest retention period from default limits
Seems like it could be a function comment instead.
I wanted to explain the block below that comment. I will add a function comment as well.
@@ -14,11 +14,17 @@ const delimiter = "/"

 // Client is used to manage boltdb index files in object storage, when using boltdb-shipper.
 type Client interface {
 	ListTables(ctx context.Context) ([]string, error)
-	ListFiles(ctx context.Context, tableName string) ([]IndexFile, error)
+	ListFiles(ctx context.Context, tableName string) ([]IndexFile, []string, error)
Could you maybe compose this interface with the two interfaces at the end, and also move them to the top?
Hah, that was the plan and I missed doing that. Thanks for pointing it out!
type indexSet struct {
	client    Client
	userIndex bool
userBasedIndex?
if len(is.sourceObjects) > 0 {
	// we would only have compacted files in user index folder, so it is not expected to have -1 for seedFileIdx but
	// let's still check it as a safety mechanism to avoid panics.
	if seedFileIdx != -1 {
Shouldn't this be == ?
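The comment above the check does suggest the guard should fire when the index IS -1. A hedged sketch of that reading, with hypothetical function and argument names (the real code operates on the index set's source objects):

```go
package main

import (
	"errors"
	"fmt"
)

// compactUserIndex sketches the guard under discussion: the surrounding
// comment says seedFileIdx == -1 is "not expected" among compacted files, so
// the safety check should bail out when it IS -1, avoiding an out-of-range
// panic on sourceObjects[seedFileIdx] below.
func compactUserIndex(sourceObjects []string, seedFileIdx int) error {
	if len(sourceObjects) > 0 {
		// safety mechanism to avoid panics when no seed file was found
		if seedFileIdx == -1 {
			return errors.New("no seed file found among compacted files")
		}
		fmt.Println("seeding compaction from", sourceObjects[seedFileIdx])
	}
	return nil
}

func main() {
	fmt.Println(compactUserIndex([]string{"a.gz"}, 0))
	fmt.Println(compactUserIndex([]string{"a.gz"}, -1))
}
```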
	return
}

// mustRecreateCompactedDB returns true if the compacted db should be recreated
This seems unused.
Yeah, it should be used by the index set instance as you rightly pointed out below.
}

// recreateCompactedDB just copies the old db to the new one using bbolt.Compact for following reasons:
// 1. When files are deleted, boltdb leaves free pages in the file. The only way to drop those free pages is to re-create the file.
-// 1. When files are deleted, boltdb leaves free pages in the file. The only way to drop those free pages is to re-create the file.
+// 1. When index entries are deleted, boltdb leaves free pages in the file. The only way to drop those free pages is to re-create the file.
// - upload the compacted db if required.
// - remove the source objects from storage if required.
func (is *indexSet) done() error {
	if !is.uploadCompactedDB && !is.removeSourceObjects && mustRecreateCompactedDB(is.sourceObjects) {
Maybe it was supposed to be is.mustRecreateCompactedDB?
The usage is correct here; I was supposed to drop the mustRecreateCompactedDB method on the index set.
 if err != nil {
 	return err
 }

 defer func() {
 	if err := readCloser.Close(); err != nil {
-		level.Error(util_log.Logger)
+		level.Error(util_log.Logger).Log("msg", "failed to close read closer", "err", err)
-level.Error(util_log.Logger).Log("msg", "failed to close read closer", "err", err)
+level.Error(logger).Log("msg", "failed to close read closer", "err", err)
We should also use logger in the defer below.
	return strings.HasSuffix(filename, ".gz")
}

func DoConcurrentWork(ctx context.Context, maxConcurrency, workCount int, logger log.Logger, do func(workNum int) error) error {
You can use https://github.com/grafana/dskit/blob/main/concurrency/runner.go#L65 instead.
jobs []interface{} = [0,1,2,3,....,workCount]int
 }
 table.logger = log.With(util_log.Logger, "table-name", table.name)

 return &table, nil
 }

-func (t *table) compact(tableHasExpiredStreams bool) error {
+func (t *table) compact(applyRetention bool) error {
 	indexFiles, err := t.indexStorageClient.ListFiles(t.ctx, t.name)
Can we separate the logic for the multi tenant index? This function is quite hard to understand.
I did give it a try but it looks hard. I feel the complexity comes from the support for backwards compatibility and multiple index formats. I will see if I can do something or maybe we can discuss this in a call face to face.
LGTM it's a bit difficult to follow.
With the PR description I was able to understand everything though. Although we might want to think about refactoring to make the code cleaner, single code path for each case and separate concern. Talking specifically about the compaction/retention code.
I did some thinking and it does not seem as easy to build a separate execution path without making it complex. I am going to merge this PR and open a separate PR if I can come up with something.
Thanks for quickly reviewing this PR!
What this PR does / why we need it:
This is the first PR for adding support for per user index files in boltdb shipper. It includes changes only related to the compactor.
The below image shows various formats that would be in use.
Roughly here is how it would work:
- … Format1. Switching to per user index i.e. Format3 would be controlled by a feature flag.
- … Format2, which is to avoid building too many files in a large cluster with thousands of tenants.
- If the compactor is not configured to build per user index, it would write index in Format1.
- If the compactor is configured to build per user index, then it would write index in Format3.

Special notes for your reviewer:
I am doing smaller PRs to avoid making it difficult to review a huge PR at once.
Checklist