Skip to content

Commit

Permalink
Generalise some of Mimir's query sharding code to be more reusable
Browse files Browse the repository at this point in the history
  • Loading branch information
zenador committed Sep 22, 2024
1 parent 73b584c commit 242223e
Show file tree
Hide file tree
Showing 19 changed files with 568 additions and 80 deletions.
229 changes: 217 additions & 12 deletions pkg/frontend/querymiddleware/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ var (
allFormats = []string{formatJSON, formatProtobuf}

// List of HTTP headers to propagate when a Prometheus request is encoded into a HTTP request.
prometheusCodecPropagateHeaders = []string{compat.ForceFallbackHeaderName, chunkinfologger.ChunkInfoLoggingHeader, api.ReadConsistencyOffsetsHeader}
clusterNameHeader = "X-Cluster-Name"
prometheusCodecPropagateHeaders = []string{compat.ForceFallbackHeaderName, chunkinfologger.ChunkInfoLoggingHeader, api.ReadConsistencyOffsetsHeader, clusterNameHeader}
prometheusCodecLabelsPropagateHeaders = []string{clusterNameHeader}
)

const (
Expand Down Expand Up @@ -81,12 +83,18 @@ type Codec interface {
// The original request is also passed as a parameter this is useful for implementation that needs the request
// to merge result or build the result correctly.
DecodeResponse(context.Context, *http.Response, MetricsQueryRequest, log.Logger) (Response, error)
// DecodeLabelsResponse decodes a Response from an http response.
// The original request is also passed as a parameter this is useful for implementation that needs the request
// to merge result or build the result correctly.
DecodeLabelsResponse(context.Context, *http.Response, LabelsQueryRequest, log.Logger) (Response, error)
// EncodeMetricsQueryRequest encodes a MetricsQueryRequest into an http request.
EncodeMetricsQueryRequest(context.Context, MetricsQueryRequest) (*http.Request, error)
// EncodeLabelsQueryRequest encodes a LabelsQueryRequest into an http request.
EncodeLabelsQueryRequest(context.Context, LabelsQueryRequest) (*http.Request, error)
// EncodeResponse encodes a Response into an http response.
EncodeResponse(context.Context, *http.Request, Response) (*http.Response, error)
// EncodeLabelsResponse encodes a Response into an http response.
EncodeLabelsResponse(context.Context, *http.Request, Response, LabelsQueryRequest) (*http.Response, error)
}

// Merger is used by middlewares making multiple requests to merge back all responses into a single one.
Expand Down Expand Up @@ -166,6 +174,14 @@ type LabelsQueryRequest interface {
GetLabelMatcherSets() []string
// GetLimit returns the limit of the number of items in the response.
GetLimit() uint64
// GetHeaders returns the HTTP headers in the request.
GetHeaders() []*PrometheusHeader
// WithLabelName clones the current request with a different label name param.
WithLabelName(string) (LabelsQueryRequest, error)
// WithLabelMatcherSets clones the current request with different label matchers.
WithLabelMatcherSets([]string) (LabelsQueryRequest, error)
// WithHeaders clones the current request with different headers.
WithHeaders([]*PrometheusHeader) (LabelsQueryRequest, error)
// AddSpanTags writes information about this request to an OpenTracing span
AddSpanTags(opentracing.Span)
}
Expand Down Expand Up @@ -211,7 +227,11 @@ type prometheusCodec struct {

type formatter interface {
EncodeResponse(resp *PrometheusResponse) ([]byte, error)
EncodeLabelsResponse(resp *PrometheusLabelsResponse) ([]byte, error)
EncodeSeriesResponse(resp *PrometheusSeriesResponse) ([]byte, error)
DecodeResponse([]byte) (*PrometheusResponse, error)
DecodeLabelsResponse([]byte) (*PrometheusLabelsResponse, error)
DecodeSeriesResponse([]byte) (*PrometheusSeriesResponse, error)
Name() string
ContentType() v1.MIMEType
}
Expand Down Expand Up @@ -319,7 +339,7 @@ func (c prometheusCodec) decodeRangeQueryRequest(r *http.Request) (MetricsQueryR
query := reqValues.Get("query")
queryExpr, err := parser.ParseExpr(query)
if err != nil {
return nil, decorateWithParamName(err, "query")
return nil, DecorateWithParamName(err, "query")
}

var options Options
Expand All @@ -345,13 +365,13 @@ func (c prometheusCodec) decodeInstantQueryRequest(r *http.Request) (MetricsQuer

time, err := DecodeInstantQueryTimeParams(&reqValues, time.Now)
if err != nil {
return nil, decorateWithParamName(err, "time")
return nil, DecorateWithParamName(err, "time")
}

query := reqValues.Get("query")
queryExpr, err := parser.ParseExpr(query)
if err != nil {
return nil, decorateWithParamName(err, "query")
return nil, DecorateWithParamName(err, "query")
}

var options Options
Expand All @@ -364,7 +384,7 @@ func (c prometheusCodec) decodeInstantQueryRequest(r *http.Request) (MetricsQuer
}

func (prometheusCodec) DecodeLabelsQueryRequest(_ context.Context, r *http.Request) (LabelsQueryRequest, error) {
if !IsLabelsQuery(r.URL.Path) {
if !IsLabelsQuery(r.URL.Path) && !IsSeriesQuery(r.URL.Path) {
return nil, fmt.Errorf("unknown labels query API endpoint %s", r.URL.Path)
}

Expand All @@ -387,6 +407,15 @@ func (prometheusCodec) DecodeLabelsQueryRequest(_ context.Context, r *http.Reque
}
}

if IsSeriesQuery(r.URL.Path) {
return &PrometheusSeriesQueryRequest{
Path: r.URL.Path,
Start: start,
End: end,
LabelMatcherSets: labelMatcherSets,
Limit: limit,
}, nil
}
if IsLabelNamesQuery(r.URL.Path) {
return &PrometheusLabelNamesQueryRequest{
Path: r.URL.Path,
Expand All @@ -412,12 +441,12 @@ func (prometheusCodec) DecodeLabelsQueryRequest(_ context.Context, r *http.Reque
func DecodeRangeQueryTimeParams(reqValues *url.Values) (start, end, step int64, err error) {
start, err = util.ParseTime(reqValues.Get("start"))
if err != nil {
return 0, 0, 0, decorateWithParamName(err, "start")
return 0, 0, 0, DecorateWithParamName(err, "start")
}

end, err = util.ParseTime(reqValues.Get("end"))
if err != nil {
return 0, 0, 0, decorateWithParamName(err, "end")
return 0, 0, 0, DecorateWithParamName(err, "end")
}

if end < start {
Expand All @@ -426,7 +455,7 @@ func DecodeRangeQueryTimeParams(reqValues *url.Values) (start, end, step int64,

step, err = parseDurationMs(reqValues.Get("step"))
if err != nil {
return 0, 0, 0, decorateWithParamName(err, "step")
return 0, 0, 0, DecorateWithParamName(err, "step")
}

if step <= 0 {
Expand All @@ -451,7 +480,7 @@ func DecodeInstantQueryTimeParams(reqValues *url.Values, defaultNow func() time.
} else {
time, err = util.ParseTime(timeVal)
if err != nil {
return 0, decorateWithParamName(err, "time")
return 0, DecorateWithParamName(err, "time")
}
}

Expand All @@ -476,7 +505,7 @@ func DecodeLabelsQueryTimeParams(reqValues *url.Values, usePromDefaults bool) (s
} else {
start, err = util.ParseTime(startVal)
if err != nil {
return 0, 0, decorateWithParamName(err, "start")
return 0, 0, DecorateWithParamName(err, "start")
}
}

Expand All @@ -486,7 +515,7 @@ func DecodeLabelsQueryTimeParams(reqValues *url.Values, usePromDefaults bool) (s
} else {
end, err = util.ParseTime(endVal)
if err != nil {
return 0, 0, decorateWithParamName(err, "end")
return 0, 0, DecorateWithParamName(err, "end")
}
}

Expand Down Expand Up @@ -652,6 +681,24 @@ func (c prometheusCodec) EncodeLabelsQueryRequest(ctx context.Context, req Label
Path: req.Path, // path still contains label name
RawQuery: urlValues.Encode(),
}
case *PrometheusSeriesQueryRequest:
urlValues := url.Values{}
if req.GetStart() != 0 {
urlValues["start"] = []string{encodeTime(req.Start)}
}
if req.GetEnd() != 0 {
urlValues["end"] = []string{encodeTime(req.End)}
}
if len(req.GetLabelMatcherSets()) > 0 {
urlValues["match[]"] = req.GetLabelMatcherSets()
}
if req.GetLimit() > 0 {
urlValues["limit"] = []string{strconv.FormatUint(req.GetLimit(), 10)}
}
u = &url.URL{
Path: req.Path,
RawQuery: urlValues.Encode(),
}

default:
return nil, fmt.Errorf("unsupported request type %T", req)
Expand All @@ -678,6 +725,18 @@ func (c prometheusCodec) EncodeLabelsQueryRequest(ctx context.Context, req Label
r.Header.Add(api.ReadConsistencyHeader, level)
}

// Propagate allowed HTTP headers.
for _, h := range req.GetHeaders() {
if !slices.Contains(prometheusCodecLabelsPropagateHeaders, h.Name) {
continue
}

for _, v := range h.Values {
// There should only be one value, but add all of them for completeness.
r.Header.Add(h.Name, v)
}
}

return r.WithContext(ctx), nil
}

Expand Down Expand Up @@ -755,6 +814,90 @@ func (c prometheusCodec) DecodeResponse(ctx context.Context, r *http.Response, _
return resp, nil
}

func (c prometheusCodec) DecodeLabelsResponse(ctx context.Context, r *http.Response, lr LabelsQueryRequest, logger log.Logger) (Response, error) {
spanlog := spanlogger.FromContext(ctx, logger)
buf, err := readResponseBody(r)
if err != nil {
return nil, spanlog.Error(err)
}

spanlog.LogFields(otlog.String("message", "ParseQueryRangeResponse"),
otlog.Int("status_code", r.StatusCode),
otlog.Int("bytes", len(buf)))

// Before attempting to decode a response based on the content type, check if the
// Content-Type header was even set. When the scheduler returns gRPC errors, they
// are encoded as httpgrpc.HTTPResponse objects with an HTTP status code and the
// error message as the body of the response with no content type. We need to handle
// that case here before we decode well-formed success or error responses.
contentType := r.Header.Get("Content-Type")
if contentType == "" {
switch r.StatusCode {
case http.StatusServiceUnavailable:
return nil, apierror.New(apierror.TypeUnavailable, string(buf))
case http.StatusTooManyRequests:
return nil, apierror.New(apierror.TypeTooManyRequests, string(buf))
case http.StatusRequestEntityTooLarge:
return nil, apierror.New(apierror.TypeTooLargeEntry, string(buf))
default:
if r.StatusCode/100 == 5 {
return nil, apierror.New(apierror.TypeInternal, string(buf))
}
}
}

formatter := findFormatter(contentType)
if formatter == nil {
return nil, apierror.Newf(apierror.TypeInternal, "unknown response content type '%v'", contentType)
}

start := time.Now()

var response Response

switch lr.(type) {
case *PrometheusLabelNamesQueryRequest, *PrometheusLabelValuesQueryRequest:
resp, err := formatter.DecodeLabelsResponse(buf)
if err != nil {
return nil, apierror.Newf(apierror.TypeInternal, "error decoding response: %v", err)
}

c.metrics.duration.WithLabelValues(operationDecode, formatter.Name()).Observe(time.Since(start).Seconds())
c.metrics.size.WithLabelValues(operationDecode, formatter.Name()).Observe(float64(len(buf)))

if resp.Status == statusError {
return nil, apierror.New(apierror.Type(resp.ErrorType), resp.Error)
}

for h, hv := range r.Header {
resp.Headers = append(resp.Headers, &PrometheusHeader{Name: h, Values: hv})
}

response = resp
case *PrometheusSeriesQueryRequest:
resp, err := formatter.DecodeSeriesResponse(buf)
if err != nil {
return nil, apierror.Newf(apierror.TypeInternal, "error decoding response: %v", err)
}

c.metrics.duration.WithLabelValues(operationDecode, formatter.Name()).Observe(time.Since(start).Seconds())
c.metrics.size.WithLabelValues(operationDecode, formatter.Name()).Observe(float64(len(buf)))

if resp.Status == statusError {
return nil, apierror.New(apierror.Type(resp.ErrorType), resp.Error)
}

for h, hv := range r.Header {
resp.Headers = append(resp.Headers, &PrometheusHeader{Name: h, Values: hv})
}

response = resp
default:
return nil, apierror.Newf(apierror.TypeInternal, "unsupported request type %T", lr)
}
return response, nil
}

func findFormatter(contentType string) formatter {
for _, f := range knownFormats {
if f.ContentType().String() == contentType {
Expand Down Expand Up @@ -807,6 +950,68 @@ func (c prometheusCodec) EncodeResponse(ctx context.Context, req *http.Request,
return &resp, nil
}

func (c prometheusCodec) EncodeLabelsResponse(ctx context.Context, req *http.Request, res Response, lr LabelsQueryRequest) (*http.Response, error) {
sp, _ := opentracing.StartSpanFromContext(ctx, "APIResponse.ToHTTPResponse")
defer sp.Finish()

selectedContentType, formatter := c.negotiateContentType(req.Header.Get("Accept"))
if formatter == nil {
return nil, apierror.New(apierror.TypeNotAcceptable, "none of the content types in the Accept header are supported")
}

var start time.Time
var b []byte

switch lr.(type) {
case *PrometheusLabelNamesQueryRequest, *PrometheusLabelValuesQueryRequest:
a, ok := res.(*PrometheusLabelsResponse)
if !ok {
return nil, apierror.Newf(apierror.TypeInternal, "invalid response format")
}
if a.Data != nil {
sp.LogFields(otlog.Int("labels", len(a.Data)))
}

start = time.Now()
var err error
b, err = formatter.EncodeLabelsResponse(a)
if err != nil {
return nil, apierror.Newf(apierror.TypeInternal, "error encoding response: %v", err)
}
case *PrometheusSeriesQueryRequest:
a, ok := res.(*PrometheusSeriesResponse)
if !ok {
return nil, apierror.Newf(apierror.TypeInternal, "invalid response format")
}
if a.Data != nil {
sp.LogFields(otlog.Int("labels", len(a.Data)))
}

start = time.Now()
var err error
b, err = formatter.EncodeSeriesResponse(a)
if err != nil {
return nil, apierror.Newf(apierror.TypeInternal, "error encoding response: %v", err)
}
default:
return nil, apierror.Newf(apierror.TypeInternal, "unsupported request type %T", lr)
}

c.metrics.duration.WithLabelValues(operationEncode, formatter.Name()).Observe(time.Since(start).Seconds())
c.metrics.size.WithLabelValues(operationEncode, formatter.Name()).Observe(float64(len(b)))
sp.LogFields(otlog.Int("bytes", len(b)))

resp := http.Response{
Header: http.Header{
"Content-Type": []string{selectedContentType},
},
Body: io.NopCloser(bytes.NewBuffer(b)),
StatusCode: http.StatusOK,
ContentLength: int64(len(b)),
}
return &resp, nil
}

func (prometheusCodec) negotiateContentType(acceptHeader string) (string, formatter) {
if acceptHeader == "" {
return jsonMimeType, jsonFormatterInstance
Expand Down Expand Up @@ -967,7 +1172,7 @@ func encodeDurationMs(d int64) string {
return strconv.FormatFloat(float64(d)/float64(time.Second/time.Millisecond), 'f', -1, 64)
}

func decorateWithParamName(err error, field string) error {
func DecorateWithParamName(err error, field string) error {
errTmpl := "invalid parameter %q: %v"
if status, ok := grpcutil.ErrorToStatus(err); ok {
return apierror.Newf(apierror.TypeBadData, errTmpl, field, status.Message())
Expand Down
Loading

0 comments on commit 242223e

Please sign in to comment.