feat(scheduler): implement and register block builder rpc service (#1…
ashwanthgoli authored Dec 9, 2024
1 parent 17f1972 commit c519ab6
Showing 8 changed files with 78 additions and 42 deletions.
4 changes: 4 additions & 0 deletions docs/sources/shared/configuration.md
@@ -192,6 +192,10 @@ block_builder:
# CLI flag: -blockbuilder.sync-interval
[sync_interval: <duration> | default = 30s]

# The interval at which to poll for new jobs.
# CLI flag: -blockbuilder.poll-interval
[poll_interval: <duration> | default = 30s]

# Address of the scheduler in the format described here:
# https://github.com/grpc/grpc/blob/master/doc/naming.md
# CLI flag: -blockbuilder.scheduler-address
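For illustration, a minimal sketch of how the new flag lands in the builder `Config`. The `blockbuilder.` prefix is inferred from the CLI flag names documented above, and the package alias mirrors the one used in `pkg/loki/modules.go`; the wiring here is an assumption, not code from this commit.

```go
package main

import (
	"flag"
	"fmt"

	blockbuilder "github.com/grafana/loki/v3/pkg/blockbuilder/builder"
)

func main() {
	var cfg blockbuilder.Config
	fs := flag.NewFlagSet("example", flag.ExitOnError)

	// Register the block-builder flags under the documented prefix.
	cfg.RegisterFlagsWithPrefix("blockbuilder.", fs)

	// Poll the scheduler every 10s when idle instead of the 30s default.
	_ = fs.Parse([]string{"-blockbuilder.poll-interval=10s"})

	fmt.Println(cfg.SyncInterval, cfg.PollInterval) // 30s 10s
}
```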
32 changes: 23 additions & 9 deletions pkg/blockbuilder/builder/builder.go
@@ -42,6 +42,7 @@ type Config struct {
Backoff backoff.Config `yaml:"backoff_config"`
WorkerParallelism int `yaml:"worker_parallelism"`
SyncInterval time.Duration `yaml:"sync_interval"`
PollInterval time.Duration `yaml:"poll_interval"`

SchedulerAddress string `yaml:"scheduler_address"`
// SchedulerGRPCClientConfig configures the gRPC connection between the block-builder and its scheduler.
@@ -58,6 +59,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.StringVar(&cfg.ChunkEncoding, prefix+"chunk-encoding", compression.Snappy.String(), fmt.Sprintf("The algorithm to use for compressing chunk. (%s)", compression.SupportedCodecs()))
f.DurationVar(&cfg.MaxChunkAge, prefix+"max-chunk-age", 2*time.Hour, "The maximum duration of a timeseries chunk in memory. If a timeseries runs for longer than this, the current chunk will be flushed to the store and a new chunk created.")
f.DurationVar(&cfg.SyncInterval, prefix+"sync-interval", 30*time.Second, "The interval at which to sync job status with the scheduler.")
f.DurationVar(&cfg.PollInterval, prefix+"poll-interval", 30*time.Second, "The interval at which to poll for new jobs.")
f.IntVar(&cfg.WorkerParallelism, prefix+"worker-parallelism", 1, "The number of workers to run in parallel to process jobs.")
f.StringVar(&cfg.SchedulerAddress, prefix+"scheduler-address", "", "Address of the scheduler in the format described here: https://github.com/grpc/grpc/blob/master/doc/naming.md")

@@ -81,6 +83,10 @@ func (cfg *Config) Validate() error {
return errors.New("sync interval must be greater than 0")
}

if cfg.PollInterval <= 0 {
return errors.New("poll interval must be greater than 0")
}

if cfg.WorkerParallelism < 1 {
return errors.New("worker parallelism must be greater than 0")
}
@@ -165,15 +171,23 @@ func (i *BlockBuilder) running(ctx context.Context) error {
go func(id string) {
defer wg.Done()

var waitFor time.Duration
for {
select {
case <-ctx.Done():
return
default:
err := i.runOne(ctx, id)
case <-time.After(waitFor):
gotJob, err := i.runOne(ctx, id)
if err != nil {
level.Error(i.logger).Log("msg", "block builder run failed", "err", err)
}

// poll only when there are no jobs
if gotJob {
waitFor = 0
} else {
waitFor = i.cfg.PollInterval
}
}
}
}(fmt.Sprintf("worker-%d", j))
@@ -218,18 +232,18 @@ func (i *BlockBuilder) syncJobs(ctx context.Context) error {
return nil
}

func (i *BlockBuilder) runOne(ctx context.Context, workerID string) error {
func (i *BlockBuilder) runOne(ctx context.Context, workerID string) (bool, error) {
// assuming GetJob blocks/polls until a job is available
resp, err := i.SendGetJobRequest(ctx, &types.GetJobRequest{
BuilderID: workerID,
})
if err != nil {
return err
return false, err
}

if !resp.OK {
level.Info(i.logger).Log("msg", "no available job to process")
return nil
return false, nil
}

job := resp.Job
@@ -262,15 +276,15 @@ func (i *BlockBuilder) runOne(ctx context.Context, workerID string) error {
return
},
); err != nil {
return err
return true, err
}

i.jobsMtx.Lock()
delete(i.inflightJobs, job.ID)
i.metrics.inflightJobs.Set(float64(len(i.inflightJobs)))
i.jobsMtx.Unlock()

return err
return true, err
}

func (i *BlockBuilder) processJob(ctx context.Context, job *types.Job, logger log.Logger) (lastOffsetConsumed int64, err error) {
@@ -297,7 +311,7 @@ func (i *BlockBuilder) processJob(ctx context.Context, job *types.Job, logger lo
"load records",
1,
func(ctx context.Context) error {
lastOffset, err = i.loadRecords(ctx, int32(job.Partition), job.Offsets, inputCh)
lastOffset, err = i.loadRecords(ctx, job.Partition, job.Offsets, inputCh)
return err
},
func(ctx context.Context) error {
Expand Down Expand Up @@ -545,7 +559,7 @@ func (i *BlockBuilder) loadRecords(ctx context.Context, partitionID int32, offse
}
}

return lastOffset, err
return lastOffset, boff.Err()
}

func withBackoff[T any](
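The worker change above amounts to an idle-backoff loop: when a job was processed, ask the scheduler for the next one immediately; when there was nothing to do, sleep for `poll_interval` before asking again. A stripped-down sketch of the same pattern (the names `runOnce` and `worker` are illustrative, not Loki APIs):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runOnce stands in for BlockBuilder.runOne: it reports whether a job was
// actually processed so the caller knows whether to keep going or back off.
func runOnce(ctx context.Context, id string) (bool, error) {
	// ... ask the scheduler for a job and process it ...
	return false, nil // pretend the queue was empty
}

func worker(ctx context.Context, id string, pollInterval time.Duration) {
	var waitFor time.Duration // zero on the first pass and after a processed job
	for {
		select {
		case <-ctx.Done():
			return
		case <-time.After(waitFor):
			gotJob, err := runOnce(ctx, id)
			if err != nil {
				fmt.Println("run failed:", err)
			}
			if gotJob {
				waitFor = 0 // more work may be queued; ask again immediately
			} else {
				waitFor = pollInterval // idle; wait before polling again
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	worker(ctx, "worker-0", 500*time.Millisecond)
}
```

Returning `(bool, error)` from `runOne` is what lets the loop tell "the run failed" apart from "there was simply nothing to do".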
16 changes: 0 additions & 16 deletions pkg/blockbuilder/builder/builder_test.go

This file was deleted.

45 changes: 35 additions & 10 deletions pkg/blockbuilder/scheduler/scheduler.go
@@ -16,10 +16,10 @@ import (
"github.com/twmb/franz-go/pkg/kadm"

"github.com/grafana/loki/v3/pkg/blockbuilder/types"
"github.com/grafana/loki/v3/pkg/blockbuilder/types/proto"
)

var (
_ types.Scheduler = unimplementedScheduler{}
_ types.Scheduler = &BlockScheduler{}
)

@@ -185,17 +185,42 @@ func (s *BlockScheduler) HandleSyncJob(_ context.Context, builderID string, job
return nil
}

// unimplementedScheduler provides default implementations that panic.
type unimplementedScheduler struct{}

func (s unimplementedScheduler) HandleGetJob(_ context.Context, _ string) (*types.Job, bool, error) {
panic("unimplemented")
}
func (s *BlockScheduler) CompleteJob(_ context.Context, req *proto.CompleteJobRequest) (*proto.CompleteJobResponse, error) {
s.queue.MarkComplete(req.Job.Id)
return &proto.CompleteJobResponse{}, nil
}

func (s unimplementedScheduler) HandleCompleteJob(_ context.Context, _ string, _ *types.Job) error {
panic("unimplemented")
}
func (s *BlockScheduler) SyncJob(_ context.Context, req *proto.SyncJobRequest) (*proto.SyncJobResponse, error) {
s.queue.SyncJob(req.Job.Id, req.BuilderId, &types.Job{
ID: req.Job.Id,
Partition: req.Job.Partition,
Offsets: types.Offsets{
Min: req.Job.Offsets.Min,
Max: req.Job.Offsets.Max,
},
})

return &proto.SyncJobResponse{}, nil
}

func (s unimplementedScheduler) HandleSyncJob(_ context.Context, _ string, _ *types.Job) error {
panic("unimplemented")
}
func (s *BlockScheduler) GetJob(_ context.Context, req *proto.GetJobRequest) (*proto.GetJobResponse, error) {
var resp proto.GetJobResponse
job, ok, err := s.queue.Dequeue(req.BuilderId)
if err != nil {
return &resp, err
}

if ok {
resp.Ok = true
resp.Job = &proto.Job{
Id: job.ID,
Partition: job.Partition,
Offsets: &proto.Offsets{
Min: job.Offsets.Min,
Max: job.Offsets.Max,
},
}
}

return &resp, nil
}
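With these methods in place, `BlockScheduler` satisfies the generated gRPC service, so a builder can fetch work over the wire. A hedged client-side sketch: `NewBlockBuilderServiceClient` is assumed from standard protoc-gen-go-grpc naming (only `RegisterBlockBuilderServiceServer` is visible in this commit), and the address and plaintext credentials are for illustration only.

```go
package main

import (
	"context"
	"fmt"
	"log"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	"github.com/grafana/loki/v3/pkg/blockbuilder/types/proto"
)

func main() {
	conn, err := grpc.Dial("127.0.0.1:9095", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// Assumed generated constructor, matching RegisterBlockBuilderServiceServer.
	client := proto.NewBlockBuilderServiceClient(conn)

	resp, err := client.GetJob(context.Background(), &proto.GetJobRequest{BuilderId: "worker-0"})
	if err != nil {
		log.Fatal(err)
	}
	if !resp.Ok {
		fmt.Println("no job available")
		return
	}
	fmt.Printf("job %s: partition %d, offsets %d-%d\n",
		resp.Job.Id, resp.Job.Partition, resp.Job.Offsets.Min, resp.Job.Offsets.Max)
}
```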
2 changes: 1 addition & 1 deletion pkg/blockbuilder/scheduler/strategy.go
@@ -70,7 +70,7 @@ func (p *RecordCountPlanner) Plan(ctx context.Context) ([]*JobWithPriority[int],
currentEnd := min(currentStart+p.targetRecordCount, endOffset)

job := NewJobWithPriority(
types.NewJob(int(partitionOffset.Partition), types.Offsets{
types.NewJob(partitionOffset.Partition, types.Offsets{
Min: currentStart,
Max: currentEnd,
}), int(endOffset-currentStart), // priority is remaining records to process
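The planner now passes the partition through as an `int32`. The planning itself just walks the offset range in `targetRecordCount`-sized steps, giving ranges with more remaining records higher priority. A self-contained sketch of that slicing with simplified types (the advance to the next `currentStart` is not visible in the hunk above and is assumed here):

```go
package main

import "fmt"

type offsets struct{ min, max int64 }

type jobWithPriority struct {
	partition int32
	offsets   offsets
	priority  int // remaining records at planning time
}

// planPartition splits [start, end) into jobs of at most target records each.
func planPartition(partition int32, start, end, target int64) []jobWithPriority {
	var jobs []jobWithPriority
	for cur := start; cur < end; {
		curEnd := cur + target
		if curEnd > end {
			curEnd = end
		}
		jobs = append(jobs, jobWithPriority{
			partition: partition,
			offsets:   offsets{min: cur, max: curEnd},
			priority:  int(end - cur), // more remaining records => higher priority
		})
		cur = curEnd
	}
	return jobs
}

func main() {
	for _, j := range planPartition(3, 1000, 1250, 100) {
		fmt.Printf("partition %d: [%d, %d) priority=%d\n", j.partition, j.offsets.min, j.offsets.max, j.priority)
	}
}
```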
4 changes: 2 additions & 2 deletions pkg/blockbuilder/types/grpc_transport.go
@@ -111,7 +111,7 @@ func protoToJob(p *proto.Job) *Job {
}
return &Job{
ID: p.GetId(),
Partition: int(p.GetPartition()),
Partition: p.GetPartition(),
Offsets: Offsets{
Min: p.GetOffsets().GetMin(),
Max: p.GetOffsets().GetMax(),
@@ -126,7 +126,7 @@ func jobToProto(j *Job) *proto.Job {
}
return &proto.Job{
Id: j.ID,
Partition: int32(j.Partition),
Partition: j.Partition,
Offsets: &proto.Offsets{
Min: j.Offsets.Min,
Max: j.Offsets.Max,
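Since `protoToJob` and `jobToProto` are unexported, a round-trip check for the `int32` partition change would have to live in the `types` package itself. A sketch of such a test (assumed, not part of this commit):

```go
package types

import "testing"

func TestJobProtoRoundTrip(t *testing.T) {
	in := NewJob(7, Offsets{Min: 10, Max: 20})

	// Converting to the proto form and back should preserve the ID,
	// partition, and offsets now that both sides use int32 partitions.
	out := protoToJob(jobToProto(in))

	if out.ID != in.ID || out.Partition != in.Partition ||
		out.Offsets.Min != in.Offsets.Min || out.Offsets.Max != in.Offsets.Max {
		t.Fatalf("round trip mismatch: got %+v, want %+v", out, in)
	}
}
```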
6 changes: 3 additions & 3 deletions pkg/blockbuilder/types/job.go
@@ -6,7 +6,7 @@ import "fmt"
type Job struct {
ID string
// Partition and offset information
Partition int
Partition int32
Offsets Offsets
}

@@ -26,7 +26,7 @@ type Offsets struct {
}

// NewJob creates a new job with the given partition and offsets
func NewJob(partition int, offsets Offsets) *Job {
func NewJob(partition int32, offsets Offsets) *Job {
return &Job{
ID: GenerateJobID(partition, offsets),
Partition: partition,
@@ -35,6 +35,6 @@ func NewJob(partition int32, offsets Offsets) *Job {
}

// GenerateJobID creates a deterministic job ID from partition and offsets
func GenerateJobID(partition int, offsets Offsets) string {
func GenerateJobID(partition int32, offsets Offsets) string {
return fmt.Sprintf("job-%d-%d-%d", partition, offsets.Min, offsets.Max)
}
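Usage stays the same apart from the partition type: callers pass the Kafka partition (an `int32`) directly and get a deterministic ID back. A small usage sketch:

```go
package main

import (
	"fmt"

	"github.com/grafana/loki/v3/pkg/blockbuilder/types"
)

func main() {
	// Partition is now int32, matching the Kafka client and the proto
	// definition, so no int <-> int32 casts are needed at call sites.
	job := types.NewJob(3, types.Offsets{Min: 1000, Max: 2000})

	// The ID is derived from partition and offsets, so the same inputs
	// always yield the same job ID.
	fmt.Println(job.ID) // job-3-1000-2000
}
```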
11 changes: 10 additions & 1 deletion pkg/loki/modules.go
@@ -38,6 +38,7 @@ import (
"github.com/grafana/loki/v3/pkg/analytics"
blockbuilder "github.com/grafana/loki/v3/pkg/blockbuilder/builder"
blockscheduler "github.com/grafana/loki/v3/pkg/blockbuilder/scheduler"
blockprotos "github.com/grafana/loki/v3/pkg/blockbuilder/types/proto"
"github.com/grafana/loki/v3/pkg/bloombuild/builder"
"github.com/grafana/loki/v3/pkg/bloombuild/planner"
bloomprotos "github.com/grafana/loki/v3/pkg/bloombuild/protos"
@@ -1862,8 +1863,16 @@ func (t *Loki) initBlockScheduler() (services.Service, error) {
if err != nil {
return nil, fmt.Errorf("creating kafka client: %w", err)
}

offsetReader := blockscheduler.NewOffsetReader(t.Cfg.KafkaConfig.Topic, t.Cfg.BlockScheduler.ConsumerGroup, t.Cfg.BlockScheduler.LookbackPeriod, c)
return blockscheduler.NewScheduler(t.Cfg.BlockScheduler, blockscheduler.NewJobQueue(), offsetReader, logger, prometheus.DefaultRegisterer), nil
s, err := blockscheduler.NewScheduler(t.Cfg.BlockScheduler, blockscheduler.NewJobQueue(), offsetReader, logger, prometheus.DefaultRegisterer), error(nil)
if err != nil {
return s, err
}

blockprotos.RegisterBlockBuilderServiceServer(t.Server.GRPC, s)

return s, err
}

func (t *Loki) deleteRequestsClient(clientType string, limits limiter.CombinedLimits) (deletion.DeleteRequestsClient, error) {
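Inside Loki the scheduler is registered on the embedded `t.Server.GRPC`, as shown above. For a rough picture of what that wiring does, here is a standalone sketch; `proto.BlockBuilderServiceServer` is assumed to be the generated service interface that `RegisterBlockBuilderServiceServer` accepts, and the address is illustrative.

```go
package main

import (
	"log"
	"net"

	"google.golang.org/grpc"

	"github.com/grafana/loki/v3/pkg/blockbuilder/types/proto"
)

// serveScheduler exposes a scheduler implementation over its own gRPC server.
func serveScheduler(impl proto.BlockBuilderServiceServer, addr string) error {
	lis, err := net.Listen("tcp", addr)
	if err != nil {
		return err
	}

	srv := grpc.NewServer()
	proto.RegisterBlockBuilderServiceServer(srv, impl)
	return srv.Serve(lis)
}

func main() {
	// In real wiring impl would be the *BlockScheduler returned by
	// blockscheduler.NewScheduler; it is left nil here only to keep the
	// sketch self-contained.
	var impl proto.BlockBuilderServiceServer
	log.Fatal(serveScheduler(impl, ":9095"))
}
```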
