2 changes: 1 addition & 1 deletion pkg/chunk/purger/blocks_purger.go
@@ -158,7 +158,7 @@ func (api *BlocksPurgerAPI) GetAllDeleteRequestsHandler(w http.ResponseWriter, r
return
}
tManager := cortex_tsdb.NewTombstoneManager(api.bucketClient, userID, api.cfgProvider, api.logger)
-	deleteRequests, err := tManager.GetAllDeleteRequestsForUser(ctx)
+	deleteRequests, err := tManager.GetAllDeleteRequestsForUser(ctx, nil)
if err != nil {
level.Error(util_log.Logger).Log("msg", "error getting delete requests from the block store", "err", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
8 changes: 8 additions & 0 deletions pkg/chunk/purger/tombstones.go
@@ -48,6 +48,14 @@ type TombstonesSet struct {
oldestTombstoneStart, newestTombstoneEnd model.Time // Used as optimization to find whether we want to iterate over tombstones or not
}

+func NewTombstoneSet(t []DeleteRequest, start model.Time, end model.Time) *TombstonesSet {
+	return &TombstonesSet{
+		tombstones:           t,
+		oldestTombstoneStart: start,
+		newestTombstoneEnd:   end,
+	}
+}

// Used for easier injection of mocks.
type DeleteStoreAPI interface {
getCacheGenerationNumbers(ctx context.Context, user string) (*cacheGenNumbers, error)
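The exported constructor makes it possible to build a TombstonesSet from outside the purger package; the bucket-index blocks finder later in this diff relies on it. A minimal usage sketch, assuming the purger.DeleteRequest fields used later in this diff — the selector and time values are illustrative:

```go
package example

import (
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/pkg/labels"

	"github.com/cortexproject/cortex/pkg/chunk/purger"
)

// exampleTombstonesSet builds a set holding one delete request covering [0, 15).
func exampleTombstonesSet() *purger.TombstonesSet {
	matcher := labels.MustNewMatcher(labels.MatchEqual, "__name__", "series1")
	req := purger.DeleteRequest{
		StartTime: model.Time(0),
		EndTime:   model.Time(15),
		Matchers:  [][]*labels.Matcher{{matcher}},
	}

	// The start/end arguments mirror the oldest start and newest end across all
	// requests, so callers can cheaply skip tombstone filtering for queries that
	// don't overlap the set's interval.
	return purger.NewTombstoneSet([]purger.DeleteRequest{req}, model.Time(0), model.Time(15))
}
```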
6 changes: 4 additions & 2 deletions pkg/compactor/blocks_cleaner.go
@@ -40,6 +40,7 @@ type BlocksCleaner struct {
logger log.Logger
bucketClient objstore.Bucket
usersScanner *cortex_tsdb.UsersScanner
+	bucketCfg cortex_tsdb.BucketStoreConfig

// Keep track of the last owned users.
lastOwnedUsers []string
@@ -58,10 +59,11 @@ type BlocksCleaner struct {
tenantBucketIndexLastUpdate *prometheus.GaugeVec
}

-func NewBlocksCleaner(cfg BlocksCleanerConfig, bucketClient objstore.Bucket, usersScanner *cortex_tsdb.UsersScanner, cfgProvider ConfigProvider, logger log.Logger, reg prometheus.Registerer) *BlocksCleaner {
+func NewBlocksCleaner(cfg BlocksCleanerConfig, bucketClient objstore.Bucket, usersScanner *cortex_tsdb.UsersScanner, cfgProvider ConfigProvider, bktCfg cortex_tsdb.BucketStoreConfig, logger log.Logger, reg prometheus.Registerer) *BlocksCleaner {
c := &BlocksCleaner{
cfg: cfg,
bucketClient: bucketClient,
+	bucketCfg: bktCfg,
usersScanner: usersScanner,
cfgProvider: cfgProvider,
logger: log.With(logger, "component", "cleaner"),
@@ -329,7 +331,7 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b
}

// Generate an updated in-memory version of the bucket index.
-	w := bucketindex.NewUpdater(c.bucketClient, userID, c.cfgProvider, c.logger)
+	w := bucketindex.NewUpdater(c.bucketClient, userID, c.cfgProvider, c.cfg.DeletionDelay, c.cfg.CleanupInterval, c.bucketCfg, c.logger)
idx, partials, err := w.UpdateIndex(ctx, idx)
if err != nil {
return err
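NewBlocksCleaner now receives the bucket-store config and forwards it — together with the cleaner's deletion delay and cleanup interval — to bucketindex.NewUpdater, so the index updater can account for tombstones and index staleness when rebuilding the bucket index. A sketch of the new wiring, assuming it sits in the compactor package; the config values are illustrative (the real ones come from the compactor config, as wired in compactor.go below):

```go
package compactor

import (
	"time"

	"github.com/go-kit/kit/log"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/thanos-io/thanos/pkg/objstore"

	cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
)

// newCleanerSketch shows the extended constructor; DeletionDelay and
// CleanupInterval are both forwarded to the bucket index updater.
func newCleanerSketch(bkt objstore.Bucket, scanner *cortex_tsdb.UsersScanner, cfgProvider ConfigProvider,
	bktCfg cortex_tsdb.BucketStoreConfig, logger log.Logger, reg prometheus.Registerer) *BlocksCleaner {
	cfg := BlocksCleanerConfig{
		DeletionDelay:   12 * time.Hour,   // illustrative value
		CleanupInterval: 15 * time.Minute, // illustrative value
	}
	return NewBlocksCleaner(cfg, bkt, scanner, cfgProvider, bktCfg, logger, reg)
}
```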
67 changes: 49 additions & 18 deletions pkg/compactor/blocks_cleaner_test.go
@@ -22,6 +22,7 @@ import (
"github.com/thanos-io/thanos/pkg/objstore"

"github.com/cortexproject/cortex/pkg/storage/tsdb"
+	cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil"
"github.com/cortexproject/cortex/pkg/util"
@@ -98,6 +99,16 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
user4DebugMetaFile := path.Join("user-4", block.DebugMetas, "meta.json")
require.NoError(t, bucketClient.Upload(context.Background(), user4DebugMetaFile, strings.NewReader("some random content here")))

+	// create sample tombstones
+	tombstone1 := cortex_tsdb.NewTombstone("user-1", 100, 100, 0, 15, []string{"series1"}, "request1", cortex_tsdb.StatePending)
+	tombstone2 := cortex_tsdb.NewTombstone("user-1", 100, 100, 0, 15, []string{"series2"}, "request2", cortex_tsdb.StatePending)
+	tombstone3 := cortex_tsdb.NewTombstone("user-2", 100, 100, 0, 15, []string{"series1"}, "request3", cortex_tsdb.StatePending)
+	tombstone4 := cortex_tsdb.NewTombstone("user-2", 100, 100, 0, 15, []string{"series2"}, "request4", cortex_tsdb.StateCancelled)
+	uploadTombstone(t, bucketClient, "user-1", tombstone1)
+	uploadTombstone(t, bucketClient, "user-1", tombstone2)
+	uploadTombstone(t, bucketClient, "user-2", tombstone3)
+	uploadTombstone(t, bucketClient, "user-2", tombstone4)

// The fixtures have been created. If the bucket client wasn't wrapped to write
// deletion marks to the global location too, then this is the right time to do it.
if options.markersMigrationEnabled {
@@ -117,7 +128,7 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger)
cfgProvider := newMockConfigProvider()

-	cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg)
+	cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, newMockBucketStoreCfg(), logger, reg)
require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner))
defer services.StopAndAwaitTerminated(ctx, cleaner) //nolint:errcheck

@@ -166,21 +177,26 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions

// Check the updated bucket index.
for _, tc := range []struct {
-		userID         string
-		expectedIndex  bool
-		expectedBlocks []ulid.ULID
-		expectedMarks  []ulid.ULID
+		userID                string
+		expectedIndex         bool
+		expectedBlocks        []ulid.ULID
+		expectedMarks         []ulid.ULID
+		expectedTombstonesIDs []string
}{
{
-			userID:         "user-1",
-			expectedIndex:  true,
-			expectedBlocks: []ulid.ULID{block1, block2 /* deleted: block3, block4, block5, partial: block6 */},
-			expectedMarks:  []ulid.ULID{block2},
+			userID:                "user-1",
+			expectedIndex:         true,
+			expectedBlocks:        []ulid.ULID{block1, block2 /* deleted: block3, block4, block5, partial: block6 */},
+			expectedMarks:         []ulid.ULID{block2},
+			expectedTombstonesIDs: []string{"request1", "request2"},
}, {
-			userID:         "user-2",
-			expectedIndex:  true,
-			expectedBlocks: []ulid.ULID{block8},
-			expectedMarks:  []ulid.ULID{},
+			userID:                "user-2",
+			expectedIndex:         true,
+			expectedBlocks:        []ulid.ULID{block8},
+			expectedMarks:         []ulid.ULID{},
+			expectedTombstonesIDs: []string{"request3"},
+			// request4 should not be included because it is
+			// cancelled and should not be used for query filtering
}, {
userID: "user-3",
expectedIndex: false,
@@ -195,6 +211,7 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
require.NoError(t, err)
assert.ElementsMatch(t, tc.expectedBlocks, idx.Blocks.GetULIDs())
assert.ElementsMatch(t, tc.expectedMarks, idx.BlockDeletionMarks.GetULIDs())
+	assert.ElementsMatch(t, tc.expectedTombstonesIDs, idx.Tombstones.GetRequestIDs())
}

assert.NoError(t, prom_testutil.GatherAndCompare(reg, strings.NewReader(`
@@ -251,7 +268,7 @@ func TestBlocksCleaner_ShouldContinueOnBlockDeletionFailure(t *testing.T) {
scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger)
cfgProvider := newMockConfigProvider()

-	cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, nil)
+	cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, newMockBucketStoreCfg(), logger, nil)
require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner))
defer services.StopAndAwaitTerminated(ctx, cleaner) //nolint:errcheck

@@ -297,6 +314,10 @@ func TestBlocksCleaner_ShouldRebuildBucketIndexOnCorruptedOne(t *testing.T) {
block3 := createTSDBBlock(t, bucketClient, userID, 30, 40, nil)
createDeletionMark(t, bucketClient, userID, block2, now.Add(-deletionDelay).Add(-time.Hour))
createDeletionMark(t, bucketClient, userID, block3, now.Add(-deletionDelay).Add(time.Hour))
+	tombstone1 := cortex_tsdb.NewTombstone(userID, 100, 100, 0, 15, []string{"series1"}, "request1", cortex_tsdb.StatePending)
+	tombstone2 := cortex_tsdb.NewTombstone(userID, 100, 100, 0, 15, []string{"series2"}, "request2", cortex_tsdb.StatePending)
+	uploadTombstone(t, bucketClient, userID, tombstone1)
+	uploadTombstone(t, bucketClient, userID, tombstone2)

// Write a corrupted bucket index.
require.NoError(t, bucketClient.Upload(ctx, path.Join(userID, bucketindex.IndexCompressedFilename), strings.NewReader("invalid!}")))
@@ -311,7 +332,7 @@ func TestBlocksCleaner_ShouldRebuildBucketIndexOnCorruptedOne(t *testing.T) {
scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger)
cfgProvider := newMockConfigProvider()

-	cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, nil)
+	cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, newMockBucketStoreCfg(), logger, nil)
require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner))
defer services.StopAndAwaitTerminated(ctx, cleaner) //nolint:errcheck

@@ -339,6 +360,7 @@ func TestBlocksCleaner_ShouldRebuildBucketIndexOnCorruptedOne(t *testing.T) {
require.NoError(t, err)
assert.ElementsMatch(t, []ulid.ULID{block1, block3}, idx.Blocks.GetULIDs())
assert.ElementsMatch(t, []ulid.ULID{block3}, idx.BlockDeletionMarks.GetULIDs())
+	assert.ElementsMatch(t, []string{"request1", "request2"}, idx.Tombstones.GetRequestIDs())
}

func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShard(t *testing.T) {
@@ -362,7 +384,7 @@ func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShar
scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger)
cfgProvider := newMockConfigProvider()

-	cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg)
+	cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, newMockBucketStoreCfg(), logger, reg)
require.NoError(t, cleaner.cleanUsers(ctx, true))

assert.NoError(t, prom_testutil.GatherAndCompare(reg, strings.NewReader(`
@@ -420,7 +442,7 @@ func TestBlocksCleaner_ListBlocksOutsideRetentionPeriod(t *testing.T) {
id2 := createTSDBBlock(t, bucketClient, "user-1", 6000, 7000, nil)
id3 := createTSDBBlock(t, bucketClient, "user-1", 7000, 8000, nil)

-	w := bucketindex.NewUpdater(bucketClient, "user-1", nil, logger)
+	w := bucketindex.NewUpdater(bucketClient, "user-1", nil, 0, 0, newMockBucketStoreCfg(), logger)
idx, _, err := w.UpdateIndex(ctx, nil)
require.NoError(t, err)

@@ -493,7 +515,7 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) {
scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger)
cfgProvider := newMockConfigProvider()

-	cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg)
+	cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, newMockBucketStoreCfg(), logger, reg)

assertBlockExists := func(user string, block ulid.ULID, expectExists bool) {
exists, err := bucketClient.Exists(ctx, path.Join(user, block.String(), metadata.MetaFilename))
@@ -681,3 +703,12 @@ func (m *mockConfigProvider) S3SSEKMSKeyID(userID string) string {
func (m *mockConfigProvider) S3SSEKMSEncryptionContext(userID string) string {
return ""
}

+func newMockBucketStoreCfg() cortex_tsdb.BucketStoreConfig {
+	return cortex_tsdb.BucketStoreConfig{
+		SyncInterval: time.Minute,
+		BucketIndex: cortex_tsdb.BucketIndexConfig{
+			MaxStalePeriod: time.Hour,
+		},
+	}
+}
2 changes: 1 addition & 1 deletion pkg/compactor/compactor.go
@@ -360,7 +360,7 @@ func (c *Compactor) starting(ctx context.Context) error {
CleanupConcurrency: c.compactorCfg.CleanupConcurrency,
BlockDeletionMarksMigrationEnabled: c.compactorCfg.BlockDeletionMarksMigrationEnabled,
TenantCleanupDelay: c.compactorCfg.TenantCleanupDelay,
-	}, c.bucketClient, c.usersScanner, c.cfgProvider, c.parentLogger, c.registerer)
+	}, c.bucketClient, c.usersScanner, c.cfgProvider, c.storageCfg.BucketStore, c.parentLogger, c.registerer)

// Initialize the compactors ring if sharding is enabled.
if c.compactorCfg.ShardingEnabled {
17 changes: 17 additions & 0 deletions pkg/compactor/compactor_test.go
@@ -422,6 +422,7 @@ func TestCompactor_ShouldIncrementCompactionErrorIfFailedToCompactASingleTenant(
bucketClient.MockIter("", []string{userID}, nil)
bucketClient.MockIter(userID+"/", []string{userID + "/01DTVP434PA9VFXSW2JKB3392D"}, nil)
bucketClient.MockIter(userID+"/markers/", nil, nil)
+	bucketClient.MockIter(userID+"/tombstones/", nil, nil)
bucketClient.MockExists(path.Join(userID, cortex_tsdb.TenantDeletionMarkPath), false, nil)
bucketClient.MockGet(userID+"/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil)
bucketClient.MockGet(userID+"/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", "", nil)
@@ -476,6 +477,8 @@ func TestCompactor_ShouldIterateOverUsersAndRunCompaction(t *testing.T) {
bucketClient.MockGet("user-2/bucket-index.json.gz", "", nil)
bucketClient.MockIter("user-1/markers/", nil, nil)
bucketClient.MockIter("user-2/markers/", nil, nil)
+	bucketClient.MockIter("user-1/tombstones/", nil, nil)
+	bucketClient.MockIter("user-2/tombstones/", nil, nil)
bucketClient.MockUpload("user-1/bucket-index.json.gz", nil)
bucketClient.MockUpload("user-2/bucket-index.json.gz", nil)

@@ -605,6 +608,7 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForDeletion(t *testing.T) {
bucketClient.MockDelete("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ", nil)
bucketClient.MockGet("user-1/bucket-index.json.gz", "", nil)
bucketClient.MockUpload("user-1/bucket-index.json.gz", nil)
+	bucketClient.MockIter("user-1/tombstones/", nil, nil)

c, _, tsdbPlanner, logs, registry := prepare(t, cfg, bucketClient)

@@ -713,6 +717,7 @@ func TestCompactor_ShouldNotCompactBlocksForUsersMarkedForDeletion(t *testing.T)
bucketClient.MockDelete("user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", nil)
bucketClient.MockDelete("user-1/01DTVP434PA9VFXSW2JKB3392D/index", nil)
bucketClient.MockDelete("user-1/bucket-index.json.gz", nil)
+	bucketClient.MockIter("user-1/tombstones/", nil, nil)

c, _, tsdbPlanner, logs, registry := prepare(t, cfg, bucketClient)

@@ -808,6 +813,8 @@ func TestCompactor_ShouldCompactAllUsersOnShardingEnabledButOnlyOneInstanceRunni
bucketClient.MockIter("user-2/", []string{"user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ"}, nil)
bucketClient.MockIter("user-1/markers/", nil, nil)
bucketClient.MockIter("user-2/markers/", nil, nil)
+	bucketClient.MockIter("user-1/tombstones/", nil, nil)
+	bucketClient.MockIter("user-2/tombstones/", nil, nil)
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil)
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", "", nil)
bucketClient.MockGet("user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json", mockBlockMetaJSON("01DTW0ZCPDDNV4BV83Q2SV4QAZ"), nil)
@@ -886,6 +893,7 @@ func TestCompactor_ShouldCompactOnlyUsersOwnedByTheInstanceOnShardingEnabledAndM
for _, userID := range userIDs {
bucketClient.MockIter(userID+"/", []string{userID + "/01DTVP434PA9VFXSW2JKB3392D"}, nil)
bucketClient.MockIter(userID+"/markers/", nil, nil)
+	bucketClient.MockIter(userID+"/tombstones/", nil, nil)
bucketClient.MockExists(path.Join(userID, cortex_tsdb.TenantDeletionMarkPath), false, nil)
bucketClient.MockGet(userID+"/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil)
bucketClient.MockGet(userID+"/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", "", nil)
@@ -1031,6 +1039,15 @@ func createDeletionMark(t *testing.T, bkt objstore.Bucket, userID string, blockI
require.NoError(t, bkt.Upload(context.Background(), markPath, strings.NewReader(content)))
}

+func uploadTombstone(t *testing.T, bkt objstore.Bucket, userID string, tombstone *cortex_tsdb.Tombstone) {
+	tombstoneFilename := tombstone.GetFilename()
+	path := path.Join(userID, cortex_tsdb.TombstonePath, tombstoneFilename)
+	data, err := json.Marshal(tombstone)
+	require.NoError(t, err)
+	require.NoError(t, bkt.Upload(context.Background(), path, bytes.NewReader(data)))
+}

func findCompactorByUserID(compactors []*Compactor, logs []*concurrency.SyncBuffer, userID string) (*Compactor, *concurrency.SyncBuffer, error) {
var compactor *Compactor
var log *concurrency.SyncBuffer
72 changes: 72 additions & 0 deletions pkg/querier/blocks_finder_bucket_index.go
@@ -2,14 +2,19 @@ package querier

import (
	"context"
+	"math"
	"time"

+	cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
	"github.com/go-kit/kit/log"
	"github.com/oklog/ulid"
	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/common/model"
+	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/thanos-io/thanos/pkg/objstore"

+	"github.com/cortexproject/cortex/pkg/chunk/purger"
"github.com/cortexproject/cortex/pkg/storage/bucket"
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
"github.com/cortexproject/cortex/pkg/util/services"
@@ -107,3 +112,70 @@ func (f *BucketIndexBlocksFinder) GetBlocks(ctx context.Context, userID string,

return blocks, matchingDeletionMarks, nil
}

+func (f *BucketIndexBlocksFinder) GetTombstones(ctx context.Context, userID string, minT int64, maxT int64) (*purger.TombstonesSet, error) {
+	if f.State() != services.Running {
+		return nil, errBucketIndexBlocksFinderNotRunning
+	}
+	if maxT < minT {
+		return nil, errInvalidBlocksRange
+	}
+
+	// Get the bucket index for this user.
+	idx, err := f.loader.GetIndex(ctx, userID)
+	if errors.Is(err, bucketindex.ErrIndexNotFound) {
+		// This is a legitimate edge case: a new tenant has not shipped blocks
+		// to the storage yet, so the bucket index hasn't been created.
+		return nil, nil
+	}
+	if err != nil {
+		return nil, err
+	}
+
+	// Ensure the bucket index is not too old.
+	if time.Since(idx.GetUpdatedAt()) > f.cfg.MaxStalePeriod {
+		return nil, errBucketIndexTooOld
+	}
+
+	tConverted := []purger.DeleteRequest{}
+	var tMinTime int64 = math.MaxInt64
+	var tMaxTime int64 = math.MinInt64
+	for _, t := range idx.Tombstones {
+		if !t.IsOverlappingInterval(minT, maxT) {
+			continue
+		}
+
+		// Convert the tombstone into a delete request, which was originally implemented
+		// for chunk store deletion. This allows much of the query filtering code to be
+		// shared between the block and chunk stores.
+		matchers, err := cortex_tsdb.ParseMatchers(t.Selectors)
+		if err != nil {
+			return nil, errors.Wrapf(err, "failed to parse tombstone selectors for: %s", t.RequestID)
+		}
+
+		request := purger.DeleteRequest{
+			StartTime: model.Time(t.StartTime),
+			EndTime:   model.Time(t.EndTime),
+			Matchers:  [][]*labels.Matcher{matchers},
+		}
+		tConverted = append(tConverted, request)
+
+		if t.StartTime < tMinTime {
+			tMinTime = t.StartTime
+		}
+		if t.EndTime > tMaxTime {
+			tMaxTime = t.EndTime
+		}
+	}
+
+	// Narrow the interval the tombstones will be applied to, if possible.
+	if minT > tMinTime {
+		tMinTime = minT
+	}
+	if maxT < tMaxTime {
+		tMaxTime = maxT
+	}
+
+	tombstoneSet := purger.NewTombstoneSet(tConverted, model.Time(tMinTime), model.Time(tMaxTime))
+	return tombstoneSet, nil
+}
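A hedged usage sketch for the new finder method, assuming the caller sits in the same querier package (so the imports above apply); the nil-set handling mirrors the missing-index edge case in the code above, and the helper name is illustrative:

```go
// tombstonesForQuery fetches the tombstones overlapping [minT, maxT] for a tenant.
// GetTombstones returns a nil set (and no error) for tenants without a bucket index yet.
func tombstonesForQuery(ctx context.Context, finder *BucketIndexBlocksFinder, userID string, minT, maxT int64) (*purger.TombstonesSet, error) {
	set, err := finder.GetTombstones(ctx, userID, minT, maxT)
	if err != nil {
		return nil, err // e.g. errBucketIndexTooOld when the index is stale
	}
	if set == nil {
		// New tenant with no bucket index: nothing to filter.
		return purger.NewTombstoneSet(nil, 0, 0), nil
	}
	return set, nil
}
```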