Skip to content

Commit

Permalink
Add MetricsQueryService grcp handler (#3091)
Browse files Browse the repository at this point in the history
* Add grcp handler

Signed-off-by: albertteoh <albert.teoh@logz.io>

* Address review comments

Signed-off-by: albertteoh <albert.teoh@logz.io>

* Check nil request

Signed-off-by: albertteoh <albert.teoh@logz.io>
  • Loading branch information
albertteoh authored Jun 16, 2021
1 parent c9d6957 commit e15c981
Show file tree
Hide file tree
Showing 7 changed files with 561 additions and 43 deletions.
32 changes: 32 additions & 0 deletions cmd/query/app/default_params.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright (c) 2021 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Contains default parameter values used by handlers when optional request parameters are missing.

package app

import (
"time"

"github.com/jaegertracing/jaeger/proto-gen/api_v2/metrics"
)

var (
defaultDependencyLookbackDuration = time.Hour * 24
defaultTraceQueryLookbackDuration = time.Hour * 24 * 2
defaultMetricsQueryLookbackDuration = time.Hour
defaultMetricsQueryStepDuration = 5 * time.Second
defaultMetricsQueryRateDuration = 10 * time.Minute
defaultMetricsSpanKinds = []string{metrics.SpanKind_SPAN_KIND_SERVER.String()}
)
165 changes: 150 additions & 15 deletions cmd/query/app/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package app

import (
"context"
"errors"
"time"

"github.com/opentracing/opentracing-go"
"go.uber.org/zap"
Expand All @@ -24,8 +26,11 @@ import (

"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
"github.com/jaegertracing/jaeger/model"
_ "github.com/jaegertracing/jaeger/pkg/gogocodec" //force gogo codec registration
_ "github.com/jaegertracing/jaeger/pkg/gogocodec" // force gogo codec registration
"github.com/jaegertracing/jaeger/plugin/metrics/disabled"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
"github.com/jaegertracing/jaeger/proto-gen/api_v2/metrics"
"github.com/jaegertracing/jaeger/storage/metricsstore"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

Expand All @@ -35,22 +40,20 @@ const (
msgTraceNotFound = "trace not found"
)

var (
errGRPCMetricsQueryDisabled = status.Error(codes.Unimplemented, "metrics querying is currently disabled")
errNilRequest = status.Error(codes.InvalidArgument, "a nil argument is not allowed")
errMissingServiceNames = status.Error(codes.InvalidArgument, "please provide at least one service name")
errMissingQuantile = status.Error(codes.InvalidArgument, "please provide a quantile between (0, 1]")
)

// GRPCHandler implements the gRPC endpoint of the query service.
type GRPCHandler struct {
queryService *querysvc.QueryService
logger *zap.Logger
tracer opentracing.Tracer
}

// NewGRPCHandler returns a GRPCHandler
func NewGRPCHandler(queryService *querysvc.QueryService, logger *zap.Logger, tracer opentracing.Tracer) *GRPCHandler {
gH := &GRPCHandler{
queryService: queryService,
logger: logger,
tracer: tracer,
}

return gH
queryService *querysvc.QueryService
metricsQueryService querysvc.MetricsQueryService
logger *zap.Logger
tracer opentracing.Tracer
nowFn func() time.Time
}

// GetTrace is the gRPC handler to fetch traces based on trace-id.
Expand Down Expand Up @@ -177,3 +180,135 @@ func (g *GRPCHandler) GetDependencies(ctx context.Context, r *api_v2.GetDependen

return &api_v2.GetDependenciesResponse{Dependencies: dependencies}, nil
}

// GetLatencies is the gRPC handler to fetch latency metrics.
func (g *GRPCHandler) GetLatencies(ctx context.Context, r *metrics.GetLatenciesRequest) (*metrics.GetMetricsResponse, error) {
bqp, err := g.newBaseQueryParameters(r)
if err := g.handleErr("failed to build parameters", err); err != nil {
return nil, err
}
// Check for cases where clients do not provide the Quantile, which defaults to the float64's zero value.
if r.Quantile == 0 {
return nil, errMissingQuantile
}
queryParams := metricsstore.LatenciesQueryParameters{
BaseQueryParameters: bqp,
Quantile: r.Quantile,
}
m, err := g.metricsQueryService.GetLatencies(ctx, &queryParams)
if err := g.handleErr("failed to fetch latencies", err); err != nil {
return nil, err
}
return &metrics.GetMetricsResponse{Metrics: *m}, nil
}

// GetCallRates is the gRPC handler to fetch call rate metrics.
func (g *GRPCHandler) GetCallRates(ctx context.Context, r *metrics.GetCallRatesRequest) (*metrics.GetMetricsResponse, error) {
bqp, err := g.newBaseQueryParameters(r)
if err := g.handleErr("failed to build parameters", err); err != nil {
return nil, err
}
queryParams := metricsstore.CallRateQueryParameters{
BaseQueryParameters: bqp,
}
m, err := g.metricsQueryService.GetCallRates(ctx, &queryParams)
if err := g.handleErr("failed to fetch call rates", err); err != nil {
return nil, err
}
return &metrics.GetMetricsResponse{Metrics: *m}, nil
}

// GetErrorRates is the gRPC handler to fetch error rate metrics.
func (g *GRPCHandler) GetErrorRates(ctx context.Context, r *metrics.GetErrorRatesRequest) (*metrics.GetMetricsResponse, error) {
bqp, err := g.newBaseQueryParameters(r)
if err := g.handleErr("failed to build parameters", err); err != nil {
return nil, err
}
queryParams := metricsstore.ErrorRateQueryParameters{
BaseQueryParameters: bqp,
}
m, err := g.metricsQueryService.GetErrorRates(ctx, &queryParams)
if err := g.handleErr("failed to fetch error rates", err); err != nil {
return nil, err
}
return &metrics.GetMetricsResponse{Metrics: *m}, nil
}

// GetMinStepDuration is the gRPC handler to fetch the minimum step duration supported by the underlying metrics store.
func (g *GRPCHandler) GetMinStepDuration(ctx context.Context, _ *metrics.GetMinStepDurationRequest) (*metrics.GetMinStepDurationResponse, error) {
minStep, err := g.metricsQueryService.GetMinStepDuration(ctx, &metricsstore.MinStepDurationQueryParameters{})
if err := g.handleErr("failed to fetch min step duration", err); err != nil {
return nil, err
}
return &metrics.GetMinStepDurationResponse{MinStep: minStep}, nil
}

func (g *GRPCHandler) handleErr(msg string, err error) error {
if err == nil {
return nil
}
g.logger.Error(msg, zap.Error(err))

// Avoid wrapping "expected" errors with an "Internal Server" error.
if errors.Is(err, disabled.ErrDisabled) {
return errGRPCMetricsQueryDisabled
}
if _, ok := status.FromError(err); ok {
return err
}

// Received an "unexpected" error.
return status.Errorf(codes.Internal, "%s: %v", msg, err)
}

func (g *GRPCHandler) newBaseQueryParameters(r interface{}) (bqp metricsstore.BaseQueryParameters, err error) {
if r == nil {
return bqp, errNilRequest
}
var baseRequest *metrics.MetricsQueryBaseRequest
switch v := r.(type) {
case *metrics.GetLatenciesRequest:
baseRequest = v.BaseRequest
case *metrics.GetCallRatesRequest:
baseRequest = v.BaseRequest
case *metrics.GetErrorRatesRequest:
baseRequest = v.BaseRequest
}
if baseRequest == nil || len(baseRequest.ServiceNames) == 0 {
return bqp, errMissingServiceNames
}

// Copy non-nullable params.
bqp.GroupByOperation = baseRequest.GroupByOperation
bqp.ServiceNames = baseRequest.ServiceNames

// Initialize nullable params with defaults.
defaultEndTime := g.nowFn()
bqp.EndTime = &defaultEndTime
bqp.Lookback = &defaultMetricsQueryLookbackDuration
bqp.RatePer = &defaultMetricsQueryRateDuration
bqp.SpanKinds = defaultMetricsSpanKinds
bqp.Step = &defaultMetricsQueryStepDuration

// ... and override defaults with any provided request params.
if baseRequest.EndTime != nil {
bqp.EndTime = baseRequest.EndTime
}
if baseRequest.Lookback != nil {
bqp.Lookback = baseRequest.Lookback
}
if baseRequest.Step != nil {
bqp.Step = baseRequest.Step
}
if baseRequest.RatePer != nil {
bqp.RatePer = baseRequest.RatePer
}
if len(baseRequest.SpanKinds) > 0 {
spanKinds := make([]string, len(baseRequest.SpanKinds))
for i, v := range baseRequest.SpanKinds {
spanKinds[i] = v.String()
}
bqp.SpanKinds = spanKinds
}
return bqp, nil
}
Loading

0 comments on commit e15c981

Please sign in to comment.