Skip to content

Commit

Permalink
compactor changes for building per user index files in boltdb shipper (#5026)
Browse files Browse the repository at this point in the history

* compactor changes for building per user index files in boltdb shipper

* update comments

* lint and changes suggested from PR review

* fix broken code after merging main
  • Loading branch information
sandeepsukhani authored Jan 7, 2022
1 parent a5c544e commit ff47d74
Show file tree
Hide file tree
Showing 25 changed files with 1,500 additions and 667 deletions.
11 changes: 7 additions & 4 deletions pkg/storage/chunk/local/boltdb_index_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
)

var (
bucketName = []byte("index")
defaultBucketName = []byte("index")
ErrUnexistentBoltDB = errors.New("boltdb file does not exist")
)

Expand Down Expand Up @@ -171,9 +171,12 @@ func (b *BoltIndexClient) GetDB(name string, operation int) (*bbolt.DB, error) {
return db, nil
}

func (b *BoltIndexClient) WriteToDB(ctx context.Context, db *bbolt.DB, writes TableWrites) error {
func (b *BoltIndexClient) WriteToDB(ctx context.Context, db *bbolt.DB, bucketName []byte, writes TableWrites) error {
return db.Update(func(tx *bbolt.Tx) error {
var b *bbolt.Bucket
if len(bucketName) == 0 {
bucketName = defaultBucketName
}

// a bucket should already exist for deletes, for other writes we create one otherwise.
if len(writes.deletes) != 0 {
Expand Down Expand Up @@ -212,7 +215,7 @@ func (b *BoltIndexClient) BatchWrite(ctx context.Context, batch chunk.WriteBatch
return err
}

err = b.WriteToDB(ctx, db, writes)
err = b.WriteToDB(ctx, db, nil, writes)
if err != nil {
return err
}
Expand Down Expand Up @@ -240,7 +243,7 @@ func (b *BoltIndexClient) query(ctx context.Context, query chunk.IndexQuery, cal

func (b *BoltIndexClient) QueryDB(ctx context.Context, db *bbolt.DB, query chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) (shouldContinue bool)) error {
return db.View(func(tx *bbolt.Tx) error {
bucket := tx.Bucket(bucketName)
bucket := tx.Bucket(defaultBucketName)
if bucket == nil {
return nil
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/storage/chunk/local/boltdb_index_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func setupDB(t *testing.T, boltdbIndexClient *BoltIndexClient, dbname string) {
require.NoError(t, err)

err = db.Update(func(tx *bbolt.Tx) error {
b, err := tx.CreateBucketIfNotExists(bucketName)
b, err := tx.CreateBucketIfNotExists(defaultBucketName)
if err != nil {
return err
}
Expand Down Expand Up @@ -63,7 +63,7 @@ func TestBoltDBReload(t *testing.T) {

valueFromDb := []byte{}
_ = droppedDb.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(bucketName)
b := tx.Bucket(defaultBucketName)
valueFromDb = b.Get(testKey)
return nil
})
Expand Down Expand Up @@ -207,7 +207,7 @@ func TestBoltDB_Writes(t *testing.T) {
{
name: "deletes without initial writes",
testDeletes: []string{"1", "2"},
err: fmt.Errorf("bucket %s not found in table 3", bucketName),
err: fmt.Errorf("bucket %s not found in table 3", defaultBucketName),
},
} {
t.Run(tc.name, func(t *testing.T) {
Expand Down
9 changes: 5 additions & 4 deletions pkg/storage/stores/shipper/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,8 @@ func (c *Compactor) stopping(_ error) error {
}

func (c *Compactor) CompactTable(ctx context.Context, tableName string, applyRetention bool) error {
table, err := newTable(ctx, filepath.Join(c.cfg.WorkingDirectory, tableName), c.indexStorageClient, c.cfg.RetentionEnabled, c.tableMarker)
table, err := newTable(ctx, filepath.Join(c.cfg.WorkingDirectory, tableName), c.indexStorageClient,
c.tableMarker, c.expirationChecker)
if err != nil {
level.Error(util_log.Logger).Log("msg", "failed to initialize table for compaction", "table", tableName, "err", err)
return err
Expand All @@ -408,7 +409,7 @@ func (c *Compactor) CompactTable(ctx context.Context, tableName string, applyRet
interval := retention.ExtractIntervalFromTableName(tableName)
intervalMayHaveExpiredChunks := false
if c.cfg.RetentionEnabled && applyRetention {
intervalMayHaveExpiredChunks = c.expirationChecker.IntervalMayHaveExpiredChunks(interval)
intervalMayHaveExpiredChunks = c.expirationChecker.IntervalMayHaveExpiredChunks(interval, "")
}

err = table.compact(intervalMayHaveExpiredChunks)
Expand Down Expand Up @@ -548,8 +549,8 @@ func (e *expirationChecker) MarkPhaseFinished() {
e.deletionExpiryChecker.MarkPhaseFinished()
}

func (e *expirationChecker) IntervalMayHaveExpiredChunks(interval model.Interval) bool {
return e.retentionExpiryChecker.IntervalMayHaveExpiredChunks(interval) || e.deletionExpiryChecker.IntervalMayHaveExpiredChunks(interval)
// IntervalMayHaveExpiredChunks returns true if either the retention expiry
// checker or the delete-requests expiry checker reports that the given
// interval may contain expired chunks for the given user. An empty userID
// asks the delegates about all users.
//
// Note: the userID must be propagated to both delegates — the
// DeleteRequestsManager implementation filters its pending delete requests
// by user, so passing "" here would disable per-user filtering entirely.
func (e *expirationChecker) IntervalMayHaveExpiredChunks(interval model.Interval, userID string) bool {
	return e.retentionExpiryChecker.IntervalMayHaveExpiredChunks(interval, userID) ||
		e.deletionExpiryChecker.IntervalMayHaveExpiredChunks(interval, userID)
}

func (e *expirationChecker) DropFromIndex(ref retention.ChunkEntry, tableEndTime model.Time, now model.Time) bool {
Expand Down
6 changes: 3 additions & 3 deletions pkg/storage/stores/shipper/compactor/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,10 @@ func TestCompactor_RunCompaction(t *testing.T) {
}

for name, dbs := range tables {
testutil.SetupDBTablesAtPath(t, name, tablesPath, dbs, false)
testutil.SetupDBsAtPath(t, name, tablesPath, dbs, false, nil)

// setup exact same copy of dbs for comparison.
testutil.SetupDBTablesAtPath(t, name, tablesCopyPath, dbs, false)
testutil.SetupDBsAtPath(t, name, tablesCopyPath, dbs, false, nil)
}

compactor := setupTestCompactor(t, tempDir)
Expand All @@ -106,6 +106,6 @@ func TestCompactor_RunCompaction(t *testing.T) {
require.True(t, strings.HasSuffix(files[0].Name(), ".gz"))

// verify we have all the kvs in compacted db which were there in source dbs.
compareCompactedDB(t, filepath.Join(tablesPath, name, files[0].Name()), filepath.Join(tablesCopyPath, name))
compareCompactedTable(t, filepath.Join(tablesPath, name), filepath.Join(tablesCopyPath, name))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -195,10 +195,20 @@ func (d *DeleteRequestsManager) MarkPhaseFinished() {
}
}

func (d *DeleteRequestsManager) IntervalMayHaveExpiredChunks(_ model.Interval) bool {
func (d *DeleteRequestsManager) IntervalMayHaveExpiredChunks(_ model.Interval, userID string) bool {
d.deleteRequestsToProcessMtx.Lock()
defer d.deleteRequestsToProcessMtx.Unlock()

if userID != "" {
for _, deleteRequest := range d.deleteRequestsToProcess {
if deleteRequest.UserID == userID {
return true
}
}

return false
}

// If your request includes just today and there are chunks spanning today and yesterday then
// with previous check it won’t process yesterday’s index.
return len(d.deleteRequestsToProcess) != 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"sync"
Expand Down Expand Up @@ -65,7 +66,9 @@ func (t *deleteRequestsTable) init() error {

_, err := os.Stat(t.dbPath)
if err != nil {
err = shipper_util.GetFileFromStorage(context.Background(), t.indexStorageClient, DeleteRequestsTableName, deleteRequestsIndexFileName, t.dbPath, true)
err = shipper_util.DownloadFileFromStorage(func() (io.ReadCloser, error) {
return t.indexStorageClient.GetFile(context.Background(), DeleteRequestsTableName, deleteRequestsIndexFileName)
}, shipper_util.IsCompressedFile(deleteRequestsIndexFileName), t.dbPath, true, util_log.Logger)
if err != nil && !t.indexStorageClient.IsFileNotFoundErr(err) {
return err
}
Expand Down Expand Up @@ -169,7 +172,7 @@ func (t *deleteRequestsTable) BatchWrite(ctx context.Context, batch chunk.WriteB
}

for _, tableWrites := range boltWriteBatch.Writes {
if err := t.boltdbIndexClient.WriteToDB(ctx, t.db, tableWrites); err != nil {
if err := t.boltdbIndexClient.WriteToDB(ctx, t.db, nil, tableWrites); err != nil {
return err
}
}
Expand Down
Loading

0 comments on commit ff47d74

Please sign in to comment.