Skip to content

Commit

Permalink
refactor(blooms): Add metrics for per-tenant tasks progress to planner (
Browse files Browse the repository at this point in the history
  • Loading branch information
salvacorts authored Jun 20, 2024
1 parent 1d6f8d5 commit 9289493
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 4 deletions.
16 changes: 15 additions & 1 deletion pkg/bloombuild/planner/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ type Metrics struct {
blocksDeleted prometheus.Counter
metasDeleted prometheus.Counter

tenantsDiscovered prometheus.Counter
tenantsDiscovered prometheus.Counter
tenantTasksPlanned *prometheus.GaugeVec
tenantTasksCompleted *prometheus.GaugeVec
}

func NewMetrics(
Expand Down Expand Up @@ -129,6 +131,18 @@ func NewMetrics(
Name: "tenants_discovered_total",
Help: "Number of tenants discovered during the current build iteration",
}),
tenantTasksPlanned: promauto.With(r).NewGaugeVec(prometheus.GaugeOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "tenant_tasks_planned",
Help: "Number of tasks planned for a tenant during the current build iteration.",
}, []string{"tenant"}),
tenantTasksCompleted: promauto.With(r).NewGaugeVec(prometheus.GaugeOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "tenant_tasks_completed",
Help: "Number of tasks completed for a tenant during the current build iteration.",
}, []string{"tenant"}),
}
}

Expand Down
26 changes: 23 additions & 3 deletions pkg/bloombuild/planner/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ func (p *Planner) stopping(_ error) error {
}

func (p *Planner) running(ctx context.Context) error {
go p.trackInflightRequests(ctx)

// run once at beginning
if err := p.runOne(ctx); err != nil {
level.Error(p.logger).Log("msg", "bloom build iteration failed for the first time", "err", err)
Expand All @@ -130,9 +132,6 @@ func (p *Planner) running(ctx context.Context) error {
planningTicker := time.NewTicker(p.cfg.PlanningInterval)
defer planningTicker.Stop()

inflightTasksTicker := time.NewTicker(250 * time.Millisecond)
defer inflightTasksTicker.Stop()

for {
select {
case <-ctx.Done():
Expand All @@ -149,6 +148,19 @@ func (p *Planner) running(ctx context.Context) error {
if err := p.runOne(ctx); err != nil {
level.Error(p.logger).Log("msg", "bloom build iteration failed", "err", err)
}
}
}
}

func (p *Planner) trackInflightRequests(ctx context.Context) {
inflightTasksTicker := time.NewTicker(250 * time.Millisecond)
defer inflightTasksTicker.Stop()

for {
select {
case <-ctx.Done():
// We just return. Error handling and logging is done in the main loop (running method).
return

case <-inflightTasksTicker.C:
inflight := p.totalPendingTasks()
Expand Down Expand Up @@ -223,6 +235,7 @@ func (p *Planner) runOne(ctx context.Context) error {
tenantTableEnqueuedTasks++
}

p.metrics.tenantTasksPlanned.WithLabelValues(tt.tenant).Add(float64(tenantTableEnqueuedTasks))
tasksResultForTenantTable[tt] = tenantTableTaskResults{
tasksToWait: tenantTableEnqueuedTasks,
originalMetas: existingMetas,
Expand Down Expand Up @@ -489,6 +502,12 @@ func (p *Planner) loadTenantWork(

tenantTableWork[table][tenant] = bounds

// Reset progress tracking metrics for this tenant
// NOTE(salvacorts): We will reset them multiple times for the same tenant, for each table, but it's not a big deal.
// Alternatively, we can use a Counter instead of a Gauge, but I think a Gauge is easier to reason about.
p.metrics.tenantTasksPlanned.WithLabelValues(tenant).Set(0)
p.metrics.tenantTasksCompleted.WithLabelValues(tenant).Set(0)

level.Debug(p.logger).Log("msg", "loading work for tenant", "table", table, "tenant", tenant, "splitFactor", splitFactor)
}
if err := tenants.Err(); err != nil {
Expand Down Expand Up @@ -804,6 +823,7 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer
"retries", task.timesEnqueued.Load(),
)
p.removePendingTask(task)
p.metrics.tenantTasksCompleted.WithLabelValues(task.Tenant).Inc()

// Send the result back to the task. The channel is buffered, so this should not block.
task.resultsChannel <- result
Expand Down

0 comments on commit 9289493

Please sign in to comment.