diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md
index 1160adff8f8ba..47587bce9fa81 100644
--- a/docs/sources/shared/configuration.md
+++ b/docs/sources/shared/configuration.md
@@ -192,6 +192,10 @@ block_builder:
   # CLI flag: -blockbuilder.sync-interval
   [sync_interval: <duration> | default = 30s]
 
+  # The interval at which to poll for new jobs.
+  # CLI flag: -blockbuilder.poll-interval
+  [poll_interval: <duration> | default = 30s]
+
   # Address of the scheduler in the format described here:
   # https://github.com/grpc/grpc/blob/master/doc/naming.md
   # CLI flag: -blockbuilder.scheduler-address
diff --git a/pkg/blockbuilder/builder/builder.go b/pkg/blockbuilder/builder/builder.go
index e3c1420b3cd81..aaeaad0d864fb 100644
--- a/pkg/blockbuilder/builder/builder.go
+++ b/pkg/blockbuilder/builder/builder.go
@@ -42,6 +42,7 @@ type Config struct {
 	Backoff           backoff.Config `yaml:"backoff_config"`
 	WorkerParallelism int            `yaml:"worker_parallelism"`
 	SyncInterval      time.Duration  `yaml:"sync_interval"`
+	PollInterval      time.Duration  `yaml:"poll_interval"`
 	SchedulerAddress  string         `yaml:"scheduler_address"`
 
 	// SchedulerGRPCClientConfig configures the gRPC connection between the block-builder and its scheduler.
@@ -58,6 +59,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
 	f.StringVar(&cfg.ChunkEncoding, prefix+"chunk-encoding", compression.Snappy.String(), fmt.Sprintf("The algorithm to use for compressing chunk. (%s)", compression.SupportedCodecs()))
 	f.DurationVar(&cfg.MaxChunkAge, prefix+"max-chunk-age", 2*time.Hour, "The maximum duration of a timeseries chunk in memory. If a timeseries runs for longer than this, the current chunk will be flushed to the store and a new chunk created.")
 	f.DurationVar(&cfg.SyncInterval, prefix+"sync-interval", 30*time.Second, "The interval at which to sync job status with the scheduler.")
+	f.DurationVar(&cfg.PollInterval, prefix+"poll-interval", 30*time.Second, "The interval at which to poll for new jobs.")
 	f.IntVar(&cfg.WorkerParallelism, prefix+"worker-parallelism", 1, "The number of workers to run in parallel to process jobs.")
 	f.StringVar(&cfg.SchedulerAddress, prefix+"scheduler-address", "", "Address of the scheduler in the format described here: https://github.com/grpc/grpc/blob/master/doc/naming.md")
 
@@ -81,6 +83,10 @@ func (cfg *Config) Validate() error {
 		return errors.New("sync interval must be greater than 0")
 	}
 
+	if cfg.PollInterval <= 0 {
+		return errors.New("poll interval must be greater than 0")
+	}
+
 	if cfg.WorkerParallelism < 1 {
 		return errors.New("worker parallelism must be greater than 0")
 	}
@@ -165,15 +171,23 @@ func (i *BlockBuilder) running(ctx context.Context) error {
 		go func(id string) {
 			defer wg.Done()
 
+			var waitFor time.Duration
 			for {
 				select {
 				case <-ctx.Done():
 					return
-				default:
-					err := i.runOne(ctx, id)
+				case <-time.After(waitFor):
+					gotJob, err := i.runOne(ctx, id)
 					if err != nil {
 						level.Error(i.logger).Log("msg", "block builder run failed", "err", err)
 					}
+
+					// poll only when there are no jobs
+					if gotJob {
+						waitFor = 0
+					} else {
+						waitFor = i.cfg.PollInterval
+					}
 				}
 			}
 		}(fmt.Sprintf("worker-%d", j))
@@ -218,18 +232,18 @@ func (i *BlockBuilder) syncJobs(ctx context.Context) error {
 	return nil
 }
 
-func (i *BlockBuilder) runOne(ctx context.Context, workerID string) error {
+func (i *BlockBuilder) runOne(ctx context.Context, workerID string) (bool, error) {
 	// assuming GetJob blocks/polls until a job is available
 	resp, err := i.SendGetJobRequest(ctx, &types.GetJobRequest{
 		BuilderID: workerID,
 	})
 	if err != nil {
-		return err
+		return false, err
 	}
 
 	if !resp.OK {
 		level.Info(i.logger).Log("msg", "no available job to process")
-		return nil
+		return false, nil
 	}
 
 	job := resp.Job
@@ -262,7 +276,7 @@ func (i *BlockBuilder) runOne(ctx context.Context, workerID string) error {
 			return
 		},
 	); err != nil {
-		return err
+		return true, err
 	}
 
 	i.jobsMtx.Lock()
@@ -270,7 +284,7 @@ func (i *BlockBuilder) runOne(ctx context.Context, workerID string) error {
 	i.metrics.inflightJobs.Set(float64(len(i.inflightJobs)))
 	i.jobsMtx.Unlock()
 
-	return err
+	return true, err
 }
 
 func (i *BlockBuilder) processJob(ctx context.Context, job *types.Job, logger log.Logger) (lastOffsetConsumed int64, err error) {
@@ -297,7 +311,7 @@ func (i *BlockBuilder) processJob(ctx context.Context, job *types.Job, logger lo
 		"load records",
 		1,
 		func(ctx context.Context) error {
-			lastOffset, err = i.loadRecords(ctx, int32(job.Partition), job.Offsets, inputCh)
+			lastOffset, err = i.loadRecords(ctx, job.Partition, job.Offsets, inputCh)
 			return err
 		},
 		func(ctx context.Context) error {
@@ -545,7 +559,7 @@ func (i *BlockBuilder) loadRecords(ctx context.Context, partitionID int32, offse
 		}
 	}
 
-	return lastOffset, err
+	return lastOffset, boff.Err()
 }
 
 func withBackoff[T any](
diff --git a/pkg/blockbuilder/builder/builder_test.go b/pkg/blockbuilder/builder/builder_test.go
deleted file mode 100644
index ac9890526f1d7..0000000000000
--- a/pkg/blockbuilder/builder/builder_test.go
+++ /dev/null
@@ -1,16 +0,0 @@
-package builder
-
-import (
-	"github.com/grafana/loki/v3/pkg/blockbuilder/types"
-)
-
-// TestBuilder implements Worker interface for testing
-type TestBuilder struct {
-	*Worker
-}
-
-func NewTestBuilder(builderID string, transport types.Transport) *TestBuilder {
-	return &TestBuilder{
-		Worker: NewWorker(builderID, transport),
-	}
-}
diff --git a/pkg/blockbuilder/scheduler/scheduler.go b/pkg/blockbuilder/scheduler/scheduler.go
index 632e6842993a5..e2272a039f5c4 100644
--- a/pkg/blockbuilder/scheduler/scheduler.go
+++ b/pkg/blockbuilder/scheduler/scheduler.go
@@ -16,10 +16,10 @@ import (
 	"github.com/twmb/franz-go/pkg/kadm"
 
 	"github.com/grafana/loki/v3/pkg/blockbuilder/types"
+	"github.com/grafana/loki/v3/pkg/blockbuilder/types/proto"
 )
 
 var (
-	_ types.Scheduler = unimplementedScheduler{}
 	_ types.Scheduler = &BlockScheduler{}
 )
 
@@ -185,17 +185,42 @@ func (s *BlockScheduler) HandleSyncJob(_ context.Context, builderID string, job
 	return nil
 }
 
-// unimplementedScheduler provides default implementations that panic.
-type unimplementedScheduler struct{}
-
-func (s unimplementedScheduler) HandleGetJob(_ context.Context, _ string) (*types.Job, bool, error) {
-	panic("unimplemented")
+func (s *BlockScheduler) CompleteJob(_ context.Context, req *proto.CompleteJobRequest) (*proto.CompleteJobResponse, error) {
+	s.queue.MarkComplete(req.Job.Id)
+	return &proto.CompleteJobResponse{}, nil
 }
 
-func (s unimplementedScheduler) HandleCompleteJob(_ context.Context, _ string, _ *types.Job) error {
-	panic("unimplemented")
+func (s *BlockScheduler) SyncJob(_ context.Context, req *proto.SyncJobRequest) (*proto.SyncJobResponse, error) {
+	s.queue.SyncJob(req.Job.Id, req.BuilderId, &types.Job{
+		ID:        req.Job.Id,
+		Partition: req.Job.Partition,
+		Offsets: types.Offsets{
+			Min: req.Job.Offsets.Min,
+			Max: req.Job.Offsets.Max,
+		},
+	})
+
+	return &proto.SyncJobResponse{}, nil
 }
 
-func (s unimplementedScheduler) HandleSyncJob(_ context.Context, _ string, _ *types.Job) error {
-	panic("unimplemented")
+func (s *BlockScheduler) GetJob(_ context.Context, req *proto.GetJobRequest) (*proto.GetJobResponse, error) {
+	var resp proto.GetJobResponse
+	job, ok, err := s.queue.Dequeue(req.BuilderId)
+	if err != nil {
+		return &resp, err
+	}
+
+	if ok {
+		resp.Ok = true
+		resp.Job = &proto.Job{
+			Id:        job.ID,
+			Partition: job.Partition,
+			Offsets: &proto.Offsets{
+				Min: job.Offsets.Min,
+				Max: job.Offsets.Max,
+			},
+		}
+	}
+
+	return &resp, nil
 }
diff --git a/pkg/blockbuilder/scheduler/strategy.go b/pkg/blockbuilder/scheduler/strategy.go
index 0b8c38aa64d11..75719140a4ea0 100644
--- a/pkg/blockbuilder/scheduler/strategy.go
+++ b/pkg/blockbuilder/scheduler/strategy.go
@@ -70,7 +70,7 @@ func (p *RecordCountPlanner) Plan(ctx context.Context) ([]*JobWithPriority[int],
 			currentEnd := min(currentStart+p.targetRecordCount, endOffset)
 
 			job := NewJobWithPriority(
-				types.NewJob(int(partitionOffset.Partition), types.Offsets{
+				types.NewJob(partitionOffset.Partition, types.Offsets{
 					Min: currentStart,
 					Max: currentEnd,
 				}), int(endOffset-currentStart), // priority is remaining records to process
diff --git a/pkg/blockbuilder/types/grpc_transport.go b/pkg/blockbuilder/types/grpc_transport.go
index a5a9fa843671e..b083cd76bb37a 100644
--- a/pkg/blockbuilder/types/grpc_transport.go
+++ b/pkg/blockbuilder/types/grpc_transport.go
@@ -111,7 +111,7 @@ func protoToJob(p *proto.Job) *Job {
 	}
 	return &Job{
 		ID:        p.GetId(),
-		Partition: int(p.GetPartition()),
+		Partition: p.GetPartition(),
 		Offsets: Offsets{
 			Min: p.GetOffsets().GetMin(),
 			Max: p.GetOffsets().GetMax(),
@@ -126,7 +126,7 @@ func jobToProto(j *Job) *proto.Job {
 	}
 	return &proto.Job{
 		Id:        j.ID,
-		Partition: int32(j.Partition),
+		Partition: j.Partition,
 		Offsets: &proto.Offsets{
 			Min: j.Offsets.Min,
 			Max: j.Offsets.Max,
diff --git a/pkg/blockbuilder/types/job.go b/pkg/blockbuilder/types/job.go
index 2c06fec4d48cd..9cf94daebd484 100644
--- a/pkg/blockbuilder/types/job.go
+++ b/pkg/blockbuilder/types/job.go
@@ -6,7 +6,7 @@ import "fmt"
 type Job struct {
 	ID string
 	// Partition and offset information
-	Partition int
+	Partition int32
 	Offsets   Offsets
 }
 
@@ -26,7 +26,7 @@ type Offsets struct {
 }
 
 // NewJob creates a new job with the given partition and offsets
-func NewJob(partition int, offsets Offsets) *Job {
+func NewJob(partition int32, offsets Offsets) *Job {
 	return &Job{
 		ID:        GenerateJobID(partition, offsets),
 		Partition: partition,
@@ -35,6 +35,6 @@ func NewJob(partition int, offsets Offsets) *Job {
 }
 
 // GenerateJobID creates a deterministic job ID from partition and offsets
-func GenerateJobID(partition int, offsets Offsets) string {
+func GenerateJobID(partition int32, offsets Offsets) string {
 	return fmt.Sprintf("job-%d-%d-%d", partition, offsets.Min, offsets.Max)
 }
diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go
index bd2854f4752c6..9b521eb7fc083 100644
--- a/pkg/loki/modules.go
+++ b/pkg/loki/modules.go
@@ -38,6 +38,7 @@ import (
 	"github.com/grafana/loki/v3/pkg/analytics"
 	blockbuilder "github.com/grafana/loki/v3/pkg/blockbuilder/builder"
 	blockscheduler "github.com/grafana/loki/v3/pkg/blockbuilder/scheduler"
+	blockprotos "github.com/grafana/loki/v3/pkg/blockbuilder/types/proto"
 	"github.com/grafana/loki/v3/pkg/bloombuild/builder"
 	"github.com/grafana/loki/v3/pkg/bloombuild/planner"
 	bloomprotos "github.com/grafana/loki/v3/pkg/bloombuild/protos"
@@ -1862,8 +1863,16 @@ func (t *Loki) initBlockScheduler() (services.Service, error) {
 	if err != nil {
 		return nil, fmt.Errorf("creating kafka client: %w", err)
 	}
+
 	offsetReader := blockscheduler.NewOffsetReader(t.Cfg.KafkaConfig.Topic, t.Cfg.BlockScheduler.ConsumerGroup, t.Cfg.BlockScheduler.LookbackPeriod, c)
-	return blockscheduler.NewScheduler(t.Cfg.BlockScheduler, blockscheduler.NewJobQueue(), offsetReader, logger, prometheus.DefaultRegisterer), nil
+	s, err := blockscheduler.NewScheduler(t.Cfg.BlockScheduler, blockscheduler.NewJobQueue(), offsetReader, logger, prometheus.DefaultRegisterer), nil
+	if err != nil {
+		return s, err
+	}
+
+	blockprotos.RegisterBlockBuilderServiceServer(t.Server.GRPC, s)
+
+	return s, err
 }
 
 func (t *Loki) deleteRequestsClient(clientType string, limits limiter.CombinedLimits) (deletion.DeleteRequestsClient, error) {
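
Note (not part of the diff): the hunks above add a `poll_interval` option next to the existing `sync_interval`; an idle worker now waits `poll_interval` before asking the scheduler for another job instead of looping immediately. As a rough sketch, a block-builder section of a Loki config using the new option might look like the following — the scheduler address and any non-default values here are illustrative placeholders:

    block_builder:
      worker_parallelism: 2
      sync_interval: 30s
      # how long an idle worker waits before requesting the next job from the scheduler
      poll_interval: 30s
      scheduler_address: "block-scheduler.loki.svc.cluster.local:9095"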