Skip to content

Commit

Permalink
refactor(blockbuilder): transport splitting (grafana#15315)
Browse files Browse the repository at this point in the history
  • Loading branch information
owen-d authored Dec 10, 2024
1 parent 7126441 commit 1f02adf
Show file tree
Hide file tree
Showing 11 changed files with 275 additions and 290 deletions.
135 changes: 69 additions & 66 deletions pkg/blockbuilder/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,27 @@ The Block Builder system is organized into three main packages:

### pkg/blockbuilder/types
- Contains shared type definitions and interfaces
- Defines core data structures like `Job` and `Offsets`
- Defines core data structures like `Job` and `JobStatus`
- Provides interface definitions for:
- `Worker`: Interface for processing jobs and reporting status
- `Scheduler`: Interface for job scheduling and worker management
- `Transport`: Interface for communication between components
- `BuilderTransport`: Interface for builder-to-scheduler communication
- `SchedulerHandler`: Interface for scheduler business logic

The transport layer is split into client and server components:
- Client side uses `BuilderTransport` to abstract gRPC details
- Server side uses `SchedulerHandler` for pure business logic
- A gRPC adapter connects the two sides

### pkg/blockbuilder/scheduler
- Implements the job queue and scheduling logic
- Manages job distribution to block builders
- Tracks job progress and ensures exactly-once processing
- Handles job state management and offset tracking
- Implements `SchedulerHandler` interface for business logic
- Uses gRPC adapter to expose services to builders

### pkg/blockbuilder/builder
- Implements the block builder worker functionality
- Implements the block builder functionality
- Uses `BuilderTransport` to communicate with scheduler
- Processes assigned jobs and builds storage formats
- Manages transport layer communication
- Handles data processing and object storage interactions

## Component Diagram
Expand All @@ -42,14 +47,19 @@ graph TB
PC[Partition Controller]
subgraph Transport Layer
T[gRPC/Transport Interface]
GA[gRPC Adapter]
SH[Scheduler Handler]
end
end
subgraph Block Builders
BB1[Block Builder 1]
BB2[Block Builder 2]
BB3[Block Builder N]
subgraph Builder Transport
BT[Builder Transport]
end
end
subgraph Storage
Expand All @@ -59,10 +69,12 @@ graph TB
KP --> PC
PC --> S
S <--> Q
S <--> T
T <--> BB1
T <--> BB2
T <--> BB3
S --> SH
SH <--> GA
GA <--> BT
BT <--> BB1
BT <--> BB2
BT <--> BB3
BB1 --> OS
BB2 --> OS
BB3 --> OS
Expand All @@ -73,9 +85,10 @@ graph TB
```mermaid
sequenceDiagram
participant PC as Partition Controller
participant S as Block Scheduler
participant Q as Queue
participant T as Transport
participant S as Scheduler
participant SH as SchedulerHandler
participant GA as gRPC Adapter
participant BT as Builder Transport
participant BB as Block Builder
participant OS as Object Storage
Expand All @@ -85,57 +98,53 @@ sequenceDiagram
S->>Q: Enqueue Job
end
BB->>T: Request Job
T->>S: Forward Request
S->>Q: Dequeue Job
Q-->>S: Return Job (or empty)
BB->>BT: Request Job
BT->>GA: gRPC GetJob Request
GA->>SH: HandleGetJob
SH->>S: Get Job from Queue
S-->>SH: Return Job (or empty)
alt Has Job
S->>T: Send Job
T->>BB: Forward Job
SH-->>GA: Return Job
GA-->>BT: gRPC Response
BT-->>BB: Return Job
BB->>OS: Process & Write Data
BB->>T: Report Success
T->>S: Forward Status
BB->>BT: Report Success
BT->>GA: gRPC CompleteJob
GA->>SH: HandleCompleteJob
SH->>S: Mark Complete
S->>PC: Commit Offset
else No Job
S->>T: Send No Job Available
T->>BB: Forward Response
SH-->>GA: Return No Job
GA-->>BT: gRPC Response
BT-->>BB: Return No Job
end
```

## Core Components

### Job and Offsets
- `Job`: Represents a unit of work for processing Kafka data
- Contains a partition ID and an offset range
- Immutable data structure that can be safely passed between components
- `Offsets`: Defines a half-open range [min,max) of Kafka offsets to process
- Used to track progress and ensure exactly-once processing

### Block Scheduler
- Central component responsible for:
- Managing the job queue
- Coordinating Block Builder assignments
- Tracking job progress
- Implements a pull-based model where Block Builders request jobs
- Decoupled from specific transport mechanisms through the Transport interface

### Block Builder
- Processes jobs assigned by the Block Scheduler
- Responsible for:
- Building storage formats from Kafka data
- Writing completed blocks to object storage
- Reporting job status back to scheduler
- Implements the Worker interface for job processing

### Transport Layer
- Provides communication between Block Builders and Scheduler
- Abstracts transport mechanism (currently in-memory & gRPC)
- Defines message types for:
- Job requests
- Job completion notifications
- Job synchronization

## Design Principles
## Interface Design

The system uses a layered interface approach:

1. **Builder Side**:
- Simple API for job processing
- `BuilderTransport`: Handles communication details
- Builders work with domain types, unaware of gRPC

2. **Transport Layer**:
- gRPC service definitions in proto files
- Adapter pattern to convert between proto and domain types
- Clear separation between transport and business logic

3. **Scheduler Side**:
- `SchedulerHandler`: Pure business logic interface
- No knowledge of transport details
- Clean separation of concerns

This design allows for:
- Easy testing of each layer independently
- Flexibility to change transport mechanism
- Clear separation between business logic and communication
- Type-safe conversions between proto and domain types

### Decoupled I/O
- Business logic is separated from I/O operations
Expand All @@ -150,10 +159,4 @@ sequenceDiagram
### Pull-Based Architecture
- Block Builders pull jobs when ready
- Natural load balancing
- Prevents overloading of workers


### Interface-Driven Development
- Core components defined by interfaces
- Allows for multiple implementations
- Facilitates testing and modularity
- Prevents overloading of workers
67 changes: 0 additions & 67 deletions pkg/blockbuilder/builder/worker.go

This file was deleted.

45 changes: 2 additions & 43 deletions pkg/blockbuilder/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@ import (
"github.com/twmb/franz-go/pkg/kadm"

"github.com/grafana/loki/v3/pkg/blockbuilder/types"
"github.com/grafana/loki/v3/pkg/blockbuilder/types/proto"
"github.com/grafana/loki/v3/pkg/kafka/partition"
)

var (
_ types.Scheduler = &BlockScheduler{}
_ types.SchedulerHandler = &BlockScheduler{}
)

type Config struct {
Expand Down Expand Up @@ -175,7 +174,7 @@ func (s *BlockScheduler) HandleGetJob(ctx context.Context, builderID string) (*t
}
}

func (s *BlockScheduler) HandleCompleteJob(_ context.Context, _ string, job *types.Job) error {
func (s *BlockScheduler) HandleCompleteJob(_ context.Context, _ string, job *types.Job, _ bool) error {
// TODO: handle commits
s.queue.MarkComplete(job.ID)
return nil
Expand All @@ -185,43 +184,3 @@ func (s *BlockScheduler) HandleSyncJob(_ context.Context, builderID string, job
s.queue.SyncJob(job.ID, builderID, job)
return nil
}

func (s *BlockScheduler) CompleteJob(_ context.Context, req *proto.CompleteJobRequest) (*proto.CompleteJobResponse, error) {
s.queue.MarkComplete(req.Job.Id)
return &proto.CompleteJobResponse{}, nil
}

func (s *BlockScheduler) SyncJob(_ context.Context, req *proto.SyncJobRequest) (*proto.SyncJobResponse, error) {
s.queue.SyncJob(req.Job.Id, req.BuilderId, &types.Job{
ID: req.Job.Id,
Partition: req.Job.Partition,
Offsets: types.Offsets{
Min: req.Job.Offsets.Min,
Max: req.Job.Offsets.Max,
},
})

return &proto.SyncJobResponse{}, nil
}

func (s *BlockScheduler) GetJob(_ context.Context, req *proto.GetJobRequest) (*proto.GetJobResponse, error) {
var resp proto.GetJobResponse
job, ok, err := s.queue.Dequeue(req.BuilderId)
if err != nil {
return &resp, err
}

if ok {
resp.Ok = true
resp.Job = &proto.Job{
Id: job.ID,
Partition: job.Partition,
Offsets: &proto.Offsets{
Min: job.Offsets.Min,
Max: job.Offsets.Max,
},
}
}

return &resp, nil
}
Loading

0 comments on commit 1f02adf

Please sign in to comment.