refactor(blooms): Extract planner queue into pkg (backport k227) #14925

Merged · 1 commit · Nov 14, 2024
9 changes: 5 additions & 4 deletions docs/sources/shared/configuration.md
@@ -1277,15 +1277,16 @@ planner:
# CLI flag: -bloom-build.planner.max-table-offset
[max_table_offset: <int> | default = 2]

# Maximum number of tasks to queue per tenant.
# CLI flag: -bloom-build.planner.max-tasks-per-tenant
[max_queued_tasks_per_tenant: <int> | default = 30000]

retention:
# Enable bloom retention.
# CLI flag: -bloom-build.planner.retention.enabled
[enabled: <boolean> | default = false]

queue:
# Maximum number of tasks to queue per tenant.
# CLI flag: -bloom-build.planner.queue.max-tasks-per-tenant
[max_queued_tasks_per_tenant: <int> | default = 30000]

builder:
# The grpc_client block configures the gRPC client used to communicate between
# a client and server component in Loki.
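The documentation change above boils down to one relocation: the per-tenant task cap now lives under a nested `queue` block, and its CLI flag gains a `.queue` segment. A minimal sketch of the before and after YAML, using only the key and default shown in the diff (surrounding planner settings omitted):

```yaml
# Before: -bloom-build.planner.max-tasks-per-tenant
planner:
  max_queued_tasks_per_tenant: 30000

# After: -bloom-build.planner.queue.max-tasks-per-tenant
planner:
  queue:
    max_queued_tasks_per_tenant: 30000
```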
17 changes: 11 additions & 6 deletions pkg/bloombuild/planner/config.go
@@ -5,16 +5,17 @@ import (
"fmt"
"time"

"github.com/grafana/loki/v3/pkg/bloombuild/planner/queue"
"github.com/grafana/loki/v3/pkg/bloombuild/planner/strategies"
)

// Config configures the bloom-planner component.
type Config struct {
PlanningInterval time.Duration `yaml:"planning_interval"`
MinTableOffset int `yaml:"min_table_offset"`
MaxTableOffset int `yaml:"max_table_offset"`
MaxQueuedTasksPerTenant int `yaml:"max_queued_tasks_per_tenant"`
RetentionConfig RetentionConfig `yaml:"retention"`
PlanningInterval time.Duration `yaml:"planning_interval"`
MinTableOffset int `yaml:"min_table_offset"`
MaxTableOffset int `yaml:"max_table_offset"`
RetentionConfig RetentionConfig `yaml:"retention"`
Queue queue.Config `yaml:"queue"`
}

// RegisterFlagsWithPrefix registers flags for the bloom-planner configuration.
@@ -28,8 +29,8 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
// dynamically reloaded.
// I'm doing it the simple way for now.
f.IntVar(&cfg.MaxTableOffset, prefix+".max-table-offset", 2, "Oldest day-table offset (from today, inclusive) to compact. This can be used to lower cost by not trying to compact older data which doesn't change. This can be optimized by aligning it with the maximum `reject_old_samples_max_age` setting of any tenant.")
f.IntVar(&cfg.MaxQueuedTasksPerTenant, prefix+".max-tasks-per-tenant", 30000, "Maximum number of tasks to queue per tenant.")
cfg.RetentionConfig.RegisterFlagsWithPrefix(prefix+".retention", f)
cfg.Queue.RegisterFlagsWithPrefix(prefix+".queue", f)
}

func (cfg *Config) Validate() error {
@@ -41,6 +42,10 @@ func (cfg *Config) Validate() error {
return err
}

if err := cfg.Queue.Validate(); err != nil {
return err
}

return nil
}

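For illustration, a self-contained sketch of the delegation pattern the config change uses: the planner registers its own flags and then hands a `.queue`-suffixed prefix to the nested queue config, which is what produces the renamed `-bloom-build.planner.queue.max-tasks-per-tenant` flag. The types below are simplified stand-ins for the real Loki packages.

```go
package main

import (
	"flag"
	"fmt"
)

// queueConfig stands in for queue.Config from the new
// pkg/bloombuild/planner/queue package.
type queueConfig struct {
	MaxQueuedTasksPerTenant int
}

func (c *queueConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
	// Mirrors the registration shown in queue/config.go.
	f.IntVar(&c.MaxQueuedTasksPerTenant, prefix+".max-tasks-per-tenant", 30000,
		"Maximum number of tasks to queue per tenant.")
}

// plannerConfig stands in for the planner Config that now embeds the queue config.
type plannerConfig struct {
	Queue queueConfig
}

func (c *plannerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
	// Delegating with an extended prefix is what moves the flag from
	// <prefix>.max-tasks-per-tenant to <prefix>.queue.max-tasks-per-tenant.
	c.Queue.RegisterFlagsWithPrefix(prefix+".queue", f)
}

func main() {
	var cfg plannerConfig
	fs := flag.NewFlagSet("planner", flag.ExitOnError)
	cfg.RegisterFlagsWithPrefix("bloom-build.planner", fs)
	_ = fs.Parse([]string{"-bloom-build.planner.queue.max-tasks-per-tenant=10000"})
	fmt.Println(cfg.Queue.MaxQueuedTasksPerTenant) // 10000
}
```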
84 changes: 30 additions & 54 deletions pkg/bloombuild/planner/planner.go
@@ -17,16 +17,15 @@ import (
"go.uber.org/atomic"

"github.com/grafana/loki/v3/pkg/bloombuild/common"
"github.com/grafana/loki/v3/pkg/bloombuild/planner/queue"
"github.com/grafana/loki/v3/pkg/bloombuild/planner/strategies"
"github.com/grafana/loki/v3/pkg/bloombuild/protos"
iter "github.com/grafana/loki/v3/pkg/iter/v2"
"github.com/grafana/loki/v3/pkg/queue"
"github.com/grafana/loki/v3/pkg/storage"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
"github.com/grafana/loki/v3/pkg/util"
utillog "github.com/grafana/loki/v3/pkg/util/log"
"github.com/grafana/loki/v3/pkg/util/ring"
)
@@ -50,10 +49,7 @@ type Planner struct {
tsdbStore common.TSDBStore
bloomStore bloomshipper.StoreBase

tasksQueue *queue.RequestQueue
activeUsers *util.ActiveUsersCleanupService

pendingTasks sync.Map
tasksQueue *queue.Queue

metrics *Metrics
logger log.Logger
@@ -83,23 +79,21 @@ func New(

// Queue to manage tasks
queueMetrics := queue.NewMetrics(r, metricsNamespace, metricsSubsystem)
tasksQueue := queue.NewRequestQueue(cfg.MaxQueuedTasksPerTenant, 0, NewQueueLimits(limits), queueMetrics)

// Clean metrics for inactive users: do not have added tasks to the queue in the last 1 hour
activeUsers := util.NewActiveUsersCleanupService(5*time.Minute, 1*time.Hour, func(user string) {
queueMetrics.Cleanup(user)
})
queueLimits := NewQueueLimits(limits)
tasksQueue, err := queue.NewQueue(logger, cfg.Queue, queueLimits, queueMetrics)
if err != nil {
return nil, fmt.Errorf("error creating tasks queue: %w", err)
}

p := &Planner{
cfg: cfg,
limits: limits,
schemaCfg: schemaCfg,
tsdbStore: tsdbStore,
bloomStore: bloomStore,
tasksQueue: tasksQueue,
activeUsers: activeUsers,
metrics: NewMetrics(r, tasksQueue.GetConnectedConsumersMetric),
logger: logger,
cfg: cfg,
limits: limits,
schemaCfg: schemaCfg,
tsdbStore: tsdbStore,
bloomStore: bloomStore,
tasksQueue: tasksQueue,
metrics: NewMetrics(r, tasksQueue.GetConnectedConsumersMetric),
logger: logger,
}

p.retentionManager = NewRetentionManager(
@@ -110,7 +104,7 @@ p.logger,
p.logger,
)

svcs := []services.Service{p.tasksQueue, p.activeUsers}
svcs := []services.Service{p.tasksQueue}

if rm != nil {
p.ringWatcher = common.NewRingWatcher(rm.RingLifecycler.GetInstanceID(), rm.Ring, time.Minute, logger)
@@ -204,7 +198,7 @@ func (p *Planner) trackInflightRequests(ctx context.Context) {
return

case <-inflightTasksTicker.C:
inflight := p.totalPendingTasks()
inflight := p.tasksQueue.TotalPending()
p.metrics.inflightRequests.Observe(float64(inflight))
}
}
@@ -708,27 +702,9 @@ func (p *Planner) tenants(ctx context.Context, table config.DayTable) (*iter.Sli
return iter.NewSliceIter(tenants), nil
}

func (p *Planner) addPendingTask(task *QueueTask) {
p.pendingTasks.Store(task.ID, task)
}

func (p *Planner) removePendingTask(task *QueueTask) {
p.pendingTasks.Delete(task.ID)
}

func (p *Planner) totalPendingTasks() (total int) {
p.pendingTasks.Range(func(_, _ interface{}) bool {
total++
return true
})
return total
}

func (p *Planner) enqueueTask(task *QueueTask) error {
p.activeUsers.UpdateUserTimestamp(task.Tenant, time.Now())
return p.tasksQueue.Enqueue(task.Tenant, nil, task, func() {
return p.tasksQueue.Enqueue(task.Tenant(), task, func() {
task.timesEnqueued.Add(1)
p.addPendingTask(task)
})
}

@@ -773,35 +749,35 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer
lastIndex = idx

if item == nil {

return fmt.Errorf("dequeue() call resulted in nil response. builder: %s", builderID)
}

task := item.(*QueueTask)
logger := log.With(logger, "task", task.ID)
logger := log.With(logger, "task", task.ID())

queueTime := time.Since(task.queueTime)
p.metrics.queueDuration.Observe(queueTime.Seconds())

if task.ctx.Err() != nil {
level.Warn(logger).Log("msg", "task context done after dequeue", "err", task.ctx.Err())
lastIndex = lastIndex.ReuseLastIndex()
p.removePendingTask(task)
p.tasksQueue.Release(task)
continue
}

result, err := p.forwardTaskToBuilder(builder, builderID, task)
if err != nil {
maxRetries := p.limits.BloomTaskMaxRetries(task.Tenant)
maxRetries := p.limits.BloomTaskMaxRetries(task.Tenant())
if maxRetries > 0 && int(task.timesEnqueued.Load()) >= maxRetries {
p.removePendingTask(task)
p.tasksQueue.Release(task)
level.Error(logger).Log(
"msg", "task failed after max retries",
"retries", task.timesEnqueued.Load(),
"maxRetries", maxRetries,
"err", err,
)
task.resultsChannel <- &protos.TaskResult{
TaskID: task.ID,
TaskID: task.ID(),
Error: fmt.Errorf("task failed after max retries (%d): %w", maxRetries, err),
}
continue
@@ -810,10 +786,10 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer
// Re-queue the task if the builder is failing to process the tasks
if err := p.enqueueTask(task); err != nil {
p.metrics.taskLost.Inc()
p.removePendingTask(task)
p.tasksQueue.Release(task)
level.Error(logger).Log("msg", "error re-enqueuing task. this task will be lost", "err", err)
task.resultsChannel <- &protos.TaskResult{
TaskID: task.ID,
TaskID: task.ID(),
Error: fmt.Errorf("error re-enqueuing task: %w", err),
}
continue
@@ -833,7 +809,7 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer
"duration", time.Since(task.queueTime).Seconds(),
"retries", task.timesEnqueued.Load()-1, // -1 because the first enqueue is not a retry
)
p.removePendingTask(task)
p.tasksQueue.Release(task)

// Send the result back to the task. The channel is buffered, so this should not block.
task.resultsChannel <- result
@@ -870,7 +846,7 @@ func (p *Planner) forwardTaskToBuilder(
}()

timeout := make(<-chan time.Time)
taskTimeout := p.limits.BuilderResponseTimeout(task.Tenant)
taskTimeout := p.limits.BuilderResponseTimeout(task.Tenant())
if taskTimeout != 0 {
// If the timeout is not 0 (disabled), configure it
timeout = time.After(taskTimeout)
@@ -910,8 +886,8 @@ func (p *Planner) receiveResultFromBuilder(
if err != nil {
return nil, fmt.Errorf("error processing task result in builder (%s): %w", builderID, err)
}
if result.TaskID != task.ID {
return nil, fmt.Errorf("unexpected task ID (%s) in response from builder (%s). Expected task ID is %s", result.TaskID, builderID, task.ID)
if result.TaskID != task.ID() {
return nil, fmt.Errorf("unexpected task ID (%s) in response from builder (%s). Expected task ID is %s", result.TaskID, builderID, task.ID())
}

return result, nil
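The bulk of the planner.go change is bookkeeping moving behind the new queue type: the planner's `pendingTasks` sync.Map, the add/remove helpers, and the `ActiveUsersCleanupService` disappear, and call sites use `Enqueue`, `Release`, and `TotalPending` instead. A hypothetical sketch of that encapsulation (illustrative names only, not the real `pkg/bloombuild/planner/queue` API):

```go
package main

import (
	"fmt"
	"sync"
)

type task struct{ id, tenant string }

func (t *task) ID() string     { return t.id }
func (t *task) Tenant() string { return t.tenant }

// trackingQueue keeps the pending-task accounting the planner used to do inline.
type trackingQueue struct {
	mu      sync.Mutex
	pending map[string]*task
}

func newTrackingQueue() *trackingQueue {
	return &trackingQueue{pending: make(map[string]*task)}
}

// Enqueue records the task as pending and runs the caller's callback,
// mirroring the Enqueue(tenant, task, successFn) shape in the diff.
func (q *trackingQueue) Enqueue(tenant string, t *task, successFn func()) error {
	q.mu.Lock()
	q.pending[t.ID()] = t
	q.mu.Unlock()
	if successFn != nil {
		successFn()
	}
	return nil
}

// Release drops the task from the pending set once a builder is done with it
// (or it is abandoned), replacing the old removePendingTask helper.
func (q *trackingQueue) Release(t *task) {
	q.mu.Lock()
	delete(q.pending, t.ID())
	q.mu.Unlock()
}

// TotalPending replaces the planner's old totalPendingTasks sync.Map walk.
func (q *trackingQueue) TotalPending() int {
	q.mu.Lock()
	defer q.mu.Unlock()
	return len(q.pending)
}

func main() {
	q := newTrackingQueue()
	t := &task{id: "task-1", tenant: "tenant-a"}
	_ = q.Enqueue(t.Tenant(), t, func() { fmt.Println("enqueued", t.ID()) })
	fmt.Println("pending:", q.TotalPending()) // pending: 1
	q.Release(t)
	fmt.Println("pending:", q.TotalPending()) // pending: 0
}
```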
31 changes: 19 additions & 12 deletions pkg/bloombuild/planner/planner_test.go
@@ -18,6 +18,7 @@ import (
"google.golang.org/grpc"

"github.com/grafana/loki/v3/pkg/bloombuild/planner/plannertest"
"github.com/grafana/loki/v3/pkg/bloombuild/planner/queue"
"github.com/grafana/loki/v3/pkg/bloombuild/planner/strategies"
"github.com/grafana/loki/v3/pkg/bloombuild/protos"
"github.com/grafana/loki/v3/pkg/storage"
@@ -163,8 +164,10 @@ func Test_BuilderLoop(t *testing.T) {
//logger := log.NewLogfmtLogger(os.Stdout)

cfg := Config{
PlanningInterval: 1 * time.Hour,
MaxQueuedTasksPerTenant: 10000,
PlanningInterval: 1 * time.Hour,
Queue: queue.Config{
MaxQueuedTasksPerTenant: 10000,
},
}
planner := createPlanner(t, cfg, tc.limits, logger)

@@ -206,7 +209,7 @@ func Test_BuilderLoop(t *testing.T) {
}, 5*time.Second, 10*time.Millisecond)

// Finally, the queue should be empty
require.Equal(t, 0, planner.totalPendingTasks())
require.Equal(t, 0, planner.tasksQueue.TotalPending())

// consume all tasks result to free up the channel for the next round of tasks
for i := 0; i < nTasks; i++ {
@@ -228,15 +231,15 @@ func Test_BuilderLoop(t *testing.T) {
if tc.shouldConsumeAfterModify {
require.Eventuallyf(
t, func() bool {
return planner.totalPendingTasks() == 0
return planner.tasksQueue.TotalPending() == 0
},
5*time.Second, 10*time.Millisecond,
"tasks not consumed, pending: %d", planner.totalPendingTasks(),
"tasks not consumed, pending: %d", planner.tasksQueue.TotalPending(),
)
} else {
require.Neverf(
t, func() bool {
return planner.totalPendingTasks() == 0
return planner.tasksQueue.TotalPending() == 0
},
5*time.Second, 10*time.Millisecond,
"all tasks were consumed but they should not be",
@@ -254,10 +257,10 @@ func Test_BuilderLoop(t *testing.T) {
// Now all tasks should be consumed
require.Eventuallyf(
t, func() bool {
return planner.totalPendingTasks() == 0
return planner.tasksQueue.TotalPending() == 0
},
5*time.Second, 10*time.Millisecond,
"tasks not consumed, pending: %d", planner.totalPendingTasks(),
"tasks not consumed, pending: %d", planner.tasksQueue.TotalPending(),
)
}
})
@@ -384,8 +387,10 @@ func Test_processTenantTaskResults(t *testing.T) {
//logger := log.NewLogfmtLogger(os.Stdout)

cfg := Config{
PlanningInterval: 1 * time.Hour,
MaxQueuedTasksPerTenant: 10000,
PlanningInterval: 1 * time.Hour,
Queue: queue.Config{
MaxQueuedTasksPerTenant: 10000,
},
}
planner := createPlanner(t, cfg, &fakeLimits{}, logger)

@@ -544,8 +549,10 @@ func Test_deleteOutdatedMetas(t *testing.T) {
// logger := log.NewLogfmtLogger(os.Stdout)

cfg := Config{
PlanningInterval: 1 * time.Hour,
MaxQueuedTasksPerTenant: 10000,
PlanningInterval: 1 * time.Hour,
Queue: queue.Config{
MaxQueuedTasksPerTenant: 10000,
},
}
planner := createPlanner(t, cfg, &fakeLimits{}, logger)

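The test updates follow the same shift: instead of counting entries in a planner-local map, assertions poll the queue until it reports no pending work. A hypothetical, self-contained version of that polling pattern (the `drainableQueue` type is a stand-in for the planner's `tasksQueue`, not the real API):

```go
package example

import (
	"sync/atomic"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

// drainableQueue exposes only what the assertion needs: a pending-task count.
type drainableQueue struct{ pending atomic.Int64 }

func (q *drainableQueue) TotalPending() int { return int(q.pending.Load()) }

func TestQueueDrains(t *testing.T) {
	q := &drainableQueue{}
	q.pending.Store(3)

	// Simulate builders consuming tasks in the background.
	go func() {
		for q.pending.Load() > 0 {
			time.Sleep(5 * time.Millisecond)
			q.pending.Add(-1)
		}
	}()

	// Same shape as the updated Test_BuilderLoop assertions.
	require.Eventuallyf(t, func() bool {
		return q.TotalPending() == 0
	}, 5*time.Second, 10*time.Millisecond,
		"tasks not consumed, pending: %d", q.TotalPending())
}
```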
22 changes: 22 additions & 0 deletions pkg/bloombuild/planner/queue/config.go
@@ -0,0 +1,22 @@
package queue

import (
"flag"

"github.com/grafana/loki/v3/pkg/queue"
)

type Config struct {
MaxQueuedTasksPerTenant int `yaml:"max_queued_tasks_per_tenant"`
}

// RegisterFlagsWithPrefix registers flags for the bloom-planner configuration.
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.IntVar(&cfg.MaxQueuedTasksPerTenant, prefix+".max-tasks-per-tenant", 30000, "Maximum number of tasks to queue per tenant.")
}

func (cfg *Config) Validate() error {
return nil
}

type Limits = queue.Limits
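The closing `type Limits = queue.Limits` line re-exports the base queue's limits interface under the new package, so planner code can depend on `planner/queue` alone. A small illustration of that alias pattern with stand-in types (the method below is assumed for the sketch, not the actual `pkg/queue` interface):

```go
package main

import "fmt"

// baseLimits stands in for the interface defined in pkg/queue.
type baseLimits interface {
	MaxConsumers(tenant string, allConsumers int) int
}

// Limits is what pkg/bloombuild/planner/queue would re-export via a type alias,
// letting callers import only the planner queue package.
type Limits = baseLimits

type fixedLimits struct{ max int }

func (l fixedLimits) MaxConsumers(string, int) int { return l.max }

func main() {
	var l Limits = fixedLimits{max: 10}
	fmt.Println(l.MaxConsumers("tenant-a", 100)) // 10
}
```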