Skip to content

Commit

Permalink
feat(blooms): disk-backed queue for the bloom-planner (backport k227) (
Browse files Browse the repository at this point in the history
…#14927)

Co-authored-by: Salva Corts <salva.corts@grafana.com>
  • Loading branch information
loki-gh-app[bot] and salvacorts authored Nov 14, 2024
1 parent 98ad8c8 commit 1f6828b
Show file tree
Hide file tree
Showing 8 changed files with 445 additions and 67 deletions.
12 changes: 12 additions & 0 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -1287,6 +1287,18 @@ planner:
# CLI flag: -bloom-build.planner.queue.max-tasks-per-tenant
[max_queued_tasks_per_tenant: <int> | default = 30000]

# Whether to store tasks on disk.
# CLI flag: -bloom-build.planner.queue.store-tasks-on-disk
[store_tasks_on_disk: <boolean> | default = false]

# Directory to store tasks on disk.
# CLI flag: -bloom-build.planner.queue.tasks-disk-directory
[tasks_disk_directory: <string> | default = "/tmp/bloom-planner-queue"]

# Whether to clean the tasks directory on startup.
# CLI flag: -bloom-build.planner.queue.clean-tasks-directory
[clean_tasks_directory: <boolean> | default = false]

builder:
# The grpc_client block configures the gRPC client used to communicate between
# a client and server component in Loki.
Expand Down
41 changes: 23 additions & 18 deletions pkg/bloombuild/planner/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func New(
// Queue to manage tasks
queueMetrics := queue.NewMetrics(r, metricsNamespace, metricsSubsystem)
queueLimits := NewQueueLimits(limits)
tasksQueue, err := queue.NewQueue(logger, cfg.Queue, queueLimits, queueMetrics)
tasksQueue, err := queue.NewQueue(logger, cfg.Queue, queueLimits, queueMetrics, storageMetrics)
if err != nil {
return nil, fmt.Errorf("error creating tasks queue: %w", err)
}
Expand Down Expand Up @@ -280,7 +280,8 @@ func (p *Planner) runOne(ctx context.Context) error {

now := time.Now()
for _, task := range tasks {
queueTask := NewQueueTask(ctx, now, task, resultsCh)
protoTask := task.ToProtoTask()
queueTask := NewQueueTask(ctx, now, protoTask, resultsCh)
if err := p.enqueueTask(queueTask); err != nil {
level.Error(logger).Log("msg", "error enqueuing task", "err", err)
continue
Expand Down Expand Up @@ -703,7 +704,7 @@ func (p *Planner) tenants(ctx context.Context, table config.DayTable) (*iter.Sli
}

func (p *Planner) enqueueTask(task *QueueTask) error {
return p.tasksQueue.Enqueue(task.Tenant(), task, func() {
return p.tasksQueue.Enqueue(task.ProtoTask, task.TaskMeta, func() {
task.timesEnqueued.Add(1)
})
}
Expand Down Expand Up @@ -738,7 +739,7 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer

lastIndex := queue.StartIndex
for p.isRunningOrStopping() {
item, idx, err := p.tasksQueue.Dequeue(builder.Context(), lastIndex, builderID)
protoTask, meta, idx, err := p.tasksQueue.Dequeue(builder.Context(), lastIndex, builderID)
if err != nil {
if errors.Is(err, queue.ErrStopped) {
// Planner is stopping, break the loop and return
Expand All @@ -748,36 +749,40 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer
}
lastIndex = idx

if item == nil {
if protoTask == nil {
return fmt.Errorf("dequeue() call resulted in nil response. builder: %s", builderID)
}

task := item.(*QueueTask)
logger := log.With(logger, "task", task.ID())
task := &QueueTask{
ProtoTask: protoTask,
TaskMeta: meta.(*TaskMeta),
}

logger := log.With(logger, "task", task.Id)

queueTime := time.Since(task.queueTime)
p.metrics.queueDuration.Observe(queueTime.Seconds())

if task.ctx.Err() != nil {
level.Warn(logger).Log("msg", "task context done after dequeue", "err", task.ctx.Err())
lastIndex = lastIndex.ReuseLastIndex()
p.tasksQueue.Release(task)
p.tasksQueue.Release(task.ProtoTask)
continue
}

result, err := p.forwardTaskToBuilder(builder, builderID, task)
if err != nil {
maxRetries := p.limits.BloomTaskMaxRetries(task.Tenant())
maxRetries := p.limits.BloomTaskMaxRetries(task.Tenant)
if maxRetries > 0 && int(task.timesEnqueued.Load()) >= maxRetries {
p.tasksQueue.Release(task)
p.tasksQueue.Release(task.ProtoTask)
level.Error(logger).Log(
"msg", "task failed after max retries",
"retries", task.timesEnqueued.Load(),
"maxRetries", maxRetries,
"err", err,
)
task.resultsChannel <- &protos.TaskResult{
TaskID: task.ID(),
TaskID: task.Id,
Error: fmt.Errorf("task failed after max retries (%d): %w", maxRetries, err),
}
continue
Expand All @@ -786,10 +791,10 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer
// Re-queue the task if the builder is failing to process the tasks
if err := p.enqueueTask(task); err != nil {
p.metrics.taskLost.Inc()
p.tasksQueue.Release(task)
p.tasksQueue.Release(task.ProtoTask)
level.Error(logger).Log("msg", "error re-enqueuing task. this task will be lost", "err", err)
task.resultsChannel <- &protos.TaskResult{
TaskID: task.ID(),
TaskID: task.Id,
Error: fmt.Errorf("error re-enqueuing task: %w", err),
}
continue
Expand All @@ -809,7 +814,7 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer
"duration", time.Since(task.queueTime).Seconds(),
"retries", task.timesEnqueued.Load()-1, // -1 because the first enqueue is not a retry
)
p.tasksQueue.Release(task)
p.tasksQueue.Release(task.ProtoTask)

// Send the result back to the task. The channel is buffered, so this should not block.
task.resultsChannel <- result
Expand All @@ -824,7 +829,7 @@ func (p *Planner) forwardTaskToBuilder(
task *QueueTask,
) (*protos.TaskResult, error) {
msg := &protos.PlannerToBuilder{
Task: task.ToProtoTask(),
Task: task.ProtoTask,
}

if err := builder.Send(msg); err != nil {
Expand All @@ -846,7 +851,7 @@ func (p *Planner) forwardTaskToBuilder(
}()

timeout := make(<-chan time.Time)
taskTimeout := p.limits.BuilderResponseTimeout(task.Tenant())
taskTimeout := p.limits.BuilderResponseTimeout(task.Tenant)
if taskTimeout != 0 {
// If the timeout is not 0 (disabled), configure it
timeout = time.After(taskTimeout)
Expand Down Expand Up @@ -886,8 +891,8 @@ func (p *Planner) receiveResultFromBuilder(
if err != nil {
return nil, fmt.Errorf("error processing task result in builder (%s): %w", builderID, err)
}
if result.TaskID != task.ID() {
return nil, fmt.Errorf("unexpected task ID (%s) in response from builder (%s). Expected task ID is %s", result.TaskID, builderID, task.ID())
if result.TaskID != task.Id {
return nil, fmt.Errorf("unexpected task ID (%s) in response from builder (%s). Expected task ID is %s", result.TaskID, builderID, task.Id)
}

return result, nil
Expand Down
3 changes: 2 additions & 1 deletion pkg/bloombuild/planner/planner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/grafana/dskit/services"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"google.golang.org/grpc"
Expand Down Expand Up @@ -725,7 +726,7 @@ func createTasks(n int, resultsCh chan *protos.TaskResult) []*QueueTask {
for i := 0; i < n; i++ {
task := NewQueueTask(
context.Background(), time.Now(),
protos.NewTask(config.NewDayTable(plannertest.TestDay, "fake"), "fakeTenant", v1.NewBounds(0, 10), plannertest.TsdbID(1), nil),
protos.NewTask(config.NewDayTable(plannertest.TestDay, "fake"), "fakeTenant", v1.NewBounds(model.Fingerprint(i), model.Fingerprint(i+10)), plannertest.TsdbID(1), nil).ToProtoTask(),
resultsCh,
)
tasks = append(tasks, task)
Expand Down
13 changes: 12 additions & 1 deletion pkg/bloombuild/planner/queue/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,31 @@ package queue

import (
"flag"
"fmt"

"github.com/grafana/loki/v3/pkg/queue"
)

type Config struct {
MaxQueuedTasksPerTenant int `yaml:"max_queued_tasks_per_tenant"`
MaxQueuedTasksPerTenant int `yaml:"max_queued_tasks_per_tenant"`
StoreTasksOnDisk bool `yaml:"store_tasks_on_disk"`
TasksDiskDirectory string `yaml:"tasks_disk_directory"`
CleanTasksDirectory bool `yaml:"clean_tasks_directory"`
}

// RegisterFlagsWithPrefix registers flags for the bloom-planner configuration.
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.IntVar(&cfg.MaxQueuedTasksPerTenant, prefix+".max-tasks-per-tenant", 30000, "Maximum number of tasks to queue per tenant.")
f.BoolVar(&cfg.StoreTasksOnDisk, prefix+".store-tasks-on-disk", false, "Whether to store tasks on disk.")
f.StringVar(&cfg.TasksDiskDirectory, prefix+".tasks-disk-directory", "/tmp/bloom-planner-queue", "Directory to store tasks on disk.")
f.BoolVar(&cfg.CleanTasksDirectory, prefix+".clean-tasks-directory", false, "Whether to clean the tasks directory on startup.")
}

func (cfg *Config) Validate() error {
if cfg.StoreTasksOnDisk && cfg.TasksDiskDirectory == "" {
return fmt.Errorf("tasks_disk_directory must be set when store_tasks_on_disk is true")
}

return nil
}

Expand Down
Loading

0 comments on commit 1f6828b

Please sign in to comment.