@@ -26,7 +26,7 @@ import (
2626 "github.com/go-kit/log/level"
2727 connectprometheus "github.com/polarsignals/connect-go-prometheus"
2828 "github.com/prometheus/client_golang/api"
29- prometheusv1 "github.com/prometheus/client_golang/api/prometheus/v1"
29+ prometheusapiv1 "github.com/prometheus/client_golang/api/prometheus/v1"
3030 "github.com/prometheus/client_golang/prometheus"
3131 "github.com/prometheus/client_golang/prometheus/collectors"
3232 "github.com/prometheus/client_golang/prometheus/promhttp"
@@ -40,6 +40,7 @@ import (
4040
4141 objectivesv1alpha1 "github.com/pyrra-dev/pyrra/proto/objectives/v1alpha1"
4242 "github.com/pyrra-dev/pyrra/proto/objectives/v1alpha1/objectivesv1alpha1connect"
43+ "github.com/pyrra-dev/pyrra/proto/prometheus/v1/prometheusv1connect"
4344 "github.com/pyrra-dev/pyrra/slo"
4445)
4546
@@ -185,7 +186,7 @@ func cmdAPI(logger log.Logger, reg *prometheus.Registry, promClient api.Client,
185186 defer cache .Close ()
186187 promAPI := & promCache {
187188 api : & promLogger {
188- api : prometheusv1 .NewAPI (promClient ),
189+ api : prometheusapiv1 .NewAPI (promClient ),
189190 logger : logger ,
190191 },
191192 cache : cache ,
@@ -204,22 +205,32 @@ func cmdAPI(logger log.Logger, reg *prometheus.Registry, promClient api.Client,
204205
205206 r .Route (routePrefix , func (r chi.Router ) {
206207 objectiveService := & objectiveServer {
207- logger : logger ,
208+ logger : log . WithPrefix ( logger , "service" , "objective" ) ,
208209 promAPI : promAPI ,
209210 client : objectivesv1alpha1connect .NewObjectiveBackendServiceClient (
210211 http .DefaultClient ,
211212 apiURL .String (),
212213 connect .WithInterceptors (prometheusInterceptor ),
213214 ),
214215 }
216+
215217 objectivePath , objectiveHandler := objectivesv1alpha1connect .NewObjectiveServiceHandler (
216218 objectiveService ,
217219 connect .WithInterceptors (prometheusInterceptor ),
218220 )
221+
222+ prometheusService := & prometheusServer {
223+ logger : log .WithPrefix (logger , "service" , "prometheus" ),
224+ promAPI : promAPI ,
225+ }
226+ prometheusPath , prometheusHandler := prometheusv1connect .NewPrometheusServiceHandler (prometheusService )
227+
219228 if routePrefix != "/" {
220229 r .Mount (objectivePath , http .StripPrefix (routePrefix , objectiveHandler ))
230+ r .Mount (prometheusPath , http .StripPrefix (routePrefix , prometheusHandler ))
221231 } else {
222232 r .Mount (objectivePath , objectiveHandler )
233+ r .Mount (prometheusPath , prometheusHandler )
223234 }
224235
225236 r .Handle ("/metrics" , promhttp .HandlerFor (reg , promhttp.HandlerOpts {}))
@@ -333,17 +344,17 @@ func (c *thanosClient) Do(ctx context.Context, r *http.Request) (*http.Response,
333344
334345type prometheusAPI interface {
335346 // Query performs a query for the given time.
336- Query (ctx context.Context , query string , ts time.Time , opts ... prometheusv1 .Option ) (model.Value , prometheusv1 .Warnings , error )
347+ Query (ctx context.Context , query string , ts time.Time , opts ... prometheusapiv1 .Option ) (model.Value , prometheusapiv1 .Warnings , error )
337348 // QueryRange performs a query for the given range.
338- QueryRange (ctx context.Context , query string , r prometheusv1 .Range , opts ... prometheusv1 .Option ) (model.Value , prometheusv1 .Warnings , error )
349+ QueryRange (ctx context.Context , query string , r prometheusapiv1 .Range , opts ... prometheusapiv1 .Option ) (model.Value , prometheusapiv1 .Warnings , error )
339350}
340351
341352type promLogger struct {
342353 api prometheusAPI
343354 logger log.Logger
344355}
345356
346- func (l * promLogger ) Query (ctx context.Context , query string , ts time.Time , opts ... prometheusv1 .Option ) (model.Value , prometheusv1 .Warnings , error ) {
357+ func (l * promLogger ) Query (ctx context.Context , query string , ts time.Time , opts ... prometheusapiv1 .Option ) (model.Value , prometheusapiv1 .Warnings , error ) {
347358 level .Debug (l .logger ).Log (
348359 "msg" , "running instant query" ,
349360 "query" , query ,
@@ -352,7 +363,7 @@ func (l *promLogger) Query(ctx context.Context, query string, ts time.Time, opts
352363 return l .api .Query (ctx , query , ts , opts ... )
353364}
354365
355- func (l * promLogger ) QueryRange (ctx context.Context , query string , r prometheusv1 .Range , opts ... prometheusv1 .Option ) (model.Value , prometheusv1 .Warnings , error ) {
366+ func (l * promLogger ) QueryRange (ctx context.Context , query string , r prometheusapiv1 .Range , opts ... prometheusapiv1 .Option ) (model.Value , prometheusapiv1 .Warnings , error ) {
356367 level .Debug (l .logger ).Log (
357368 "msg" , "running range query" ,
358369 "query" , query ,
@@ -383,7 +394,7 @@ func contextGetPromCache(ctx context.Context) time.Duration {
383394 return 0
384395}
385396
386- func (p * promCache ) Query (ctx context.Context , query string , ts time.Time ) (model.Value , prometheusv1 .Warnings , error ) {
397+ func (p * promCache ) Query (ctx context.Context , query string , ts time.Time ) (model.Value , prometheusapiv1 .Warnings , error ) {
387398 if value , exists := p .cache .Get (query ); exists {
388399 return value .(model.Value ), nil , nil
389400 }
@@ -410,7 +421,7 @@ func (p *promCache) Query(ctx context.Context, query string, ts time.Time) (mode
410421 return value , warnings , nil
411422}
412423
413- func (p * promCache ) QueryRange (ctx context.Context , query string , r prometheusv1 .Range ) (model.Value , prometheusv1 .Warnings , error ) {
424+ func (p * promCache ) QueryRange (ctx context.Context , query string , r prometheusapiv1 .Range ) (model.Value , prometheusapiv1 .Warnings , error ) {
414425 // Get the full time range of this query from start to end.
415426 // We round by 10s to adjust for small imperfections to increase cache hits.
416427 timeRange := r .End .Sub (r .Start ).Round (10 * time .Second )
@@ -477,6 +488,61 @@ func (s *objectiveServer) List(ctx context.Context, req *connect.Request[objecti
477488 return nil , err
478489 }
479490
491+ groupingMatchers := map [string ]* labels.Matcher {}
492+ if req .Msg .Grouping != "" {
493+ ms , err := parser .ParseMetricSelector (req .Msg .Grouping )
494+ if err != nil {
495+ return nil , connect .NewError (connect .CodeInvalidArgument , err )
496+ }
497+ for _ , m := range ms {
498+ groupingMatchers [m .Name ] = m
499+ }
500+ }
501+
502+ for _ , o := range resp .Msg .Objectives {
503+ oi := objectivesv1alpha1 .ToInternal (o )
504+
505+ // If specific grouping was selected we need to merge the label matchers for the queries.
506+ if len (groupingMatchers ) > 0 {
507+ if oi .Indicator .Ratio != nil {
508+ for _ , m := range oi .Indicator .Ratio .Errors .LabelMatchers {
509+ if rm , replace := groupingMatchers [m .Name ]; replace {
510+ m .Type = rm .Type
511+ m .Value = rm .Value
512+ }
513+ }
514+ for _ , m := range oi .Indicator .Ratio .Total .LabelMatchers {
515+ if rm , replace := groupingMatchers [m .Name ]; replace {
516+ m .Type = rm .Type
517+ m .Value = rm .Value
518+ }
519+ }
520+ }
521+ if oi .Indicator .Latency != nil {
522+ for _ , m := range oi .Indicator .Latency .Success .LabelMatchers {
523+ if rm , replace := groupingMatchers [m .Name ]; replace {
524+ m .Type = rm .Type
525+ m .Value = rm .Value
526+ }
527+ }
528+ for _ , m := range oi .Indicator .Latency .Total .LabelMatchers {
529+ if rm , replace := groupingMatchers [m .Name ]; replace {
530+ m .Type = rm .Type
531+ m .Value = rm .Value
532+ }
533+ }
534+ }
535+ }
536+
537+ o .Queries = & objectivesv1alpha1.Queries {
538+ CountTotal : oi .QueryTotal (oi .Window ),
539+ CountErrors : oi .QueryErrors (oi .Window ),
540+ GraphErrorBudget : oi .QueryErrorBudget (),
541+ GraphRequests : oi .RequestRange (5 * time .Minute ),
542+ GraphErrors : oi .ErrorsRange (5 * time .Minute ),
543+ }
544+ }
545+
480546 return connect .NewResponse (& objectivesv1alpha1.ListResponse {
481547 Objectives : resp .Msg .Objectives ,
482548 }), nil
@@ -647,7 +713,7 @@ func (s *objectiveServer) GraphErrorBudget(ctx context.Context, req *connect.Req
647713 step := end .Sub (start ) / 1000
648714
649715 query := objective .QueryErrorBudget ()
650- value , _ , err := s .promAPI .QueryRange (contextSetPromCache (ctx , 15 * time .Second ), query , prometheusv1 .Range {
716+ value , _ , err := s .promAPI .QueryRange (contextSetPromCache (ctx , 15 * time .Second ), query , prometheusapiv1 .Range {
651717 Start : start ,
652718 End : end ,
653719 Step : step ,
@@ -1022,7 +1088,7 @@ func (s *objectiveServer) GraphRate(ctx context.Context, req *connect.Request[ob
10221088
10231089 query := objective .RequestRange (timeRange )
10241090
1025- value , _ , err := s .promAPI .QueryRange (contextSetPromCache (ctx , cacheDuration ), query , prometheusv1 .Range {
1091+ value , _ , err := s .promAPI .QueryRange (contextSetPromCache (ctx , cacheDuration ), query , prometheusapiv1 .Range {
10261092 Start : start ,
10271093 End : end ,
10281094 Step : step ,
@@ -1117,7 +1183,7 @@ func (s *objectiveServer) GraphErrors(ctx context.Context, req *connect.Request[
11171183 cacheDuration := rangeCache (start , end )
11181184
11191185 query := objective .ErrorsRange (timeRange )
1120- value , _ , err := s .promAPI .QueryRange (contextSetPromCache (ctx , cacheDuration ), query , prometheusv1 .Range {
1186+ value , _ , err := s .promAPI .QueryRange (contextSetPromCache (ctx , cacheDuration ), query , prometheusapiv1 .Range {
11211187 Start : start ,
11221188 End : end ,
11231189 Step : step ,
@@ -1235,7 +1301,7 @@ func (s *objectiveServer) GraphDuration(ctx context.Context, req *connect.Reques
12351301 for _ , percentile := range objectivePercentiles {
12361302 if objective .Target >= percentile {
12371303 query := objective .DurationRange (timeRange , percentile )
1238- value , _ , err := s .promAPI .QueryRange (contextSetPromCache (ctx , cacheDuration ), query , prometheusv1 .Range {
1304+ value , _ , err := s .promAPI .QueryRange (contextSetPromCache (ctx , cacheDuration ), query , prometheusapiv1 .Range {
12391305 Start : start ,
12401306 End : end ,
12411307 Step : step ,
0 commit comments