Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix a flaky test in boltdb shipper #2550

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions pkg/storage/stores/shipper/uploads/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,9 +198,7 @@ func (lt *Table) Upload(ctx context.Context, force bool) error {
lt.dbsMtx.RLock()
defer lt.dbsMtx.RUnlock()

// upload files excluding active shard. It could so happen that we just started a new shard but the file for last shard is still being updated due to pending writes or pending flush to disk.
// To avoid uploading it, excluding previous active shard as well if it has been not more than a minute since it became inactive.
uploadShardsBefore := fmt.Sprint(time.Now().Add(-time.Minute).Truncate(shardDBsByDuration).Unix())
uploadShardsBefore := fmt.Sprint(getOldestActiveShardTime().Unix())

// Adding check for considering only files which are sharded and have just an epoch in their name.
// Before introducing sharding we had a single file per table which were were moved inside the folder per table as part of migration.
Expand Down Expand Up @@ -379,3 +377,10 @@ func loadBoltDBsFromDir(dir string) (map[string]*bbolt.DB, error) {

return dbs, nil
}

// getOldestActiveShardTime returns the time of oldest active shard with a buffer of 1 minute.
func getOldestActiveShardTime() time.Time {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Admittedly I'm not that familiar with this code, but I'm having a bit of difficulty following. Wouldn't it make more sense as func getOldestActiveShardTime(from time.Time) time.Time, which could then be passed time.Now() as an argument?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We actually always have 1 active shard at a time but we consider last active shard as still active if it was active a minute back. This is to avoid any race conditions. I don't mind changing the function to accept a time.Time as a parameter but it would always be getting time.Now as input. Do you want me to change it?

// upload files excluding active shard. It could so happen that we just started a new shard but the file for last shard is still being updated due to pending writes or pending flush to disk.
// To avoid uploading it, excluding previous active shard as well if it has been not more than a minute since it became inactive.
return time.Now().Add(-time.Minute).Truncate(shardDBsByDuration)
}
9 changes: 5 additions & 4 deletions pkg/storage/stores/shipper/uploads/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,13 +378,14 @@ func TestTable_ImmutableUploads(t *testing.T) {
boltDBIndexClient.Stop()
}()

activeShard := time.Now().Truncate(shardDBsByDuration)
// shardCutoff is calulated based on when shards are considered to not be active anymore and are safe to be uploaded.
shardCutoff := getOldestActiveShardTime()

// some dbs to setup
dbNames := []int64{
activeShard.Add(-shardDBsByDuration).Unix(), // inactive shard, should upload
activeShard.Add(-2 * time.Minute).Unix(), // 2 minutes before active shard, should upload
activeShard.Unix(), // active shard, should not upload
shardCutoff.Add(-shardDBsByDuration).Unix(), // inactive shard, should upload
shardCutoff.Add(-1 * time.Minute).Unix(), // 1 minute before shard cutoff, should upload
time.Now().Truncate(shardDBsByDuration).Unix(), // active shard, should not upload
}

dbs := map[string]testutil.DBRecords{}
Expand Down