
Commit a10140d

feat: Block scheduler scaffolding (#15198)
1 parent: be4f17e

19 files changed, +730 -37 lines changed

pkg/blockbuilder/architecture.md

+159
@@ -0,0 +1,159 @@
# Block Builder Architecture

## Overview

The Block Builder and Block Scheduler are separate components designed to build storage formats from ingested Kafka data. The Block Scheduler coordinates job distribution to multiple Block Builder instances, implementing a pull-based architecture that decouples read and write operations, allowing for independent scaling and simpler operational management. This document describes the architecture and the interaction between the components.

## Package Structure

The Block Builder system is organized into three main packages:

### pkg/blockbuilder/types
- Contains shared type definitions and interfaces
- Defines core data structures like `Job` and `Offsets`
- Provides interface definitions (sketched below) for:
  - `Worker`: Interface for processing jobs and reporting status
  - `Scheduler`: Interface for job scheduling and worker management
  - `Transport`: Interface for communication between components
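The method sets below are a minimal sketch reconstructed from how this commit's `builder` package calls into `types`; the real interfaces in `pkg/blockbuilder/types` may carry more methods, and the `Worker` side is not visible in this diff, so it is omitted. The request/response types are sketched under the Transport Layer section below.

```go
// Sketch only: inferred from MemoryTransport in pkg/blockbuilder/builder/transport.go.
package types

import "context"

// Scheduler serves job lifecycle requests coming from Block Builders.
type Scheduler interface {
    // HandleGetJob returns the next job for the builder, or ok=false when none is pending.
    HandleGetJob(ctx context.Context, builderID string) (*Job, bool, error)
    // HandleCompleteJob records that the builder finished the job.
    HandleCompleteJob(ctx context.Context, builderID string, job *Job) error
    // HandleSyncJob re-synchronizes an in-flight job with the scheduler.
    HandleSyncJob(ctx context.Context, builderID string, job *Job) error
}

// Transport moves those requests between a builder and the scheduler.
type Transport interface {
    SendGetJobRequest(ctx context.Context, req *GetJobRequest) (*GetJobResponse, error)
    SendCompleteJob(ctx context.Context, req *CompleteJobRequest) error
    SendSyncJob(ctx context.Context, req *SyncJobRequest) error
}
```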
### pkg/blockbuilder/scheduler
- Implements the job queue and scheduling logic
- Manages job distribution to block builders
- Tracks job progress and ensures exactly-once processing
- Handles job state management and offset tracking

### pkg/blockbuilder/builder
- Implements the block builder worker functionality
- Processes assigned jobs and builds storage formats
- Manages transport layer communication
- Handles data processing and object storage interactions
## Component Diagram

```mermaid
graph TB
    subgraph Kafka
        KP[Kafka Partitions]
    end

    subgraph Block Scheduler
        S[Scheduler]
        Q[Job Queue]
        PC[Partition Controller]

        subgraph Transport Layer
            T[gRPC/Transport Interface]
        end
    end

    subgraph Block Builders
        BB1[Block Builder 1]
        BB2[Block Builder 2]
        BB3[Block Builder N]
    end

    subgraph Storage
        OS[Object Storage]
    end

    KP --> PC
    PC --> S
    S <--> Q
    S <--> T
    T <--> BB1
    T <--> BB2
    T <--> BB3
    BB1 --> OS
    BB2 --> OS
    BB3 --> OS
```
## Job Processing Sequence

```mermaid
sequenceDiagram
    participant PC as Partition Controller
    participant S as Block Scheduler
    participant Q as Queue
    participant T as Transport
    participant BB as Block Builder
    participant OS as Object Storage

    loop Monitor Partitions
        PC->>PC: Check for new offsets
        PC->>S: Create Job (partition, offset range)
        S->>Q: Enqueue Job
    end

    BB->>T: Request Job
    T->>S: Forward Request
    S->>Q: Dequeue Job
    Q-->>S: Return Job (or empty)
    alt Has Job
        S->>T: Send Job
        T->>BB: Forward Job
        BB->>OS: Process & Write Data
        BB->>T: Report Success
        T->>S: Forward Status
        S->>PC: Commit Offset
    else No Job
        S->>T: Send No Job Available
        T->>BB: Forward Response
    end
```
## Core Components

### Job and Offsets
- `Job`: Represents a unit of work for processing Kafka data (see the sketch below)
  - Contains a partition ID and an offset range
  - Immutable data structure that can be safely passed between components
- `Offsets`: Defines a half-open range [min,max) of Kafka offsets to process
  - Used to track progress and ensure exactly-once processing
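As a rough sketch, the shapes implied by the rest of this commit (the controller builds `types.Offsets{Min, Max}` and calls `types.NewJob(partition, offsets)`) look like the following; the exact field layout in `pkg/blockbuilder/types` may differ:

```go
package types

// Offsets is a half-open range [Min, Max) of Kafka offsets to process.
type Offsets struct {
    Min, Max int64
}

// Job is an immutable unit of work: one partition plus the range to consume.
type Job struct {
    Partition int // partition ID, converted from Kafka's int32
    Offsets   Offsets
}

// NewJob constructs a Job; callers treat the result as read-only.
func NewJob(partition int, offsets Offsets) *Job {
    return &Job{Partition: partition, Offsets: offsets}
}
```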
### Block Scheduler
- Central component responsible for:
  - Managing the job queue
  - Coordinating Block Builder assignments
  - Tracking job progress
- Implements a pull-based model where Block Builders request jobs
- Decoupled from specific transport mechanisms through the Transport interface
### Block Builder
- Processes jobs assigned by the Block Scheduler
- Responsible for:
  - Building storage formats from Kafka data
  - Writing completed blocks to object storage
  - Reporting job status back to the scheduler
- Implements the Worker interface for job processing
### Transport Layer
- Provides communication between Block Builders and the Scheduler
- Abstracts the transport mechanism (currently in-memory and gRPC)
- Defines message types (sketched below) for:
  - Job requests
  - Job completion notifications
  - Job synchronization
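A plausible shape for those message types, inferred from the fields `MemoryTransport` reads in this commit (`BuilderID`, `Job`, `OK`); treat this as an assumption rather than the checked-in definitions:

```go
package types

// GetJobRequest asks the scheduler for the next available job.
type GetJobRequest struct {
    BuilderID string
}

// GetJobResponse carries a job back, or OK=false when none is available.
type GetJobResponse struct {
    Job *Job
    OK  bool
}

// CompleteJobRequest reports a finished job.
type CompleteJobRequest struct {
    BuilderID string
    Job       *Job
}

// SyncJobRequest re-announces an in-flight job to the scheduler.
type SyncJobRequest struct {
    BuilderID string
    Job       *Job
}
```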
## Design Principles

### Decoupled I/O
- Business logic is separated from I/O operations
- Transport interface allows for different communication mechanisms
- Enables easier testing through mock implementations

### Stateless Design
- Block Builders are stateless workers
- All state is managed by the Scheduler
- Allows for easy scaling and failover

### Pull-Based Architecture
- Block Builders pull jobs when ready, as in the sketch below
- Natural load balancing
- Prevents overloading of workers
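A minimal sketch of that pull loop, assuming only the `Transport` methods shown in this commit; the real `Worker` in `pkg/blockbuilder/builder` is more involved, and the one-second backoff and `process` hook here are illustrative:

```go
package builder

import (
    "context"
    "time"

    "github.com/grafana/loki/v3/pkg/blockbuilder/types"
)

// runPullLoop repeatedly asks the scheduler for work and reports completion.
func runPullLoop(ctx context.Context, builderID string, t types.Transport, process func(*types.Job) error) error {
    for {
        resp, err := t.SendGetJobRequest(ctx, &types.GetJobRequest{BuilderID: builderID})
        if err != nil {
            return err
        }
        if !resp.OK {
            // Nothing to do: wait before polling again so idle builders stay cheap.
            select {
            case <-ctx.Done():
                return ctx.Err()
            case <-time.After(time.Second):
            }
            continue
        }
        // Build blocks and write them to object storage.
        if err := process(resp.Job); err != nil {
            return err
        }
        // Report success so the scheduler can commit the offset range.
        if err := t.SendCompleteJob(ctx, &types.CompleteJobRequest{BuilderID: builderID, Job: resp.Job}); err != nil {
            return err
        }
    }
}
```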
### Interface-Driven Development
- Core components are defined by interfaces
- Allows for multiple implementations
- Facilitates testing and modularity
+16
@@ -0,0 +1,16 @@
package builder

import (
    "github.com/grafana/loki/v3/pkg/blockbuilder/types"
)

// TestBuilder implements Worker interface for testing
type TestBuilder struct {
    *Worker
}

func NewTestBuilder(builderID string, transport types.Transport) *TestBuilder {
    return &TestBuilder{
        Worker: NewWorker(builderID, transport),
    }
}

pkg/blockbuilder/controller.go → pkg/blockbuilder/builder/controller.go

+17 -28
@@ -1,4 +1,4 @@
-package blockbuilder
+package builder
 
 import (
     "context"
@@ -7,26 +7,16 @@ import (
 
     "github.com/go-kit/log"
     "github.com/go-kit/log/level"
-    "github.com/prometheus/prometheus/model/labels"
-
     "github.com/grafana/dskit/backoff"
+    "github.com/prometheus/prometheus/model/labels"
 
+    "github.com/grafana/loki/v3/pkg/blockbuilder/types"
     "github.com/grafana/loki/v3/pkg/kafka"
     "github.com/grafana/loki/v3/pkg/kafka/partition"
 
     "github.com/grafana/loki/pkg/push"
 )
 
-// [min,max)
-type Offsets struct {
-    Min, Max int64
-}
-
-type Job struct {
-    Partition int32
-    Offsets   Offsets
-}
-
 // Interface required for interacting with queue partitions.
 type PartitionController interface {
     Topic() string
@@ -43,7 +33,7 @@ type PartitionController interface {
     // so it's advised to not buffer the channel for natural backpressure.
     // As a convenience, it returns the last seen offset, which matches
     // the final record sent on the channel.
-    Process(context.Context, Offsets, chan<- []AppendInput) (int64, error)
+    Process(context.Context, types.Offsets, chan<- []AppendInput) (int64, error)
 
     Close() error
 }
@@ -125,7 +115,7 @@ func (l *PartitionJobController) EarliestPartitionOffset(ctx context.Context) (i
     )
 }
 
-func (l *PartitionJobController) Process(ctx context.Context, offsets Offsets, ch chan<- []AppendInput) (int64, error) {
+func (l *PartitionJobController) Process(ctx context.Context, offsets types.Offsets, ch chan<- []AppendInput) (int64, error) {
     l.part.SetOffsetForConsumption(offsets.Min)
 
     var (
@@ -188,16 +178,16 @@ func (l *PartitionJobController) Process(ctx context.Context, offsets Offsets, c
 
 // LoadJob(ctx) returns the next job by finding the most recent unconsumed offset in the partition
 // Returns whether an applicable job exists, the job, and an error
-func (l *PartitionJobController) LoadJob(ctx context.Context) (bool, Job, error) {
+func (l *PartitionJobController) LoadJob(ctx context.Context) (bool, *types.Job, error) {
     // Read the most recent committed offset
     committedOffset, err := l.HighestCommittedOffset(ctx)
     if err != nil {
-        return false, Job{}, err
+        return false, nil, err
     }
 
     earliestOffset, err := l.EarliestPartitionOffset(ctx)
     if err != nil {
-        return false, Job{}, err
+        return false, nil, err
     }
 
     startOffset := committedOffset + 1
@@ -207,28 +197,27 @@ func (l *PartitionJobController) LoadJob(ctx context.Context) (bool, Job, error)
 
     highestOffset, err := l.HighestPartitionOffset(ctx)
     if err != nil {
-        return false, Job{}, err
+        return false, nil, err
     }
 
     if highestOffset < committedOffset {
         level.Error(l.logger).Log("msg", "partition highest offset is less than committed offset", "highest", highestOffset, "committed", committedOffset)
-        return false, Job{}, fmt.Errorf("partition highest offset is less than committed offset")
+        return false, nil, fmt.Errorf("partition highest offset is less than committed offset")
     }
 
     if highestOffset == committedOffset {
         level.Info(l.logger).Log("msg", "no pending records to process")
-        return false, Job{}, nil
+        return false, nil, nil
     }
 
     // Create the job with the calculated offsets
-    job := Job{
-        Partition: l.part.Partition(),
-        Offsets: Offsets{
-            Min: startOffset,
-            Max: min(startOffset+l.stepLen, highestOffset),
-        },
+    offsets := types.Offsets{
+        Min: startOffset,
+        Max: min(startOffset+l.stepLen, highestOffset),
     }
 
+    // Convert partition from int32 to int
+    job := types.NewJob(int(l.part.Partition()), offsets)
     return true, job, nil
 }
 
@@ -279,7 +268,7 @@ func (d *dummyPartitionController) Commit(_ context.Context, offset int64) error
     return nil
 }
 
-func (d *dummyPartitionController) Process(ctx context.Context, offsets Offsets, ch chan<- []AppendInput) (int64, error) {
+func (d *dummyPartitionController) Process(ctx context.Context, offsets types.Offsets, ch chan<- []AppendInput) (int64, error) {
     for i := int(offsets.Min); i < int(offsets.Max); i++ {
         batch := d.createBatch(i)
         select {
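To make the offset arithmetic in `LoadJob` concrete: a job always covers the half-open range starting just past the committed offset, clamped to what the partition actually holds. A tiny illustrative example with hypothetical values (`min` is the Go 1.21 builtin the diff itself relies on):

```go
package main

import "fmt"

func main() {
    // Hypothetical values for one partition.
    committedOffset := int64(99) // highest offset already committed
    stepLen := int64(1000)       // maximum records per job
    highestOffset := int64(600)  // newest offset available in the partition

    startOffset := committedOffset + 1                   // 100: first unconsumed offset
    maxOffset := min(startOffset+stepLen, highestOffset) // min(1100, 600) = 600

    fmt.Printf("job covers [%d,%d): %d records\n", startOffset, maxOffset, maxOffset-startOffset)
}
```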

pkg/blockbuilder/metrics.go → pkg/blockbuilder/builder/metrics.go

+1 -1
@@ -1,4 +1,4 @@
-package blockbuilder
+package builder
 
 import (
     "github.com/prometheus/client_golang/prometheus"

pkg/blockbuilder/pipeline.go → pkg/blockbuilder/builder/pipeline.go

+1 -1
@@ -1,4 +1,4 @@
-package blockbuilder
+package builder
 
 import (
     "context"

pkg/blockbuilder/pipeline_test.go → pkg/blockbuilder/builder/pipeline_test.go

+1 -1
@@ -1,4 +1,4 @@
-package blockbuilder
+package builder
 
 import (
     "context"

pkg/blockbuilder/slimgester.go → pkg/blockbuilder/builder/slimgester.go

+1 -1
@@ -1,4 +1,4 @@
-package blockbuilder
+package builder
 
 import (
     "bytes"

pkg/blockbuilder/storage.go → pkg/blockbuilder/builder/storage.go

+1 -1
@@ -1,4 +1,4 @@
-package blockbuilder
+package builder
 
 import (
     "context"

pkg/blockbuilder/storage_test.go → pkg/blockbuilder/builder/storage_test.go

+1 -1
@@ -1,4 +1,4 @@
-package blockbuilder
+package builder
 
 import (
     "os"

pkg/blockbuilder/builder/transport.go

+58
@@ -0,0 +1,58 @@
package builder

import (
    "context"

    "github.com/grafana/loki/v3/pkg/blockbuilder/types"
)

var (
    _ types.Transport = unimplementedTransport{}
    _ types.Transport = &MemoryTransport{}
)

// unimplementedTransport provides default implementations that panic
type unimplementedTransport struct{}

func (t unimplementedTransport) SendGetJobRequest(_ context.Context, _ *types.GetJobRequest) (*types.GetJobResponse, error) {
    panic("unimplemented")
}

func (t unimplementedTransport) SendCompleteJob(_ context.Context, _ *types.CompleteJobRequest) error {
    panic("unimplemented")
}

func (t unimplementedTransport) SendSyncJob(_ context.Context, _ *types.SyncJobRequest) error {
    panic("unimplemented")
}

// MemoryTransport implements Transport interface for in-memory communication
type MemoryTransport struct {
    scheduler types.Scheduler
}

// NewMemoryTransport creates a new in-memory transport instance
func NewMemoryTransport(scheduler types.Scheduler) *MemoryTransport {
    return &MemoryTransport{
        scheduler: scheduler,
    }
}

func (t *MemoryTransport) SendGetJobRequest(ctx context.Context, req *types.GetJobRequest) (*types.GetJobResponse, error) {
    job, ok, err := t.scheduler.HandleGetJob(ctx, req.BuilderID)
    if err != nil {
        return nil, err
    }
    return &types.GetJobResponse{
        Job: job,
        OK:  ok,
    }, nil
}

func (t *MemoryTransport) SendCompleteJob(ctx context.Context, req *types.CompleteJobRequest) error {
    return t.scheduler.HandleCompleteJob(ctx, req.BuilderID, req.Job)
}

func (t *MemoryTransport) SendSyncJob(ctx context.Context, req *types.SyncJobRequest) error {
    return t.scheduler.HandleSyncJob(ctx, req.BuilderID, req.Job)
}
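A minimal usage sketch, not part of this commit: wiring `MemoryTransport` to a stub scheduler to exercise the request path. `stubScheduler` is hypothetical and assumes `types.Scheduler` exposes exactly the three `Handle*` methods `MemoryTransport` calls; the real scheduler implementation lives in `pkg/blockbuilder/scheduler`.

```go
package builder_test

import (
    "context"
    "testing"

    "github.com/grafana/loki/v3/pkg/blockbuilder/builder"
    "github.com/grafana/loki/v3/pkg/blockbuilder/types"
)

// stubScheduler hands out no jobs; it exists only to satisfy types.Scheduler.
type stubScheduler struct{}

func (stubScheduler) HandleGetJob(_ context.Context, _ string) (*types.Job, bool, error) {
    return nil, false, nil
}
func (stubScheduler) HandleCompleteJob(_ context.Context, _ string, _ *types.Job) error { return nil }
func (stubScheduler) HandleSyncJob(_ context.Context, _ string, _ *types.Job) error     { return nil }

func TestMemoryTransportNoJob(t *testing.T) {
    transport := builder.NewMemoryTransport(stubScheduler{})

    resp, err := transport.SendGetJobRequest(context.Background(), &types.GetJobRequest{BuilderID: "builder-1"})
    if err != nil {
        t.Fatalf("unexpected error: %v", err)
    }
    if resp.OK {
        t.Fatalf("expected no job from stub scheduler")
    }
}
```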

pkg/blockbuilder/tsdb.go → pkg/blockbuilder/builder/tsdb.go

+1 -1
@@ -1,4 +1,4 @@
-package blockbuilder
+package builder
 
 import (
     "bytes"
