Skip to content

Commit

Permalink
Stop Disk cleaner when storage bucket is not defined (grafana#3135)
Browse files Browse the repository at this point in the history
* Stop Bucket cleaner when storage bucket is not defined

Signed-off-by: Drumil Patel <drumilpatel720@gmail.com>

* Delete storage bucket when high disk utilisation

Signed-off-by: Drumil Patel <drumilpatel720@gmail.com>

* Create new abstraction for getting uploaded block

Signed-off-by: Drumil Patel <drumilpatel720@gmail.com>

* Fix retention

Signed-off-by: Drumil Patel <drumilpatel720@gmail.com>

* Update the isBlockDeletable check for both CleanupBlocksWhenHighDiskUtilization and DeleteUploadedBlocks

Signed-off-by: Drumil Patel <drumilpatel720@gmail.com>

* Fix Test for Retention

Signed-off-by: Drumil Patel <drumilpatel720@gmail.com>

* Fix Test for Retention

Signed-off-by: Drumil Patel <drumilpatel720@gmail.com>

* Fix Pkg Ingester

Signed-off-by: Drumil Patel <drumilpatel720@gmail.com>

---------

Signed-off-by: Drumil Patel <drumilpatel720@gmail.com>
  • Loading branch information
weastel authored Apr 5, 2024
1 parent cf9b77d commit 2383667
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 21 deletions.
48 changes: 31 additions & 17 deletions pkg/ingester/retention.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (dc *diskCleaner) DeleteUploadedBlocks(ctx context.Context) int {
}

for _, block := range blocks {
if !dc.isBlockDeletable(block) {
if !block.Uploaded || !dc.isExpired(block) {
continue
}

Expand Down Expand Up @@ -233,7 +233,7 @@ func (dc *diskCleaner) CleanupBlocksWhenHighDiskUtilization(ctx context.Context)
prevVolumeStats := &diskutil.VolumeStats{}
filesDeleted := 0
for _, block := range blocks {
if !dc.isBlockDeletable(block) {
if !dc.isExpired(block) {
continue
}

Expand Down Expand Up @@ -289,12 +289,12 @@ func (dc *diskCleaner) CleanupBlocksWhenHighDiskUtilization(ctx context.Context)
}

// isBlockDeletable returns true if this block can be deleted.
func (dc *diskCleaner) isBlockDeletable(block *tenantBlock) bool {
func (dc *diskCleaner) isExpired(block *tenantBlock) bool {
// TODO(kolesnikovae):
// Expiry defaults to -querier.query-store-after which should be deprecated,
// blocks-storage.bucket-store.ignore-blocks-within can be used instead.
expiryTs := time.Now().Add(-dc.policy.Expiry)
return block.Uploaded && ulid.Time(block.ID.Time()).Before(expiryTs)
return ulid.Time(block.ID.Time()).Before(expiryTs)
}

// blocksByUploadAndAge implements sorting tenantBlock by uploaded then by age
Expand Down Expand Up @@ -359,6 +359,32 @@ type realFSBlockManager struct {
FS fileSystem
}

// getUploadedBlockIds reads the shipper meta file for the given tenant and
// returns the set of block IDs recorded there as uploaded. A missing meta
// file is not an error: it means nothing has been shipped yet, so an empty
// set is returned.
func (bm *realFSBlockManager) getUploadedBlockIds(tenantID string) (map[ulid.ULID]struct{}, error) {
	metaPath := filepath.Join(bm.Root, tenantID, phlareDBLocalPath, shipper.MetaFilename)

	data, err := fs.ReadFile(bm.FS, metaPath)
	if os.IsNotExist(err) {
		// No shipper meta yet for this tenant; treat as "no uploaded blocks".
		return map[ulid.ULID]struct{}{}, nil
	}
	if err != nil {
		return nil, err
	}

	var meta shipper.Meta
	if err := json.Unmarshal(data, &meta); err != nil {
		return nil, err
	}

	uploaded := make(map[ulid.ULID]struct{}, len(meta.Uploaded))
	for _, blockID := range meta.Uploaded {
		uploaded[blockID] = struct{}{}
	}
	return uploaded, nil
}

func (bm *realFSBlockManager) GetTenantIDs(ctx context.Context) ([]string, error) {
if ctx.Err() != nil {
return nil, ctx.Err()
Expand Down Expand Up @@ -391,23 +417,11 @@ func (bm *realFSBlockManager) GetBlocksForTenant(ctx context.Context, tenantID s
return nil, err
}

shipperPath := filepath.Join(localDirPath, shipper.MetaFilename)
bytes, err := fs.ReadFile(bm.FS, shipperPath)
uploadedBlockIDs, err := bm.getUploadedBlockIds(tenantID)
if err != nil {
return nil, err
}

var meta shipper.Meta
err = json.Unmarshal(bytes, &meta)
if err != nil {
return nil, err
}

uploadedBlockIDs := make(map[ulid.ULID]struct{}, len(meta.Uploaded))
for _, id := range meta.Uploaded {
uploadedBlockIDs[id] = struct{}{}
}

// Read blocks.
blocks := make([]*tenantBlock, 0)
for _, blockDir := range blockDirs {
Expand Down
15 changes: 11 additions & 4 deletions pkg/ingester/retention_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,14 @@ func TestDiskCleaner_EnforceHighDiskUtilization(t *testing.T) {
BytesAvailable: 100,
BytesTotal: 200,
}, nil).
Once() // Expect the loop to break after a single block delete (since the subsequent blocks aren't uploaded).
Once()
vc.On("HasHighDiskUtilization", mock.Anything).
Return(&diskutil.VolumeStats{
HighDiskUtilization: false,
BytesAvailable: 100,
BytesTotal: 200,
}, nil).
Once()

dc := newDiskCleaner(log.NewNopLogger(), e, defaultRetentionPolicy(), phlaredb.Config{
DataPath: "./data",
Expand All @@ -261,7 +268,7 @@ func TestDiskCleaner_EnforceHighDiskUtilization(t *testing.T) {
dc.volumeChecker = vc

deleted, bytesFreed, hadHighDisk := dc.CleanupBlocksWhenHighDiskUtilization(context.Background())
require.Equal(t, 1, deleted)
require.Equal(t, 2, deleted)
require.Equal(t, 100, bytesFreed)
require.True(t, hadHighDisk)
})
Expand Down Expand Up @@ -316,7 +323,7 @@ func TestDiskCleaner_EnforceHighDiskUtilization(t *testing.T) {
})
}

func TestDiskCleaner_isBlockDeletable(t *testing.T) {
func TestDiskCleaner_isBlockDeletableForUploadedBlocks(t *testing.T) {
tests := []struct {
Name string
Expiry time.Duration
Expand Down Expand Up @@ -369,7 +376,7 @@ func TestDiskCleaner_isBlockDeletable(t *testing.T) {
t.Run(tt.Name, func(t *testing.T) {
dc.policy.Expiry = tt.Expiry

got := dc.isBlockDeletable(tt.Block)
got := tt.Block.Uploaded && dc.isExpired(tt.Block)
require.Equal(t, tt.Want, got)
})
}
Expand Down

0 comments on commit 2383667

Please sign in to comment.