Skip to content

Commit

Permalink
introduce tombstone cleaner
Browse files Browse the repository at this point in the history
  • Loading branch information
kolesnikovae committed Nov 1, 2024
1 parent ae5e255 commit 050fcd3
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 38 deletions.
29 changes: 22 additions & 7 deletions pkg/experiment/metastore/compaction_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,20 @@ import (
metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1"
)

type CompactionJobScheduler interface {
type CompactionScheduler interface {
UpdateJobStatus(tx *bbolt.Tx, cmd *raft.Log, status *metastorev1.CompactionJobStatus) error
AssignJobs(tx *bbolt.Tx, cmd *raft.Log, max uint32) ([]*metastorev1.CompactionJob, error)
AssignJob(tx *bbolt.Tx, cmd *raft.Log) (*metastorev1.CompactionJob, error)
}

type TombstoneCleaner interface {
GetTombstones(tx *bbolt.Tx, cmd *raft.Log, size uint32) ([]string, error)
DeleteTombstones(tx *bbolt.Tx, tombstones []string) error
}

type PollCompactionJobsRequestHandler struct {
logger log.Logger
scheduler CompactionJobScheduler
scheduler CompactionScheduler
cleaner TombstoneCleaner
}

func (m *PollCompactionJobsRequestHandler) Apply(
Expand All @@ -30,11 +36,20 @@ func (m *PollCompactionJobsRequestHandler) Apply(
if status.Status == metastorev1.CompactionStatus_COMPACTION_STATUS_CANCELLED {
resp.CancelledJobs = append(resp.CancelledJobs, status.JobName)
}
if err := m.cleaner.DeleteTombstones(tx, status.DeletedBlocks); err != nil {
return nil, err
}
}
var err error
resp.CompactionJobs, err = m.scheduler.AssignJobs(tx, cmd, req.JobCapacity)
if err != nil {
return nil, err
for len(resp.CompactionJobs) < int(req.JobCapacity) {
job, err := m.scheduler.AssignJob(tx, cmd)
if err != nil {
return nil, err
}
job.Tombstones, err = m.cleaner.GetTombstones(tx, cmd, req.CleanupCapacity)
if err != nil {
return nil, err
}
resp.CompactionJobs = append(resp.CompactionJobs, job)
}
return &resp, nil
}
51 changes: 20 additions & 31 deletions pkg/experiment/metastore/compaction_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,15 @@ type PlannerIndex interface {
InserterIndex
FindBlock(shard uint32, tenant, block string) *metastorev1.BlockMeta
ReplaceBlocks(tx *bbolt.Tx, shard uint32, tenant string, new []*metastorev1.BlockMeta, old []string) error
GetTombstones(tx *bbolt.Tx, cmd *raft.Log, shard uint32, tenant string) ([]string, error)
DeleteTombstones(tx *bbolt.Tx, tombstones []string) error
}

// TODO: Implement state loader for the compaction planner.

var (
_ Compactor = (*CompactionPlanner)(nil)
_ CompactionScheduler = (*CompactionPlanner)(nil)
)

type CompactionPlanner struct {
logger log.Logger
config CompactionConfig
Expand Down Expand Up @@ -145,12 +148,6 @@ func (c *CompactionPlanner) UpdateJobStatus(tx *bbolt.Tx, cmd *raft.Log, s *meta
}

level.Debug(c.logger).Log("msg", "processing status update for compaction job", "job", s.JobName, "status", s.Status)
if len(s.DeletedBlocks) > 0 {
if err := c.index.DeleteTombstones(tx, s.DeletedBlocks); err != nil {
return err
}
}

switch s.Status {
case metastorev1.CompactionStatus_COMPACTION_STATUS_SUCCESS:
return c.handleStatusSuccess(tx, cmd, s, job)
Expand Down Expand Up @@ -214,30 +211,22 @@ func (c *CompactionPlanner) handleStatusFailure(
return c.persistJob(tx, status.JobName)
}

func (c *CompactionPlanner) AssignJobs(tx *bbolt.Tx, cmd *raft.Log, max uint32) ([]*metastorev1.CompactionJob, error) {
jobs := make([]*metastorev1.CompactionJob, 0, max)
for len(jobs) < int(max) {
stored := c.queue.dequeue(cmd.AppendedAt.UnixNano(), cmd.Index)
if stored == nil {
// No more jobs to assign.
return nil, nil
}
job, err := c.convertJob(tx, stored)
if err != nil {
return nil, err
}
if err = c.persistJob(tx, job.Name); err != nil {
return nil, err
}
job.Tombstones, err = c.index.GetTombstones(tx, cmd, job.Shard, job.TenantId)
if err != nil {
return nil, err
}
level.Debug(c.logger).Log("msg", "job assigned", "job", job.Name, "raft_log_index", cmd.Index)
c.metrics.assignedJobs.WithLabelValues(compactionMetricDimsJob(stored)...).Inc()
jobs = append(jobs, job)
func (c *CompactionPlanner) AssignJob(tx *bbolt.Tx, cmd *raft.Log) (*metastorev1.CompactionJob, error) {
stored := c.queue.dequeue(cmd.AppendedAt.UnixNano(), cmd.Index)
if stored == nil {
// No more jobs to assign.
return nil, nil
}
if err := c.persistJob(tx, stored.Name); err != nil {
return nil, err
}
job, err := c.convertJob(tx, stored)
if err != nil {
return nil, err
}
return jobs, nil
level.Debug(c.logger).Log("msg", "job assigned", "job", job.Name, "raft_log_index", cmd.Index)
c.metrics.assignedJobs.WithLabelValues(compactionMetricDimsJob(stored)...).Inc()
return job, nil
}

func (c *CompactionPlanner) convertJob(tx *bbolt.Tx, job *compactionpb.StoredCompactionJob) (*metastorev1.CompactionJob, error) {
Expand Down

0 comments on commit 050fcd3

Please sign in to comment.