@@ -49,7 +49,8 @@ type Planner struct {
49
49
tsdbStore common.TSDBStore
50
50
bloomStore bloomshipper.StoreBase
51
51
52
- tasksQueue * queue.Queue
52
+ tasksQueue * queue.Queue
53
+ planFactory * strategies.Factory
53
54
54
55
metrics * Metrics
55
56
logger log.Logger
@@ -86,14 +87,15 @@ func New(
86
87
}
87
88
88
89
p := & Planner {
89
- cfg : cfg ,
90
- limits : limits ,
91
- schemaCfg : schemaCfg ,
92
- tsdbStore : tsdbStore ,
93
- bloomStore : bloomStore ,
94
- tasksQueue : tasksQueue ,
95
- metrics : NewMetrics (r , tasksQueue .GetConnectedConsumersMetric ),
96
- logger : logger ,
90
+ cfg : cfg ,
91
+ limits : limits ,
92
+ schemaCfg : schemaCfg ,
93
+ tsdbStore : tsdbStore ,
94
+ bloomStore : bloomStore ,
95
+ tasksQueue : tasksQueue ,
96
+ planFactory : strategies .NewFactory (limits , strategies .NewMetrics (r ), logger ),
97
+ metrics : NewMetrics (r , tasksQueue .GetConnectedConsumersMetric ),
98
+ logger : logger ,
97
99
}
98
100
99
101
p .retentionManager = NewRetentionManager (
@@ -370,7 +372,7 @@ func (p *Planner) computeTasks(
370
372
table config.DayTable ,
371
373
tenant string ,
372
374
) ([]* protos.Task , []bloomshipper.Meta , error ) {
373
- strategy , err := strategies . NewStrategy ( tenant , p . limits , p . logger )
375
+ strategy , err := p . planFactory . GetStrategy ( tenant )
374
376
if err != nil {
375
377
return nil , nil , fmt .Errorf ("error creating strategy: %w" , err )
376
378
}
@@ -770,8 +772,10 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer
770
772
continue
771
773
}
772
774
775
+ startTime := time .Now ()
773
776
result , err := p .forwardTaskToBuilder (builder , builderID , task )
774
777
if err != nil {
778
+ p .metrics .tenantTasksTiming .WithLabelValues (task .Tenant , statusFailure ).Observe (time .Since (startTime ).Seconds ())
775
779
maxRetries := p .limits .BloomTaskMaxRetries (task .Tenant )
776
780
if maxRetries > 0 && int (task .timesEnqueued .Load ()) >= maxRetries {
777
781
p .tasksQueue .Release (task .ProtoTask )
@@ -811,10 +815,12 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer
811
815
812
816
level .Debug (logger ).Log (
813
817
"msg" , "task completed" ,
814
- "duration" , time .Since (task .queueTime ).Seconds (),
818
+ "timeSinceEnqueued" , time .Since (task .queueTime ).Seconds (),
819
+ "buildTime" , time .Since (startTime ).Seconds (),
815
820
"retries" , task .timesEnqueued .Load ()- 1 , // -1 because the first enqueue is not a retry
816
821
)
817
822
p .tasksQueue .Release (task .ProtoTask )
823
+ p .metrics .tenantTasksTiming .WithLabelValues (task .Tenant , statusSuccess ).Observe (time .Since (startTime ).Seconds ())
818
824
819
825
// Send the result back to the task. The channel is buffered, so this should not block.
820
826
task .resultsChannel <- result
@@ -866,7 +872,7 @@ func (p *Planner) forwardTaskToBuilder(
866
872
case err := <- errCh :
867
873
return nil , err
868
874
case <- timeout :
869
- return nil , fmt .Errorf ("timeout waiting for response from builder (%s)" , builderID )
875
+ return nil , fmt .Errorf ("timeout (%s) waiting for response from builder (%s)" , taskTimeout , builderID )
870
876
}
871
877
}
872
878
0 commit comments