Skip to content

Commit

Permalink
fix: compaction block queue persistence (#3532)
Browse files Browse the repository at this point in the history
  • Loading branch information
aleks-p authored Aug 30, 2024
1 parent 102e6b4 commit 60c0b09
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ func (m *metastoreState) pollCompactionJobs(request *compactorv1.PollCompactionJ
}
m.compactionMetrics.addedBlocks.WithLabelValues(
fmt.Sprint(job.Shard), job.TenantId, fmt.Sprint(job.CompactionLevel)).Inc()
stateUpdate.updatedBlockQueues[jobKey] = append(stateUpdate.updatedBlockQueues[jobKey], b.CompactionLevel)
blockTenantShard := tenantShard{tenant: b.TenantId, shard: b.Shard}
stateUpdate.updatedBlockQueues[blockTenantShard] = append(stateUpdate.updatedBlockQueues[blockTenantShard], b.CompactionLevel)
}
// finally we'll delete the metadata for source blocks (this doesn't delete blocks from object store)
for _, b := range job.Blocks {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func Test_CompactedBlockCanCreateNewJob(t *testing.T) {
addLevel0Blocks(m, 20)

// add 9 level 1 blocks so that we can create a job once a new level 1 block gets added (we need 10 blocks for level 1)
addBlocksWithLevel(m, 9, 1)
addLevel1Blocks(m, "t1", 9)

// assign the job to a worker
resp, err := m.pollCompactionJobs(&compactorv1.PollCompactionJobsRequest{JobCapacity: 1}, 20, 20)
Expand All @@ -205,11 +205,13 @@ func Test_CompactedBlockCanCreateNewJob(t *testing.T) {
{
Id: "b-20-1",
Shard: uint32(0),
TenantId: "t1",
CompactionLevel: uint32(1),
},
{
Id: "b-21-1",
Shard: uint32(0),
TenantId: "t1",
CompactionLevel: uint32(1),
},
},
Expand All @@ -229,7 +231,7 @@ func Test_CompactedBlockCanCreateNewJob(t *testing.T) {

// the second compacted block from the status update should be added to the block queue
key := tenantShard{
tenant: "",
tenant: "t1",
shard: 0,
}
require.Equalf(t, 1, len(m.compactionJobBlockQueues[key].blocksByLevel[1]), "there should be one level-1 block in the queue")
Expand Down Expand Up @@ -280,10 +282,10 @@ func addLevel0Blocks(m *metastoreState, count int) {
}
}

func addBlocksWithLevel(m *metastoreState, count int, level int) {
func addLevel1Blocks(m *metastoreState, tenant string, count int) {
for i := 0; i < count; i++ {
b := createBlock(i, 0, "", level)
b.Id = fmt.Sprintf("b-%d-%d", i, level)
b := createBlock(i, 0, tenant, 1)
b.Id = fmt.Sprintf("b-%d-%d", i, 1)
raftLog := &raft.Log{
Index: uint64(i),
AppendedAt: time.Unix(0, int64(i)),
Expand Down

0 comments on commit 60c0b09

Please sign in to comment.