refactor(blooms): Extract planner queue into pkg (backport k227) (#14925)

Co-authored-by: Salva Corts <salva.corts@grafana.com>
loki-gh-app[bot] and salvacorts authored Nov 14, 2024
1 parent dc36a1e commit 98ad8c8
Showing 7 changed files with 270 additions and 76 deletions.
9 changes: 5 additions & 4 deletions docs/sources/shared/configuration.md
@@ -1277,15 +1277,16 @@ planner:
# CLI flag: -bloom-build.planner.max-table-offset
[max_table_offset: <int> | default = 2]

# Maximum number of tasks to queue per tenant.
# CLI flag: -bloom-build.planner.max-tasks-per-tenant
[max_queued_tasks_per_tenant: <int> | default = 30000]

retention:
# Enable bloom retention.
# CLI flag: -bloom-build.planner.retention.enabled
[enabled: <boolean> | default = false]

queue:
# Maximum number of tasks to queue per tenant.
# CLI flag: -bloom-build.planner.queue.max-tasks-per-tenant
[max_queued_tasks_per_tenant: <int> | default = 30000]

builder:
# The grpc_client block configures the gRPC client used to communicate between
# a client and server component in Loki.
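Note the relocation above: `max_queued_tasks_per_tenant` moves from the top level of the `planner` block into the new `queue` sub-block, and its CLI flag gains a `.queue` segment accordingly. Below is a minimal sketch of the new key path, using hypothetical stand-in structs that only mirror the YAML tags introduced in this commit (not the real Loki types):

```go
package main

import (
	"fmt"

	"gopkg.in/yaml.v2"
)

// Stand-in types for illustration only; they mirror the yaml tags from this
// commit (planner.Config now embeds queue.Config under the "queue" key).
type queueConfig struct {
	MaxQueuedTasksPerTenant int `yaml:"max_queued_tasks_per_tenant"`
}

type plannerConfig struct {
	Queue queueConfig `yaml:"queue"`
}

func main() {
	// The relocated key: previously top-level under planner, now nested.
	raw := []byte("queue:\n  max_queued_tasks_per_tenant: 50000\n")

	var cfg plannerConfig
	if err := yaml.Unmarshal(raw, &cfg); err != nil {
		panic(err)
	}
	fmt.Println(cfg.Queue.MaxQueuedTasksPerTenant) // prints 50000
}
```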
17 changes: 11 additions & 6 deletions pkg/bloombuild/planner/config.go
@@ -5,16 +5,17 @@ import (
"fmt"
"time"

"github.com/grafana/loki/v3/pkg/bloombuild/planner/queue"
"github.com/grafana/loki/v3/pkg/bloombuild/planner/strategies"
)

// Config configures the bloom-planner component.
type Config struct {
PlanningInterval time.Duration `yaml:"planning_interval"`
MinTableOffset int `yaml:"min_table_offset"`
MaxTableOffset int `yaml:"max_table_offset"`
MaxQueuedTasksPerTenant int `yaml:"max_queued_tasks_per_tenant"`
RetentionConfig RetentionConfig `yaml:"retention"`
PlanningInterval time.Duration `yaml:"planning_interval"`
MinTableOffset int `yaml:"min_table_offset"`
MaxTableOffset int `yaml:"max_table_offset"`
RetentionConfig RetentionConfig `yaml:"retention"`
Queue queue.Config `yaml:"queue"`
}

// RegisterFlagsWithPrefix registers flags for the bloom-planner configuration.
@@ -28,8 +29,8 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
// dynamically reloaded.
// I'm doing it the simple way for now.
f.IntVar(&cfg.MaxTableOffset, prefix+".max-table-offset", 2, "Oldest day-table offset (from today, inclusive) to compact. This can be used to lower cost by not trying to compact older data which doesn't change. This can be optimized by aligning it with the maximum `reject_old_samples_max_age` setting of any tenant.")
f.IntVar(&cfg.MaxQueuedTasksPerTenant, prefix+".max-tasks-per-tenant", 30000, "Maximum number of tasks to queue per tenant.")
cfg.RetentionConfig.RegisterFlagsWithPrefix(prefix+".retention", f)
cfg.Queue.RegisterFlagsWithPrefix(prefix+".queue", f)
}

func (cfg *Config) Validate() error {
@@ -41,6 +42,10 @@ func (cfg *Config) Validate() error {
return err
}

if err := cfg.Queue.Validate(); err != nil {
return err
}

return nil
}
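With both changes in place, the planner config simply delegates flag registration and validation to the queue sub-config, which is why the flag's full name now carries a `.queue` segment. A hedged usage sketch follows (assuming only the exported names shown above; this is not code from the commit, and the defaults registered by the flags are expected to validate cleanly):

```go
package main

import (
	"flag"
	"fmt"

	"github.com/grafana/loki/v3/pkg/bloombuild/planner"
)

func main() {
	var cfg planner.Config
	fs := flag.NewFlagSet("bloom-planner", flag.ContinueOnError)

	// Registers every -bloom-build.planner.* flag, including the relocated
	// -bloom-build.planner.queue.max-tasks-per-tenant (default 30000).
	cfg.RegisterFlagsWithPrefix("bloom-build.planner", fs)

	if err := fs.Parse([]string{"-bloom-build.planner.queue.max-tasks-per-tenant=50000"}); err != nil {
		panic(err)
	}

	// Validate() now also runs the queue sub-config's Validate(), as shown above.
	if err := cfg.Validate(); err != nil {
		panic(err)
	}

	fmt.Println(cfg.Queue.MaxQueuedTasksPerTenant) // prints 50000
}
```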

84 changes: 30 additions & 54 deletions pkg/bloombuild/planner/planner.go
@@ -17,16 +17,15 @@ import (
"go.uber.org/atomic"

"github.com/grafana/loki/v3/pkg/bloombuild/common"
"github.com/grafana/loki/v3/pkg/bloombuild/planner/queue"
"github.com/grafana/loki/v3/pkg/bloombuild/planner/strategies"
"github.com/grafana/loki/v3/pkg/bloombuild/protos"
iter "github.com/grafana/loki/v3/pkg/iter/v2"
"github.com/grafana/loki/v3/pkg/queue"
"github.com/grafana/loki/v3/pkg/storage"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
"github.com/grafana/loki/v3/pkg/util"
utillog "github.com/grafana/loki/v3/pkg/util/log"
"github.com/grafana/loki/v3/pkg/util/ring"
)
@@ -50,10 +49,7 @@ type Planner struct {
tsdbStore common.TSDBStore
bloomStore bloomshipper.StoreBase

tasksQueue *queue.RequestQueue
activeUsers *util.ActiveUsersCleanupService

pendingTasks sync.Map
tasksQueue *queue.Queue

metrics *Metrics
logger log.Logger
@@ -83,23 +79,21 @@ func New(

// Queue to manage tasks
queueMetrics := queue.NewMetrics(r, metricsNamespace, metricsSubsystem)
tasksQueue := queue.NewRequestQueue(cfg.MaxQueuedTasksPerTenant, 0, NewQueueLimits(limits), queueMetrics)

// Clean metrics for inactive users: do not have added tasks to the queue in the last 1 hour
activeUsers := util.NewActiveUsersCleanupService(5*time.Minute, 1*time.Hour, func(user string) {
queueMetrics.Cleanup(user)
})
queueLimits := NewQueueLimits(limits)
tasksQueue, err := queue.NewQueue(logger, cfg.Queue, queueLimits, queueMetrics)
if err != nil {
return nil, fmt.Errorf("error creating tasks queue: %w", err)
}

p := &Planner{
cfg: cfg,
limits: limits,
schemaCfg: schemaCfg,
tsdbStore: tsdbStore,
bloomStore: bloomStore,
tasksQueue: tasksQueue,
activeUsers: activeUsers,
metrics: NewMetrics(r, tasksQueue.GetConnectedConsumersMetric),
logger: logger,
cfg: cfg,
limits: limits,
schemaCfg: schemaCfg,
tsdbStore: tsdbStore,
bloomStore: bloomStore,
tasksQueue: tasksQueue,
metrics: NewMetrics(r, tasksQueue.GetConnectedConsumersMetric),
logger: logger,
}

p.retentionManager = NewRetentionManager(
@@ -110,7 +104,7 @@ func New(
p.logger,
)

svcs := []services.Service{p.tasksQueue, p.activeUsers}
svcs := []services.Service{p.tasksQueue}

if rm != nil {
p.ringWatcher = common.NewRingWatcher(rm.RingLifecycler.GetInstanceID(), rm.Ring, time.Minute, logger)
@@ -204,7 +198,7 @@ func (p *Planner) trackInflightRequests(ctx context.Context) {
return

case <-inflightTasksTicker.C:
inflight := p.totalPendingTasks()
inflight := p.tasksQueue.TotalPending()
p.metrics.inflightRequests.Observe(float64(inflight))
}
}
@@ -708,27 +702,9 @@ func (p *Planner) tenants(ctx context.Context, table config.DayTable) (*iter.Sli
return iter.NewSliceIter(tenants), nil
}

func (p *Planner) addPendingTask(task *QueueTask) {
p.pendingTasks.Store(task.ID, task)
}

func (p *Planner) removePendingTask(task *QueueTask) {
p.pendingTasks.Delete(task.ID)
}

func (p *Planner) totalPendingTasks() (total int) {
p.pendingTasks.Range(func(_, _ interface{}) bool {
total++
return true
})
return total
}

func (p *Planner) enqueueTask(task *QueueTask) error {
p.activeUsers.UpdateUserTimestamp(task.Tenant, time.Now())
return p.tasksQueue.Enqueue(task.Tenant, nil, task, func() {
return p.tasksQueue.Enqueue(task.Tenant(), task, func() {
task.timesEnqueued.Add(1)
p.addPendingTask(task)
})
}

@@ -773,35 +749,35 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer
lastIndex = idx

if item == nil {

return fmt.Errorf("dequeue() call resulted in nil response. builder: %s", builderID)
}

task := item.(*QueueTask)
logger := log.With(logger, "task", task.ID)
logger := log.With(logger, "task", task.ID())

queueTime := time.Since(task.queueTime)
p.metrics.queueDuration.Observe(queueTime.Seconds())

if task.ctx.Err() != nil {
level.Warn(logger).Log("msg", "task context done after dequeue", "err", task.ctx.Err())
lastIndex = lastIndex.ReuseLastIndex()
p.removePendingTask(task)
p.tasksQueue.Release(task)
continue
}

result, err := p.forwardTaskToBuilder(builder, builderID, task)
if err != nil {
maxRetries := p.limits.BloomTaskMaxRetries(task.Tenant)
maxRetries := p.limits.BloomTaskMaxRetries(task.Tenant())
if maxRetries > 0 && int(task.timesEnqueued.Load()) >= maxRetries {
p.removePendingTask(task)
p.tasksQueue.Release(task)
level.Error(logger).Log(
"msg", "task failed after max retries",
"retries", task.timesEnqueued.Load(),
"maxRetries", maxRetries,
"err", err,
)
task.resultsChannel <- &protos.TaskResult{
TaskID: task.ID,
TaskID: task.ID(),
Error: fmt.Errorf("task failed after max retries (%d): %w", maxRetries, err),
}
continue
@@ -810,10 +786,10 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer
// Re-queue the task if the builder is failing to process the tasks
if err := p.enqueueTask(task); err != nil {
p.metrics.taskLost.Inc()
p.removePendingTask(task)
p.tasksQueue.Release(task)
level.Error(logger).Log("msg", "error re-enqueuing task. this task will be lost", "err", err)
task.resultsChannel <- &protos.TaskResult{
TaskID: task.ID,
TaskID: task.ID(),
Error: fmt.Errorf("error re-enqueuing task: %w", err),
}
continue
@@ -833,7 +809,7 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer
"duration", time.Since(task.queueTime).Seconds(),
"retries", task.timesEnqueued.Load()-1, // -1 because the first enqueue is not a retry
)
p.removePendingTask(task)
p.tasksQueue.Release(task)

// Send the result back to the task. The channel is buffered, so this should not block.
task.resultsChannel <- result
@@ -870,7 +846,7 @@ func (p *Planner) forwardTaskToBuilder(
}()

timeout := make(<-chan time.Time)
taskTimeout := p.limits.BuilderResponseTimeout(task.Tenant)
taskTimeout := p.limits.BuilderResponseTimeout(task.Tenant())
if taskTimeout != 0 {
// If the timeout is not 0 (disabled), configure it
timeout = time.After(taskTimeout)
@@ -910,8 +886,8 @@ func (p *Planner) receiveResultFromBuilder(
if err != nil {
return nil, fmt.Errorf("error processing task result in builder (%s): %w", builderID, err)
}
if result.TaskID != task.ID {
return nil, fmt.Errorf("unexpected task ID (%s) in response from builder (%s). Expected task ID is %s", result.TaskID, builderID, task.ID)
if result.TaskID != task.ID() {
return nil, fmt.Errorf("unexpected task ID (%s) in response from builder (%s). Expected task ID is %s", result.TaskID, builderID, task.ID())
}

return result, nil
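Taken together, the call sites above show what moved into the new package: the pendingTasks sync.Map and the ActiveUsersCleanupService disappear from planner.go, and QueueTask now exposes its identity through ID() and Tenant() methods. The sketch below is an inferred summary of the surface the planner depends on after this change, not the package's actual declarations:

```go
package sketch

import "github.com/grafana/dskit/services"

// Task reflects the switch from task.ID / task.Tenant fields to
// task.ID() / task.Tenant() method calls in planner.go.
type Task interface {
	ID() string
	Tenant() string
}

// TaskQueue captures the behaviour planner.go now relies on: it runs as a
// dskit service (it is appended to the planner's sub-services), enqueues per
// tenant with a success callback, releases finished or abandoned tasks, and
// reports the pending-task count and connected-consumers gauge that the
// planner previously tracked itself.
type TaskQueue interface {
	services.Service

	Enqueue(tenant string, task Task, successFn func()) error
	Release(task Task)
	TotalPending() int
	GetConnectedConsumersMetric() float64
}
```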
31 changes: 19 additions & 12 deletions pkg/bloombuild/planner/planner_test.go
@@ -18,6 +18,7 @@ import (
"google.golang.org/grpc"

"github.com/grafana/loki/v3/pkg/bloombuild/planner/plannertest"
"github.com/grafana/loki/v3/pkg/bloombuild/planner/queue"
"github.com/grafana/loki/v3/pkg/bloombuild/planner/strategies"
"github.com/grafana/loki/v3/pkg/bloombuild/protos"
"github.com/grafana/loki/v3/pkg/storage"
@@ -163,8 +164,10 @@ func Test_BuilderLoop(t *testing.T) {
//logger := log.NewLogfmtLogger(os.Stdout)

cfg := Config{
PlanningInterval: 1 * time.Hour,
MaxQueuedTasksPerTenant: 10000,
PlanningInterval: 1 * time.Hour,
Queue: queue.Config{
MaxQueuedTasksPerTenant: 10000,
},
}
planner := createPlanner(t, cfg, tc.limits, logger)

@@ -206,7 +209,7 @@ func Test_BuilderLoop(t *testing.T) {
}, 5*time.Second, 10*time.Millisecond)

// Finally, the queue should be empty
require.Equal(t, 0, planner.totalPendingTasks())
require.Equal(t, 0, planner.tasksQueue.TotalPending())

// consume all tasks result to free up the channel for the next round of tasks
for i := 0; i < nTasks; i++ {
@@ -228,15 +231,15 @@ func Test_BuilderLoop(t *testing.T) {
if tc.shouldConsumeAfterModify {
require.Eventuallyf(
t, func() bool {
return planner.totalPendingTasks() == 0
return planner.tasksQueue.TotalPending() == 0
},
5*time.Second, 10*time.Millisecond,
"tasks not consumed, pending: %d", planner.totalPendingTasks(),
"tasks not consumed, pending: %d", planner.tasksQueue.TotalPending(),
)
} else {
require.Neverf(
t, func() bool {
return planner.totalPendingTasks() == 0
return planner.tasksQueue.TotalPending() == 0
},
5*time.Second, 10*time.Millisecond,
"all tasks were consumed but they should not be",
@@ -254,10 +257,10 @@ func Test_BuilderLoop(t *testing.T) {
// Now all tasks should be consumed
require.Eventuallyf(
t, func() bool {
return planner.totalPendingTasks() == 0
return planner.tasksQueue.TotalPending() == 0
},
5*time.Second, 10*time.Millisecond,
"tasks not consumed, pending: %d", planner.totalPendingTasks(),
"tasks not consumed, pending: %d", planner.tasksQueue.TotalPending(),
)
}
})
@@ -384,8 +387,10 @@ func Test_processTenantTaskResults(t *testing.T) {
//logger := log.NewLogfmtLogger(os.Stdout)

cfg := Config{
PlanningInterval: 1 * time.Hour,
MaxQueuedTasksPerTenant: 10000,
PlanningInterval: 1 * time.Hour,
Queue: queue.Config{
MaxQueuedTasksPerTenant: 10000,
},
}
planner := createPlanner(t, cfg, &fakeLimits{}, logger)

@@ -544,8 +549,10 @@ func Test_deleteOutdatedMetas(t *testing.T) {
// logger := log.NewLogfmtLogger(os.Stdout)

cfg := Config{
PlanningInterval: 1 * time.Hour,
MaxQueuedTasksPerTenant: 10000,
PlanningInterval: 1 * time.Hour,
Queue: queue.Config{
MaxQueuedTasksPerTenant: 10000,
},
}
planner := createPlanner(t, cfg, &fakeLimits{}, logger)

22 changes: 22 additions & 0 deletions pkg/bloombuild/planner/queue/config.go
@@ -0,0 +1,22 @@
package queue

import (
"flag"

"github.com/grafana/loki/v3/pkg/queue"
)

type Config struct {
MaxQueuedTasksPerTenant int `yaml:"max_queued_tasks_per_tenant"`
}

// RegisterFlagsWithPrefix registers flags for the bloom-planner configuration.
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.IntVar(&cfg.MaxQueuedTasksPerTenant, prefix+".max-tasks-per-tenant", 30000, "Maximum number of tasks to queue per tenant.")
}

func (cfg *Config) Validate() error {
return nil
}

type Limits = queue.Limits
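The `Limits` alias at the bottom is a deliberate choice: because it is a type alias rather than a new interface, anything that already implements the generic `pkg/queue.Limits` (such as the value the planner builds with `NewQueueLimits`) can be passed to the new package unchanged. A compile-time illustration of that interchangeability, assuming only the import paths shown in this commit:

```go
package sketch

import (
	plannerqueue "github.com/grafana/loki/v3/pkg/bloombuild/planner/queue"
	"github.com/grafana/loki/v3/pkg/queue"
)

// Since plannerqueue.Limits is an alias for queue.Limits, the conversion below
// is an identity: existing limit implementations need no adapter.
var _ = func(l queue.Limits) plannerqueue.Limits { return l }
```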