diff --git a/pkg/ingester/retention.go b/pkg/ingester/retention.go index bdb12baefd..3427a158fb 100644 --- a/pkg/ingester/retention.go +++ b/pkg/ingester/retention.go @@ -153,7 +153,7 @@ func (dc *diskCleaner) DeleteUploadedBlocks(ctx context.Context) int { } for _, block := range blocks { - if !dc.isBlockDeletable(block) { + if !block.Uploaded || !dc.isExpired(block) { continue } @@ -233,7 +233,7 @@ func (dc *diskCleaner) CleanupBlocksWhenHighDiskUtilization(ctx context.Context) prevVolumeStats := &diskutil.VolumeStats{} filesDeleted := 0 for _, block := range blocks { - if !dc.isBlockDeletable(block) { + if !dc.isExpired(block) { continue } @@ -289,12 +289,12 @@ func (dc *diskCleaner) CleanupBlocksWhenHighDiskUtilization(ctx context.Context) } // isBlockDeletable returns true if this block can be deleted. -func (dc *diskCleaner) isBlockDeletable(block *tenantBlock) bool { +func (dc *diskCleaner) isExpired(block *tenantBlock) bool { // TODO(kolesnikovae): // Expiry defaults to -querier.query-store-after which should be deprecated, // blocks-storage.bucket-store.ignore-blocks-within can be used instead. expiryTs := time.Now().Add(-dc.policy.Expiry) - return block.Uploaded && ulid.Time(block.ID.Time()).Before(expiryTs) + return ulid.Time(block.ID.Time()).Before(expiryTs) } // blocksByUploadAndAge implements sorting tenantBlock by uploaded then by age @@ -359,6 +359,32 @@ type realFSBlockManager struct { FS fileSystem } +func (bm *realFSBlockManager) getUploadedBlockIds(tenantID string) (map[ulid.ULID]struct{}, error) { + localDirPath := filepath.Join(bm.Root, tenantID, phlareDBLocalPath) + + shipperPath := filepath.Join(localDirPath, shipper.MetaFilename) + bytes, err := fs.ReadFile(bm.FS, shipperPath) + if err != nil { + if os.IsNotExist(err) { + return make(map[ulid.ULID]struct{}), nil + } + return nil, err + } + + var meta shipper.Meta + err = json.Unmarshal(bytes, &meta) + if err != nil { + return nil, err + } + + uploadedBlockIDs := make(map[ulid.ULID]struct{}, len(meta.Uploaded)) + for _, id := range meta.Uploaded { + uploadedBlockIDs[id] = struct{}{} + } + + return uploadedBlockIDs, nil +} + func (bm *realFSBlockManager) GetTenantIDs(ctx context.Context) ([]string, error) { if ctx.Err() != nil { return nil, ctx.Err() @@ -391,23 +417,11 @@ func (bm *realFSBlockManager) GetBlocksForTenant(ctx context.Context, tenantID s return nil, err } - shipperPath := filepath.Join(localDirPath, shipper.MetaFilename) - bytes, err := fs.ReadFile(bm.FS, shipperPath) + uploadedBlockIDs, err := bm.getUploadedBlockIds(tenantID) if err != nil { return nil, err } - var meta shipper.Meta - err = json.Unmarshal(bytes, &meta) - if err != nil { - return nil, err - } - - uploadedBlockIDs := make(map[ulid.ULID]struct{}, len(meta.Uploaded)) - for _, id := range meta.Uploaded { - uploadedBlockIDs[id] = struct{}{} - } - // Read blocks. blocks := make([]*tenantBlock, 0) for _, blockDir := range blockDirs { diff --git a/pkg/ingester/retention_test.go b/pkg/ingester/retention_test.go index 84440a2cf7..53090feae8 100644 --- a/pkg/ingester/retention_test.go +++ b/pkg/ingester/retention_test.go @@ -252,7 +252,14 @@ func TestDiskCleaner_EnforceHighDiskUtilization(t *testing.T) { BytesAvailable: 100, BytesTotal: 200, }, nil). - Once() // Expect the loop to break after a single block delete (since the subsequent blocks aren't uploaded). + Once() + vc.On("HasHighDiskUtilization", mock.Anything). + Return(&diskutil.VolumeStats{ + HighDiskUtilization: false, + BytesAvailable: 100, + BytesTotal: 200, + }, nil). + Once() dc := newDiskCleaner(log.NewNopLogger(), e, defaultRetentionPolicy(), phlaredb.Config{ DataPath: "./data", @@ -261,7 +268,7 @@ func TestDiskCleaner_EnforceHighDiskUtilization(t *testing.T) { dc.volumeChecker = vc deleted, bytesFreed, hadHighDisk := dc.CleanupBlocksWhenHighDiskUtilization(context.Background()) - require.Equal(t, 1, deleted) + require.Equal(t, 2, deleted) require.Equal(t, 100, bytesFreed) require.True(t, hadHighDisk) }) @@ -316,7 +323,7 @@ func TestDiskCleaner_EnforceHighDiskUtilization(t *testing.T) { }) } -func TestDiskCleaner_isBlockDeletable(t *testing.T) { +func TestDiskCleaner_isBlockDeletableForUploadedBlocks(t *testing.T) { tests := []struct { Name string Expiry time.Duration @@ -369,7 +376,7 @@ func TestDiskCleaner_isBlockDeletable(t *testing.T) { t.Run(tt.Name, func(t *testing.T) { dc.policy.Expiry = tt.Expiry - got := dc.isBlockDeletable(tt.Block) + got := tt.Block.Uploaded && dc.isExpired(tt.Block) require.Equal(t, tt.Want, got) }) }