Skip to content

Commit b646861

Browse files
salvacort and chaudum authored
feat(blooms): disk-backed queue for the bloom-planner (#14874)
Co-authored-by: Christian Haudum <christian.haudum@gmail.com>
1 parent a4e33e0 commit b646861

File tree

8 files changed

+445
-67
lines changed

8 files changed

+445
-67
lines changed

docs/sources/shared/configuration.md

+12
Original file line numberDiff line numberDiff line change
@@ -1233,6 +1233,18 @@ planner:
12331233
# CLI flag: -bloom-build.planner.queue.max-tasks-per-tenant
12341234
[max_queued_tasks_per_tenant: <int> | default = 30000]
12351235

1236+
# Whether to store tasks on disk.
1237+
# CLI flag: -bloom-build.planner.queue.store-tasks-on-disk
1238+
[store_tasks_on_disk: <boolean> | default = false]
1239+
1240+
# Directory to store tasks on disk.
1241+
# CLI flag: -bloom-build.planner.queue.tasks-disk-directory
1242+
[tasks_disk_directory: <string> | default = "/tmp/bloom-planner-queue"]
1243+
1244+
# Whether to clean the tasks directory on startup.
1245+
# CLI flag: -bloom-build.planner.queue.clean-tasks-directory
1246+
[clean_tasks_directory: <boolean> | default = false]
1247+
12361248
builder:
12371249
# The grpc_client block configures the gRPC client used to communicate between
12381250
# a client and server component in Loki.

pkg/bloombuild/planner/planner.go

+23-18
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ func New(
8080
// Queue to manage tasks
8181
queueMetrics := queue.NewMetrics(r, metricsNamespace, metricsSubsystem)
8282
queueLimits := NewQueueLimits(limits)
83-
tasksQueue, err := queue.NewQueue(logger, cfg.Queue, queueLimits, queueMetrics)
83+
tasksQueue, err := queue.NewQueue(logger, cfg.Queue, queueLimits, queueMetrics, storageMetrics)
8484
if err != nil {
8585
return nil, fmt.Errorf("error creating tasks queue: %w", err)
8686
}
@@ -280,7 +280,8 @@ func (p *Planner) runOne(ctx context.Context) error {
280280

281281
now := time.Now()
282282
for _, task := range tasks {
283-
queueTask := NewQueueTask(ctx, now, task, resultsCh)
283+
protoTask := task.ToProtoTask()
284+
queueTask := NewQueueTask(ctx, now, protoTask, resultsCh)
284285
if err := p.enqueueTask(queueTask); err != nil {
285286
level.Error(logger).Log("msg", "error enqueuing task", "err", err)
286287
continue
@@ -703,7 +704,7 @@ func (p *Planner) tenants(ctx context.Context, table config.DayTable) (*iter.Sli
703704
}
704705

705706
func (p *Planner) enqueueTask(task *QueueTask) error {
706-
return p.tasksQueue.Enqueue(task.Tenant(), task, func() {
707+
return p.tasksQueue.Enqueue(task.ProtoTask, task.TaskMeta, func() {
707708
task.timesEnqueued.Add(1)
708709
})
709710
}
@@ -738,7 +739,7 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer
738739

739740
lastIndex := queue.StartIndex
740741
for p.isRunningOrStopping() {
741-
item, idx, err := p.tasksQueue.Dequeue(builder.Context(), lastIndex, builderID)
742+
protoTask, meta, idx, err := p.tasksQueue.Dequeue(builder.Context(), lastIndex, builderID)
742743
if err != nil {
743744
if errors.Is(err, queue.ErrStopped) {
744745
// Planner is stopping, break the loop and return
@@ -748,36 +749,40 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer
748749
}
749750
lastIndex = idx
750751

751-
if item == nil {
752+
if protoTask == nil {
752753
return fmt.Errorf("dequeue() call resulted in nil response. builder: %s", builderID)
753754
}
754755

755-
task := item.(*QueueTask)
756-
logger := log.With(logger, "task", task.ID())
756+
task := &QueueTask{
757+
ProtoTask: protoTask,
758+
TaskMeta: meta.(*TaskMeta),
759+
}
760+
761+
logger := log.With(logger, "task", task.Id)
757762

758763
queueTime := time.Since(task.queueTime)
759764
p.metrics.queueDuration.Observe(queueTime.Seconds())
760765

761766
if task.ctx.Err() != nil {
762767
level.Warn(logger).Log("msg", "task context done after dequeue", "err", task.ctx.Err())
763768
lastIndex = lastIndex.ReuseLastIndex()
764-
p.tasksQueue.Release(task)
769+
p.tasksQueue.Release(task.ProtoTask)
765770
continue
766771
}
767772

768773
result, err := p.forwardTaskToBuilder(builder, builderID, task)
769774
if err != nil {
770-
maxRetries := p.limits.BloomTaskMaxRetries(task.Tenant())
775+
maxRetries := p.limits.BloomTaskMaxRetries(task.Tenant)
771776
if maxRetries > 0 && int(task.timesEnqueued.Load()) >= maxRetries {
772-
p.tasksQueue.Release(task)
777+
p.tasksQueue.Release(task.ProtoTask)
773778
level.Error(logger).Log(
774779
"msg", "task failed after max retries",
775780
"retries", task.timesEnqueued.Load(),
776781
"maxRetries", maxRetries,
777782
"err", err,
778783
)
779784
task.resultsChannel <- &protos.TaskResult{
780-
TaskID: task.ID(),
785+
TaskID: task.Id,
781786
Error: fmt.Errorf("task failed after max retries (%d): %w", maxRetries, err),
782787
}
783788
continue
@@ -786,10 +791,10 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer
786791
// Re-queue the task if the builder is failing to process the tasks
787792
if err := p.enqueueTask(task); err != nil {
788793
p.metrics.taskLost.Inc()
789-
p.tasksQueue.Release(task)
794+
p.tasksQueue.Release(task.ProtoTask)
790795
level.Error(logger).Log("msg", "error re-enqueuing task. this task will be lost", "err", err)
791796
task.resultsChannel <- &protos.TaskResult{
792-
TaskID: task.ID(),
797+
TaskID: task.Id,
793798
Error: fmt.Errorf("error re-enqueuing task: %w", err),
794799
}
795800
continue
@@ -809,7 +814,7 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer
809814
"duration", time.Since(task.queueTime).Seconds(),
810815
"retries", task.timesEnqueued.Load()-1, // -1 because the first enqueue is not a retry
811816
)
812-
p.tasksQueue.Release(task)
817+
p.tasksQueue.Release(task.ProtoTask)
813818

814819
// Send the result back to the task. The channel is buffered, so this should not block.
815820
task.resultsChannel <- result
@@ -824,7 +829,7 @@ func (p *Planner) forwardTaskToBuilder(
824829
task *QueueTask,
825830
) (*protos.TaskResult, error) {
826831
msg := &protos.PlannerToBuilder{
827-
Task: task.ToProtoTask(),
832+
Task: task.ProtoTask,
828833
}
829834

830835
if err := builder.Send(msg); err != nil {
@@ -846,7 +851,7 @@ func (p *Planner) forwardTaskToBuilder(
846851
}()
847852

848853
timeout := make(<-chan time.Time)
849-
taskTimeout := p.limits.BuilderResponseTimeout(task.Tenant())
854+
taskTimeout := p.limits.BuilderResponseTimeout(task.Tenant)
850855
if taskTimeout != 0 {
851856
// If the timeout is not 0 (disabled), configure it
852857
timeout = time.After(taskTimeout)
@@ -886,8 +891,8 @@ func (p *Planner) receiveResultFromBuilder(
886891
if err != nil {
887892
return nil, fmt.Errorf("error processing task result in builder (%s): %w", builderID, err)
888893
}
889-
if result.TaskID != task.ID() {
890-
return nil, fmt.Errorf("unexpected task ID (%s) in response from builder (%s). Expected task ID is %s", result.TaskID, builderID, task.ID())
894+
if result.TaskID != task.Id {
895+
return nil, fmt.Errorf("unexpected task ID (%s) in response from builder (%s). Expected task ID is %s", result.TaskID, builderID, task.Id)
891896
}
892897

893898
return result, nil

pkg/bloombuild/planner/planner_test.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/grafana/dskit/services"
1414
"github.com/pkg/errors"
1515
"github.com/prometheus/client_golang/prometheus"
16+
"github.com/prometheus/common/model"
1617
"github.com/stretchr/testify/require"
1718
"go.uber.org/atomic"
1819
"google.golang.org/grpc"
@@ -725,7 +726,7 @@ func createTasks(n int, resultsCh chan *protos.TaskResult) []*QueueTask {
725726
for i := 0; i < n; i++ {
726727
task := NewQueueTask(
727728
context.Background(), time.Now(),
728-
protos.NewTask(config.NewDayTable(plannertest.TestDay, "fake"), "fakeTenant", v1.NewBounds(0, 10), plannertest.TsdbID(1), nil),
729+
protos.NewTask(config.NewDayTable(plannertest.TestDay, "fake"), "fakeTenant", v1.NewBounds(model.Fingerprint(i), model.Fingerprint(i+10)), plannertest.TsdbID(1), nil).ToProtoTask(),
729730
resultsCh,
730731
)
731732
tasks = append(tasks, task)

pkg/bloombuild/planner/queue/config.go

+12-1
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,31 @@ package queue
22

33
import (
44
"flag"
5+
"fmt"
56

67
"github.com/grafana/loki/v3/pkg/queue"
78
)
89

910
type Config struct {
10-
MaxQueuedTasksPerTenant int `yaml:"max_queued_tasks_per_tenant"`
11+
MaxQueuedTasksPerTenant int `yaml:"max_queued_tasks_per_tenant"`
12+
StoreTasksOnDisk bool `yaml:"store_tasks_on_disk"`
13+
TasksDiskDirectory string `yaml:"tasks_disk_directory"`
14+
CleanTasksDirectory bool `yaml:"clean_tasks_directory"`
1115
}
1216

1317
// RegisterFlagsWithPrefix registers flags for the bloom-planner configuration.
1418
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
1519
f.IntVar(&cfg.MaxQueuedTasksPerTenant, prefix+".max-tasks-per-tenant", 30000, "Maximum number of tasks to queue per tenant.")
20+
f.BoolVar(&cfg.StoreTasksOnDisk, prefix+".store-tasks-on-disk", false, "Whether to store tasks on disk.")
21+
f.StringVar(&cfg.TasksDiskDirectory, prefix+".tasks-disk-directory", "/tmp/bloom-planner-queue", "Directory to store tasks on disk.")
22+
f.BoolVar(&cfg.CleanTasksDirectory, prefix+".clean-tasks-directory", false, "Whether to clean the tasks directory on startup.")
1623
}
1724

1825
func (cfg *Config) Validate() error {
26+
if cfg.StoreTasksOnDisk && cfg.TasksDiskDirectory == "" {
27+
return fmt.Errorf("tasks_disk_directory must be set when store_tasks_on_disk is true")
28+
}
29+
1930
return nil
2031
}
2132

0 commit comments

Comments (0)