Skip to content

Commit b2820ce

Browse files
committed
Add dedicated instant/range query handlers
Signed-off-by: SungJin1212 <tjdwls1201@gmail.com>
1 parent 7046357 commit b2820ce

File tree

6 files changed

+674
-24
lines changed

6 files changed

+674
-24
lines changed

pkg/api/handlers.go

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727

2828
"github.com/cortexproject/cortex/pkg/querier"
2929
"github.com/cortexproject/cortex/pkg/querier/codec"
30+
"github.com/cortexproject/cortex/pkg/querier/queryapi"
3031
"github.com/cortexproject/cortex/pkg/querier/stats"
3132
"github.com/cortexproject/cortex/pkg/util"
3233
util_log "github.com/cortexproject/cortex/pkg/util/log"
@@ -195,6 +196,8 @@ func NewQuerierHandler(
195196
Help: "Current number of inflight requests to the querier.",
196197
}, []string{"method", "route"})
197198

199+
statsRenderer := querier.StatsRenderer
200+
corsOrigin := regexp.MustCompile(".*")
198201
api := v1.NewAPI(
199202
engine,
200203
querier.NewErrorTranslateSampleAndChunkQueryable(queryable), // Translate errors to errors expected by API.
@@ -214,7 +217,7 @@ func NewQuerierHandler(
214217
func(context.Context) v1.RulesRetriever { return &querier.DummyRulesRetriever{} },
215218
0, 0, 0, // Remote read samples and concurrency limit.
216219
false,
217-
regexp.MustCompile(".*"),
220+
corsOrigin,
218221
func() (v1.RuntimeInfo, error) { return v1.RuntimeInfo{}, errors.New("not implemented") },
219222
&v1.PrometheusVersion{
220223
Version: version.Version,
@@ -229,7 +232,7 @@ func NewQuerierHandler(
229232
// This is used for the stats API which we should not support. Or find other ways to.
230233
prometheus.GathererFunc(func() ([]*dto.MetricFamily, error) { return nil, nil }),
231234
reg,
232-
querier.StatsRenderer,
235+
statsRenderer,
233236
false,
234237
nil,
235238
false,
@@ -240,11 +243,18 @@ func NewQuerierHandler(
240243
api.ClearCodecs()
241244
cm := codec.NewInstrumentedCodecMetrics(reg)
242245

243-
api.InstallCodec(codec.NewInstrumentedCodec(v1.JSONCodec{}, cm))
244-
// Install Protobuf codec to give the option for using either.
245-
api.InstallCodec(codec.NewInstrumentedCodec(codec.ProtobufCodec{CortexInternal: false}, cm))
246-
// Protobuf codec for Cortex internal requests. This should be used by Cortex Ruler only for remote evaluation.
247-
api.InstallCodec(codec.NewInstrumentedCodec(codec.ProtobufCodec{CortexInternal: true}, cm))
246+
codecs := []v1.Codec{
247+
codec.NewInstrumentedCodec(v1.JSONCodec{}, cm),
248+
// Protobuf codec to give the option for using either.
249+
codec.NewInstrumentedCodec(codec.ProtobufCodec{CortexInternal: false}, cm),
250+
// Protobuf codec for Cortex internal requests. This should be used by Cortex Ruler only for remote evaluation.
251+
codec.NewInstrumentedCodec(codec.ProtobufCodec{CortexInternal: true}, cm),
252+
}
253+
254+
// Install codecs
255+
for _, c := range codecs {
256+
api.InstallCodec(c)
257+
}
248258

249259
router := mux.NewRouter()
250260

@@ -269,13 +279,15 @@ func NewQuerierHandler(
269279
legacyPromRouter := route.New().WithPrefix(path.Join(legacyPrefix, "/api/v1"))
270280
api.Register(legacyPromRouter)
271281

282+
c := queryapi.NewQueryAPI(engine, queryable, statsRenderer, logger, codecs, corsOrigin)
283+
272284
// TODO(gotjosh): This custom handler is temporary until we're able to vendor the changes in:
273285
// https://github.com/prometheus/prometheus/pull/7125/files
274286
router.Path(path.Join(prefix, "/api/v1/metadata")).Handler(querier.MetadataHandler(metadataQuerier))
275287
router.Path(path.Join(prefix, "/api/v1/read")).Handler(querier.RemoteReadHandler(queryable, logger))
276288
router.Path(path.Join(prefix, "/api/v1/read")).Methods("POST").Handler(promRouter)
277-
router.Path(path.Join(prefix, "/api/v1/query")).Methods("GET", "POST").Handler(promRouter)
278-
router.Path(path.Join(prefix, "/api/v1/query_range")).Methods("GET", "POST").Handler(promRouter)
289+
router.Path(path.Join(prefix, "/api/v1/query")).Methods("GET", "POST").Handler(c.Wrap(c.InstantHandler))
290+
router.Path(path.Join(prefix, "/api/v1/query_range")).Methods("GET", "POST").Handler(c.Wrap(c.RangeQueryHandler))
279291
router.Path(path.Join(prefix, "/api/v1/query_exemplars")).Methods("GET", "POST").Handler(promRouter)
280292
router.Path(path.Join(prefix, "/api/v1/labels")).Methods("GET", "POST").Handler(promRouter)
281293
router.Path(path.Join(prefix, "/api/v1/label/{name}/values")).Methods("GET").Handler(promRouter)
@@ -287,8 +299,8 @@ func NewQuerierHandler(
287299
router.Path(path.Join(legacyPrefix, "/api/v1/metadata")).Handler(querier.MetadataHandler(metadataQuerier))
288300
router.Path(path.Join(legacyPrefix, "/api/v1/read")).Handler(querier.RemoteReadHandler(queryable, logger))
289301
router.Path(path.Join(legacyPrefix, "/api/v1/read")).Methods("POST").Handler(legacyPromRouter)
290-
router.Path(path.Join(legacyPrefix, "/api/v1/query")).Methods("GET", "POST").Handler(legacyPromRouter)
291-
router.Path(path.Join(legacyPrefix, "/api/v1/query_range")).Methods("GET", "POST").Handler(legacyPromRouter)
302+
router.Path(path.Join(legacyPrefix, "/api/v1/query")).Methods("GET", "POST").Handler(c.Wrap(c.InstantHandler))
303+
router.Path(path.Join(legacyPrefix, "/api/v1/query_range")).Methods("GET", "POST").Handler(c.Wrap(c.RangeQueryHandler))
292304
router.Path(path.Join(legacyPrefix, "/api/v1/query_exemplars")).Methods("GET", "POST").Handler(legacyPromRouter)
293305
router.Path(path.Join(legacyPrefix, "/api/v1/labels")).Methods("GET", "POST").Handler(legacyPromRouter)
294306
router.Path(path.Join(legacyPrefix, "/api/v1/label/{name}/values")).Methods("GET").Handler(legacyPromRouter)

pkg/querier/queryapi/query_api.go

Lines changed: 250 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,250 @@
1+
package queryapi
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"net/http"
7+
"time"
8+
9+
"github.com/go-kit/log"
10+
"github.com/go-kit/log/level"
11+
"github.com/grafana/regexp"
12+
"github.com/munnerz/goautoneg"
13+
"github.com/prometheus/prometheus/promql"
14+
"github.com/prometheus/prometheus/storage"
15+
"github.com/prometheus/prometheus/util/annotations"
16+
"github.com/prometheus/prometheus/util/httputil"
17+
v1 "github.com/prometheus/prometheus/web/api/v1"
18+
"github.com/weaveworks/common/httpgrpc"
19+
20+
"github.com/cortexproject/cortex/pkg/querier/tripperware/queryrange"
21+
"github.com/cortexproject/cortex/pkg/util"
22+
"github.com/cortexproject/cortex/pkg/util/api"
23+
)
24+
25+
type QueryAPI struct {
26+
queryable storage.SampleAndChunkQueryable
27+
queryEngine promql.QueryEngine
28+
now func() time.Time
29+
statsRenderer v1.StatsRenderer
30+
logger log.Logger
31+
codecs []v1.Codec
32+
CORSOrigin *regexp.Regexp
33+
}
34+
35+
func NewQueryAPI(
36+
qe promql.QueryEngine,
37+
q storage.SampleAndChunkQueryable,
38+
statsRenderer v1.StatsRenderer,
39+
logger log.Logger,
40+
codecs []v1.Codec,
41+
CORSOrigin *regexp.Regexp,
42+
) *QueryAPI {
43+
return &QueryAPI{
44+
queryable: q,
45+
queryEngine: qe,
46+
now: time.Now,
47+
statsRenderer: statsRenderer,
48+
logger: logger,
49+
codecs: codecs,
50+
CORSOrigin: CORSOrigin,
51+
}
52+
}
53+
54+
// Custom handler for Query range API
55+
func (c *QueryAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) {
56+
start, err := util.ParseTime(r.FormValue("start"))
57+
if err != nil {
58+
return invalidParamError(err, "start")
59+
}
60+
end, err := util.ParseTime(r.FormValue("end"))
61+
if err != nil {
62+
return invalidParamError(err, "end")
63+
}
64+
if end < start {
65+
return invalidParamError(queryrange.ErrEndBeforeStart, "end")
66+
}
67+
68+
step, err := util.ParseDurationMs(r.FormValue("step"))
69+
if err != nil {
70+
return invalidParamError(err, "step")
71+
}
72+
73+
if step <= 0 {
74+
return invalidParamError(queryrange.ErrNegativeStep, "step")
75+
}
76+
77+
// For safety, limit the number of returned points per timeseries.
78+
// This is sufficient for 60s resolution for a week or 1h resolution for a year.
79+
if (end-start)/step > 11000 {
80+
return apiFuncResult{nil, &apiError{errorBadData, queryrange.ErrStepTooSmall}, nil, nil}
81+
}
82+
83+
ctx := r.Context()
84+
if to := r.FormValue("timeout"); to != "" {
85+
var cancel context.CancelFunc
86+
timeout, err := util.ParseDurationMs(to)
87+
if err != nil {
88+
return invalidParamError(err, "timeout")
89+
}
90+
91+
ctx, cancel = context.WithTimeout(ctx, convertMsToDuration(timeout))
92+
defer cancel()
93+
}
94+
95+
opts, err := extractQueryOpts(r)
96+
if err != nil {
97+
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
98+
}
99+
qry, err := c.queryEngine.NewRangeQuery(ctx, c.queryable, opts, r.FormValue("query"), convertMsToTime(start), convertMsToTime(end), convertMsToDuration(step))
100+
if err != nil {
101+
return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query")
102+
}
103+
// From now on, we must only return with a finalizer in the result (to
104+
// be called by the caller) or call qry.Close ourselves (which is
105+
// required in the case of a panic).
106+
defer func() {
107+
if result.finalizer == nil {
108+
qry.Close()
109+
}
110+
}()
111+
112+
ctx = httputil.ContextFromRequest(ctx, r)
113+
114+
res := qry.Exec(ctx)
115+
if res.Err != nil {
116+
return apiFuncResult{nil, returnAPIError(res.Err), res.Warnings, qry.Close}
117+
}
118+
119+
warnings := res.Warnings
120+
qs := c.statsRenderer(ctx, qry.Stats(), r.FormValue("stats"))
121+
122+
return apiFuncResult{&v1.QueryData{
123+
ResultType: res.Value.Type(),
124+
Result: res.Value,
125+
Stats: qs,
126+
}, nil, warnings, qry.Close}
127+
}
128+
129+
// Custom handler for Query API
130+
func (c *QueryAPI) InstantHandler(r *http.Request) (result apiFuncResult) {
131+
ts, err := util.ParseTimeParam(r, "time", c.now().Unix())
132+
if err != nil {
133+
return invalidParamError(err, "time")
134+
}
135+
136+
ctx := r.Context()
137+
if to := r.FormValue("timeout"); to != "" {
138+
var cancel context.CancelFunc
139+
timeout, err := util.ParseDurationMs(to)
140+
if err != nil {
141+
return invalidParamError(err, "timeout")
142+
}
143+
144+
ctx, cancel = context.WithDeadline(ctx, c.now().Add(convertMsToDuration(timeout)))
145+
defer cancel()
146+
}
147+
148+
opts, err := extractQueryOpts(r)
149+
if err != nil {
150+
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
151+
}
152+
qry, err := c.queryEngine.NewInstantQuery(ctx, c.queryable, opts, r.FormValue("query"), convertMsToTime(ts))
153+
if err != nil {
154+
return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query")
155+
}
156+
157+
// From now on, we must only return with a finalizer in the result (to
158+
// be called by the caller) or call qry.Close ourselves (which is
159+
// required in the case of a panic).
160+
defer func() {
161+
if result.finalizer == nil {
162+
qry.Close()
163+
}
164+
}()
165+
166+
ctx = httputil.ContextFromRequest(ctx, r)
167+
168+
res := qry.Exec(ctx)
169+
if res.Err != nil {
170+
return apiFuncResult{nil, returnAPIError(res.Err), res.Warnings, qry.Close}
171+
}
172+
173+
warnings := res.Warnings
174+
qs := c.statsRenderer(ctx, qry.Stats(), r.FormValue("stats"))
175+
176+
return apiFuncResult{&v1.QueryData{
177+
ResultType: res.Value.Type(),
178+
Result: res.Value,
179+
Stats: qs,
180+
}, nil, warnings, qry.Close}
181+
}
182+
183+
func (c *QueryAPI) Wrap(f apiFunc) http.HandlerFunc {
184+
return func(w http.ResponseWriter, r *http.Request) {
185+
httputil.SetCORS(w, c.CORSOrigin, r)
186+
187+
result := f(r)
188+
if result.finalizer != nil {
189+
defer result.finalizer()
190+
}
191+
192+
if result.err != nil {
193+
api.RespondFromGRPCError(c.logger, w, result.err.err)
194+
return
195+
}
196+
197+
if result.data != nil {
198+
c.respond(w, r, result.data, result.warnings, r.FormValue("query"))
199+
return
200+
}
201+
w.WriteHeader(http.StatusNoContent)
202+
}
203+
}
204+
205+
func (c *QueryAPI) respond(w http.ResponseWriter, req *http.Request, data interface{}, warnings annotations.Annotations, query string) {
206+
warn, info := warnings.AsStrings(query, 10, 10)
207+
208+
resp := &v1.Response{
209+
Status: statusSuccess,
210+
Data: data,
211+
Warnings: warn,
212+
Infos: info,
213+
}
214+
215+
codec, err := c.negotiateCodec(req, resp)
216+
if err != nil {
217+
api.RespondFromGRPCError(c.logger, w, httpgrpc.Errorf(http.StatusNotAcceptable, "%s", &apiError{errorNotAcceptable, err}))
218+
return
219+
}
220+
221+
b, err := codec.Encode(resp)
222+
if err != nil {
223+
level.Error(c.logger).Log("error marshaling response", "url", req.URL, "err", err)
224+
http.Error(w, err.Error(), http.StatusInternalServerError)
225+
return
226+
}
227+
228+
w.Header().Set("Content-Type", codec.ContentType().String())
229+
w.WriteHeader(http.StatusOK)
230+
if n, err := w.Write(b); err != nil {
231+
level.Error(c.logger).Log("error writing response", "url", req.URL, "bytesWritten", n, "err", err)
232+
}
233+
}
234+
235+
func (c *QueryAPI) negotiateCodec(req *http.Request, resp *v1.Response) (v1.Codec, error) {
236+
for _, clause := range goautoneg.ParseAccept(req.Header.Get("Accept")) {
237+
for _, codec := range c.codecs {
238+
if codec.ContentType().Satisfies(clause) && codec.CanEncode(resp) {
239+
return codec, nil
240+
}
241+
}
242+
}
243+
244+
defaultCodec := c.codecs[0]
245+
if !defaultCodec.CanEncode(resp) {
246+
return nil, fmt.Errorf("cannot encode response as %s", defaultCodec.ContentType())
247+
}
248+
249+
return defaultCodec, nil
250+
}

0 commit comments

Comments
 (0)