Skip to content

Commit

Permalink
Add tenant-based filtering for Rules API response
Browse files Browse the repository at this point in the history
Signed-off-by: Saswata Mukherjee <saswataminsta@yahoo.com>
  • Loading branch information
saswatamcode committed Jan 17, 2022
1 parent 1852b0f commit afdfbde
Show file tree
Hide file tree
Showing 3 changed files with 287 additions and 12 deletions.
78 changes: 66 additions & 12 deletions api/metrics/v1/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,22 @@ const (
labelNamesRoute = "/api/v1/labels"
labelValuesRoute = "/api/v1/label/{label_name}/values"
receiveRoute = "/api/v1/receive"
rulesRoute = "/api/v1/rules/raw"
rulesRoute = "/api/v1/rules"
rulesRawRoute = "/api/v1/rules/raw"
)

type handlerConfiguration struct {
logger log.Logger
registry *prometheus.Registry
instrument handlerInstrumenter
spanRoutePrefix string
tenantLabel string
queryMiddlewares []func(http.Handler) http.Handler
readMiddlewares []func(http.Handler) http.Handler
uiMiddlewares []func(http.Handler) http.Handler
writeMiddlewares []func(http.Handler) http.Handler
logger log.Logger
registry *prometheus.Registry
instrument handlerInstrumenter
spanRoutePrefix string
tenantLabel string
queryMiddlewares []func(http.Handler) http.Handler
readMiddlewares []func(http.Handler) http.Handler
rulesMiddlewares []func(http.Handler) http.Handler
uiMiddlewares []func(http.Handler) http.Handler
writeMiddlewares []func(http.Handler) http.Handler
rulesResponseModifier func(*http.Response) error
}

// HandlerOption modifies the handler's configuration.
Expand Down Expand Up @@ -99,6 +102,20 @@ func WithQueryMiddleware(m func(http.Handler) http.Handler) HandlerOption {
}
}

// WithRulesMiddleware adds a middleware for all rules read related operations.
func WithRulesMiddleware(m func(http.Handler) http.Handler) HandlerOption {
return func(h *handlerConfiguration) {
h.rulesMiddlewares = append(h.rulesMiddlewares, m)
}
}

// WithRulesResponseModifier adds a API response modifer for all rules read related operations.
func WithRulesResponseModifier(m func(*http.Response) error) HandlerOption {
return func(h *handlerConfiguration) {
h.rulesResponseModifier = m
}
}

// WithUIMiddleware adds a middleware for all non read, non query, non write operations (e.g ui).
func WithUIMiddleware(m func(http.Handler) http.Handler) HandlerOption {
return func(h *handlerConfiguration) {
Expand Down Expand Up @@ -216,6 +233,43 @@ func NewHandler(read, write, rulesEndpoint *url.URL, upstreamCA []byte, opts ...
))
})

var proxyRules http.Handler
{
middlewares := proxy.Middlewares(
proxy.MiddlewareSetUpstream(read),
proxy.MiddlewareSetPrefixHeader(),
proxy.MiddlewareLogger(c.logger),
proxy.MiddlewareMetrics(c.registry, prometheus.Labels{"proxy": "metricsv1-rules"}),
)

t := &http.Transport{
DialContext: (&net.Dialer{
Timeout: readTimeout,
}).DialContext,
}

if len(upstreamCA) != 0 {
t.TLSClientConfig = &tls.Config{
RootCAs: x509.NewCertPool(),
}
t.TLSClientConfig.RootCAs.AppendCertsFromPEM(upstreamCA)
}

proxyRules = &httputil.ReverseProxy{
Director: middlewares,
ErrorLog: proxy.Logger(c.logger),
Transport: otelhttp.NewTransport(t),
ModifyResponse: c.rulesResponseModifier,
}
}
r.Group(func(r chi.Router) {
r.Use(c.rulesMiddlewares...)
r.Handle(rulesRoute, c.instrument.NewHandler(
prometheus.Labels{"group": "metricsv1", "handler": "rules"},
otelhttp.WithRouteTag(c.spanRoutePrefix+rulesRoute, proxyRules),
))
})

var uiProxy http.Handler
{
middlewares := proxy.Middlewares(
Expand Down Expand Up @@ -296,12 +350,12 @@ func NewHandler(read, write, rulesEndpoint *url.URL, upstreamCA []byte, opts ...

r.Group(func(r chi.Router) {
r.Use(c.uiMiddlewares...)
r.Get(rulesRoute, rh.get)
r.Get(rulesRawRoute, rh.get)
})

r.Group(func(r chi.Router) {
r.Use(c.writeMiddlewares...)
r.Put(rulesRoute, rh.put)
r.Put(rulesRawRoute, rh.put)
})
}

Expand Down
219 changes: 219 additions & 0 deletions api/metrics/v1/response_modifier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
package v1

import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"time"

"github.com/observatorium/api/authentication"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/labels"
)

type apiResponse struct {
Status string `json:"status"`
Data json.RawMessage `json:"data,omitempty"`
ErrorType string `json:"errorType,omitempty"`
Error string `json:"error,omitempty"`
Warnings []string `json:"warnings,omitempty"`
}

// getAPIResponse decodes /api/v1/rules response.
// Adapted from https://github.com/prometheus-community/prom-label-proxy/blob/952266db4e0b8ab66b690501e532eaef33300596/injectproxy/rules.go#L37.
func getAPIResponse(resp *http.Response) (*apiResponse, error) {
defer resp.Body.Close()
reader := resp.Body

if resp.StatusCode != http.StatusOK {
return nil, errors.Errorf("unexpected status code: %d", resp.StatusCode)
}

var apir apiResponse
if err := json.NewDecoder(reader).Decode(&apir); err != nil {
return nil, errors.Wrap(err, "JSON decoding")
}

if apir.Status != "success" {
return nil, errors.Errorf("unexpected response status: %q", apir.Status)
}

return &apir, nil
}

type rulesData struct {
RuleGroups []*ruleGroup `json:"groups"`
}

type ruleGroup struct {
Name string `json:"name"`
File string `json:"file"`
Rules []rule `json:"rules"`
Interval float64 `json:"interval"`
EvaluationTime float64 `json:"evaluationTime,omitempty"`
LastEvaluation *time.Time `json:"lastEvaluation,omitempty"`
Limit int32 `json:"limit,omitempty"`
// Thanos Querier specific field.
PartialResponseStrategy string `json:"partialResponseStrategy"`
}

type rule struct {
*alertingRule
*recordingRule
}

func (r *rule) Labels() labels.Labels {
if r.alertingRule != nil {
return r.alertingRule.Labels
}

return r.recordingRule.Labels
}

// MarshalJSON implements the json.Marshaler interface for rule.
func (r *rule) MarshalJSON() ([]byte, error) {
if r.alertingRule != nil {
return json.Marshal(r.alertingRule)
}

return json.Marshal(r.recordingRule)
}

// UnmarshalJSON implements the json.Unmarshaler interface for rule.
func (r *rule) UnmarshalJSON(b []byte) error {
var ruleType struct {
Type string `json:"type"`
}

if err := json.Unmarshal(b, &ruleType); err != nil {
return err
}

switch ruleType.Type {
case "alerting":
var alertingr alertingRule
if err := json.Unmarshal(b, &alertingr); err != nil {
return err
}

r.alertingRule = &alertingr
case "recording":
var recordingr recordingRule
if err := json.Unmarshal(b, &recordingr); err != nil {
return err
}

r.recordingRule = &recordingr
default:
return errors.Errorf("failed to unmarshal rule: unknown type %q", ruleType.Type)
}

return nil
}

type alertingRule struct {
Name string `json:"name"`
Query string `json:"query"`
Duration float64 `json:"duration"`
Labels labels.Labels `json:"labels"`
Annotations labels.Labels `json:"annotations"`
Alerts []*alert `json:"alerts"`
Health string `json:"health"`
LastError string `json:"lastError,omitempty"`
// Type of an alertingRule is always "alerting".
Type string `json:"type"`
}

type recordingRule struct {
Name string `json:"name"`
Query string `json:"query"`
Labels labels.Labels `json:"labels,omitempty"`
Health string `json:"health"`
LastError string `json:"lastError,omitempty"`
EvaluationTime float64 `json:"evaluationTime,omitempty"`
LastEvaluation *time.Time `json:"lastEvaluation,omitempty"`
// Type of a recordingRule is always "recording".
Type string `json:"type"`
}

type alert struct {
Labels labels.Labels `json:"labels"`
Annotations labels.Labels `json:"annotations"`
State string `json:"state"`
ActiveAt *time.Time `json:"activeAt,omitempty"`
Value string `json:"value"`
}

// ModifyRulesAPIResponse modifies the /api/v1/rules API response by passing the enforced tenant label and id to f().
// Adapted from https://github.com/prometheus-community/prom-label-proxy/blob/952266db4e0b8ab66b690501e532eaef33300596/injectproxy/rules.go#L166.
func ModifyRulesAPIResponse(label string, f func(string, string, *apiResponse) (interface{}, error)) func(*http.Response) error {
return func(resp *http.Response) error {
if resp.StatusCode != http.StatusOK {
// Pass non-200 responses as-is.
return nil
}

id, ok := authentication.GetTenantID(resp.Request.Context())
if !ok {
return errors.New("error finding tenant ID")
}

apir, err := getAPIResponse(resp)
if err != nil {
return errors.Wrap(err, "reading API response")
}

r, err := f(label, id, apir)
if err != nil {
return errors.Wrap(err, "error extracting rules")
}

b, err := json.Marshal(r)
if err != nil {
return errors.Wrap(err, "can't replace data")
}
apir.Data = json.RawMessage(b)

var buf bytes.Buffer
if err = json.NewEncoder(&buf).Encode(apir); err != nil {
return errors.Wrap(err, "can't encode API response")
}
resp.Body = ioutil.NopCloser(&buf)
resp.Header["Content-Length"] = []string{fmt.Sprint(buf.Len())}

return nil
}
}

// Filters rules based on given label and value.
// Adapted from https://github.com/prometheus-community/prom-label-proxy/blob/952266db4e0b8ab66b690501e532eaef33300596/injectproxy/rules.go#L200.
func FilterRules(label string, value string, resp *apiResponse) (interface{}, error) {
var rgs rulesData
if err := json.Unmarshal(resp.Data, &rgs); err != nil {
return nil, errors.Wrap(err, "can't decode rules data")
}

filtered := []*ruleGroup{}

for _, rg := range rgs.RuleGroups {
var rules []rule

for _, rule := range rg.Rules {
for _, lbl := range rule.Labels() {
if lbl.Name == label && lbl.Value == value {
rules = append(rules, rule)
break
}
}
}

if len(rules) > 0 {
rg.Rules = rules
filtered = append(filtered, rg)
}
}

return &rulesData{RuleGroups: filtered}, nil
}
2 changes: 2 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,8 @@ func main() {
metricsv1.WithReadMiddleware(authorization.WithAuthorizers(authorizers, rbac.Read, "metrics")),
metricsv1.WithReadMiddleware(metricsv1.WithEnforceTenancyOnMatchers(cfg.metrics.tenantLabel)),
metricsv1.WithReadMiddleware(metricsv1.WithEnforceAuthorizationLabels()),
metricsv1.WithRulesMiddleware(authorization.WithAuthorizers(authorizers, rbac.Read, "metrics")),
metricsv1.WithRulesResponseModifier(metricsv1.ModifyRulesAPIResponse(cfg.metrics.tenantLabel, metricsv1.FilterRules)),
metricsv1.WithUIMiddleware(authorization.WithAuthorizers(authorizers, rbac.Read, "metrics")),
metricsv1.WithWriteMiddleware(authorization.WithAuthorizers(authorizers, rbac.Write, "metrics")),
),
Expand Down

0 comments on commit afdfbde

Please sign in to comment.