diff --git a/cmd/promxy/main.go b/cmd/promxy/main.go index 5ee991a3f..abf93eb0c 100644 --- a/cmd/promxy/main.go +++ b/cmd/promxy/main.go @@ -406,6 +406,8 @@ func main() { } } else if r.URL.Path == path.Join(webOptions.RoutePrefix, "/api/v1/status/config") { ps.ConfigHandler(w, r) + } else if r.URL.Path == path.Join(webOptions.RoutePrefix, "/api/v1/metadata") { + ps.MetadataHandler(w, r) } else { // all else we send direct to the local prometheus UI webHandler.GetRouter().ServeHTTP(w, r) diff --git a/pkg/promclient/debug.go b/pkg/promclient/debug.go index 14828b2ca..c314ea7d2 100644 --- a/pkg/promclient/debug.go +++ b/pkg/promclient/debug.go @@ -164,3 +164,28 @@ func (d *DebugAPI) GetValue(ctx context.Context, start, end time.Time, matchers return v, w, err } + +// Metadata returns metadata about metrics currently scraped by the metric name. +func (d *DebugAPI) Metadata(ctx context.Context, metric, limit string) (map[string][]v1.Metadata, error) { + fields := logrus.Fields{ + "api": "Metadata", + "metric": metric, + "limit": limit, + } + + logrus.WithFields(fields).Debug(d.PrefixMessage) + + s := time.Now() + v, err := d.A.Metadata(ctx, metric, limit) + fields["took"] = time.Since(s) + + if logrus.GetLevel() > logrus.DebugLevel { + fields["value"] = v + fields["error"] = err + logrus.WithFields(fields).Trace(d.PrefixMessage) + } else { + logrus.WithFields(fields).Debug(d.PrefixMessage) + } + + return v, err +} diff --git a/pkg/promclient/ignore_error.go b/pkg/promclient/ignore_error.go index 57cb1969d..cb62b15ae 100644 --- a/pkg/promclient/ignore_error.go +++ b/pkg/promclient/ignore_error.go @@ -64,3 +64,10 @@ func (n *IgnoreErrorAPI) Key() model.LabelSet { } return nil } + +// Metadata returns metadata about metrics currently scraped by the metric name. +func (n *IgnoreErrorAPI) Metadata(ctx context.Context, metric, limit string) (map[string][]v1.Metadata, error) { + // TODO: query all and merge + v, _ := n.A.Metadata(ctx, metric, limit) + return v, nil +} diff --git a/pkg/promclient/interface.go b/pkg/promclient/interface.go index 1f5887604..5f672dfa9 100644 --- a/pkg/promclient/interface.go +++ b/pkg/promclient/interface.go @@ -23,6 +23,8 @@ type API interface { Series(ctx context.Context, matches []string, startTime time.Time, endTime time.Time) ([]model.LabelSet, v1.Warnings, error) // GetValue loads the raw data for a given set of matchers in the time range GetValue(ctx context.Context, start, end time.Time, matchers []*labels.Matcher) (model.Value, v1.Warnings, error) + // Metadata returns metadata about metrics currently scraped by the metric name. + Metadata(ctx context.Context, metric, limit string) (map[string][]v1.Metadata, error) } // APILabels includes a Key() mechanism to differentiate which APIs are "the same" diff --git a/pkg/promclient/multi_api.go b/pkg/promclient/multi_api.go index 1c5858af8..77b4ab11f 100644 --- a/pkg/promclient/multi_api.go +++ b/pkg/promclient/multi_api.go @@ -581,3 +581,80 @@ func (m *MultiAPI) GetValue(ctx context.Context, start, end time.Time, matchers return result, warnings.Warnings(), nil } + +// Metadata returns metadata about metrics currently scraped by the metric name. +func (m *MultiAPI) Metadata(ctx context.Context, metric, limit string) (map[string][]v1.Metadata, error) { + childContext, childContextCancel := context.WithCancel(ctx) + defer childContextCancel() + + type chanResult struct { + v map[string][]v1.Metadata + err error + ls model.Fingerprint + } + + resultChans := make([]chan chanResult, len(m.apis)) + outstandingRequests := make(map[model.Fingerprint]int) // fingerprint -> outstanding + + for i, api := range m.apis { + resultChans[i] = make(chan chanResult, 1) + outstandingRequests[m.apiFingerprints[i]]++ + go func(i int, retChan chan chanResult, api API, metric, limit string) { + start := time.Now() + result, err := api.Metadata(childContext, metric, limit) + took := time.Since(start) + if err != nil { + m.recordMetric(i, "query", "error", took.Seconds()) + } else { + m.recordMetric(i, "query", "success", took.Seconds()) + } + retChan <- chanResult{ + v: result, + err: NormalizePromError(err), + ls: m.apiFingerprints[i], + } + }(i, resultChans[i], api, metric, limit) + } + + // Wait for results as we get them + var result map[string][]v1.Metadata + var lastError error + successMap := make(map[model.Fingerprint]int) // fingerprint -> success + for i := 0; i < len(m.apis); i++ { + select { + case <-ctx.Done(): + return nil, ctx.Err() + + case ret := <-resultChans[i]: + outstandingRequests[ret.ls]-- + if ret.err != nil { + // If there aren't enough outstanding requests to possibly succeed, no reason to wait + if (outstandingRequests[ret.ls] + successMap[ret.ls]) < m.requiredCount { + return nil, ret.err + } + lastError = ret.err + } else { + successMap[ret.ls]++ + if result == nil { + result = ret.v + } else { + // Merge metadata! + for k, v := range ret.v { + if _, ok := result[k]; !ok { + result[k] = v + } + } + } + } + } + } + + // Verify that we hit the requiredCount for all of the buckets + for k := range outstandingRequests { + if successMap[k] < m.requiredCount { + return nil, errors.Wrap(lastError, "Unable to fetch from downstream servers") + } + } + + return result, nil +} diff --git a/pkg/promclient/multi_api_test.go b/pkg/promclient/multi_api_test.go index efc4138b4..9b611f460 100644 --- a/pkg/promclient/multi_api_test.go +++ b/pkg/promclient/multi_api_test.go @@ -19,6 +19,7 @@ type stubAPI struct { queryRange func() model.Value series func() []model.LabelSet getValue func() model.Value + metadata func() map[string][]v1.Metadata } // LabelNames returns all the unique label names present in the block in sorted order. @@ -51,6 +52,11 @@ func (s *stubAPI) GetValue(ctx context.Context, start, end time.Time, matchers [ return s.getValue(), nil, nil } +// Metadata returns metadata about metrics currently scraped by the metric name. +func (s *stubAPI) Metadata(ctx context.Context, metric, limit string) (map[string][]v1.Metadata, error) { + return s.metadata(), nil +} + type errorAPI struct { API err error diff --git a/pkg/promclient/recover.go b/pkg/promclient/recover.go index cdbdbc6ad..d5d29b8bf 100644 --- a/pkg/promclient/recover.go +++ b/pkg/promclient/recover.go @@ -72,3 +72,13 @@ func (api *recoverAPI) GetValue(ctx context.Context, start, end time.Time, match }() return api.A.GetValue(ctx, start, end, matchers) } + +// Metadata returns metadata about metrics currently scraped by the metric name. +func (api *recoverAPI) Metadata(ctx context.Context, metric, limit string) (v map[string][]v1.Metadata, err error) { + defer func() { + if r := recover(); r != nil { + err = r.(error) + } + }() + return api.A.Metadata(ctx, metric, limit) +} diff --git a/pkg/proxystorage/proxy.go b/pkg/proxystorage/proxy.go index 6526eceea..be89aa1ed 100644 --- a/pkg/proxystorage/proxy.go +++ b/pkg/proxystorage/proxy.go @@ -5,6 +5,7 @@ import ( "fmt" "net/http" "reflect" + "strconv" "sync/atomic" "time" @@ -171,6 +172,62 @@ func (p *ProxyStorage) ConfigHandler(w http.ResponseWriter, r *http.Request) { return } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + if n, err := w.Write(b); err != nil { + logrus.Error("msg", "error writing response", "bytesWritten", n, "err", err) + } +} + +// MetadataHandler is an implementation of the metadata handler within the prometheus API +func (p *ProxyStorage) MetadataHandler(w http.ResponseWriter, r *http.Request) { + // Check that "limit" is valid + var limit int + if s := r.FormValue("limit"); s != "" { + var err error + if limit, err = strconv.Atoi(s); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + } + + // Do the metadata lookup + state := p.GetState() + metadata, err := state.client.Metadata(r.Context(), r.FormValue("metric"), r.FormValue("limit")) + + // Trim the results to the requested limit + if len(metadata) > limit { + count := 0 + for k := range metadata { + if count < limit { + count++ + } else { + delete(metadata, k) + } + } + } + + var v map[string]interface{} + if err != nil { + v = map[string]interface{}{ + "status": "error", + "error": err.Error(), + } + } else { + v = map[string]interface{}{ + "status": "success", + "data": metadata, + } + } + + json := jsoniter.ConfigCompatibleWithStandardLibrary + b, err := json.Marshal(v) + if err != nil { + logrus.Error("msg", "error marshaling json response", "err", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) if n, err := w.Write(b); err != nil { diff --git a/pkg/servergroup/servergroup.go b/pkg/servergroup/servergroup.go index 5dd984e99..599298396 100644 --- a/pkg/servergroup/servergroup.go +++ b/pkg/servergroup/servergroup.go @@ -337,3 +337,9 @@ func (s *ServerGroup) LabelNames(ctx context.Context, matchers []string, startTi func (s *ServerGroup) Series(ctx context.Context, matches []string, startTime, endTime time.Time) ([]model.LabelSet, v1.Warnings, error) { return s.State().apiClient.Series(ctx, matches, startTime, endTime) } + +// Metadata returns metadata about metrics currently scraped by the metric name. +func (s *ServerGroup) Metadata(ctx context.Context, metric, limit string) (map[string][]v1.Metadata, error) { + // TODO: query all and merge + return s.State().apiClient.Metadata(ctx, metric, limit) +}