Skip to content

Commit

Permalink
Implement suggestions: docs + mutex + struct
Browse files Browse the repository at this point in the history
Signed-off-by: Saswata Mukherjee <saswataminsta@yahoo.com>
  • Loading branch information
saswatamcode committed Jul 24, 2022
1 parent 6d85395 commit a844956
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 606 deletions.
11 changes: 6 additions & 5 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,12 +299,12 @@ func runReceive(
}

if (receiveMode == receive.RouterOnly || receiveMode == receive.RouterIngestor) && conf.maxPerTenantLimit != 0 {
level.Debug(logger).Log("msg", "setting up periodic meta-monitoring query for limiting cache")
level.Info(logger).Log("msg", "setting up periodic (every 15s) meta-monitoring query for limiting cache")
{
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
return runutil.Repeat(15*time.Second, ctx.Done(), func() error {
if err := webHandler.QueryMetaMonitoring(ctx); err != nil {
if err := webHandler.ActiveSeriesLimit.QueryMetaMonitoring(ctx, log.With(logger, "component", "receive-meta-monitoring")); err != nil {
level.Error(logger).Log("msg", "failed to query meta-monitoring", "err", err.Error())
}
return nil
Expand Down Expand Up @@ -848,16 +848,17 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {

cmd.Flag("receive.replication-factor", "How many times to replicate incoming write requests.").Default("1").Uint64Var(&rc.replicationFactor)

cmd.Flag("receive.tenant-limits.max-head-series", "The total number of active or HEAD series that a tenant is allowed to have within a Receive topology.").Uint64Var(&rc.maxPerTenantLimit)
cmd.Flag("receive.tenant-limits.max-head-series", "The total number of active (head) series that a tenant is allowed to have within a Receive topology. For more details refer: https://thanos.io/tip/components/receive.md/#limiting").Hidden().Uint64Var(&rc.maxPerTenantLimit)

cmd.Flag("receive.tenant-limits.meta-monitoring-url", "Meta-monitoring URL which is compatible with Prometheus Query API for active series limiting.").Default("http://localhost:9090").URLVar(&rc.metaMonitoringUrl)
cmd.Flag("receive.tenant-limits.meta-monitoring-url", "Meta-monitoring URL which is compatible with Prometheus Query API for active series limiting.").Hidden().URLVar(&rc.metaMonitoringUrl)

cmd.Flag("receive.tenant-limits.meta-monitoring-query", "PromQL Query to execute against meta-monitoring, to get the current number of active series for each tenant, across Receive replicas.").Default("sum(prometheus_tsdb_head_series) by (tenant)").StringVar(&rc.metaMonitoringLimitQuery)
cmd.Flag("receive.tenant-limits.meta-monitoring-query", "PromQL Query to execute against meta-monitoring, to get the current number of active series for each tenant, across Receive replicas.").Default("sum(prometheus_tsdb_head_series) by (tenant)").Hidden().StringVar(&rc.metaMonitoringLimitQuery)

rc.metaMonitoringHttpClient = extflag.RegisterPathOrContent(
cmd,
"receive.tenant-limits.meta-monitoring-client",
"YAML file or string with http client configs for meta-monitoring.",
extflag.WithHidden(),
)

rc.forwardTimeout = extkingpin.ModelDuration(cmd.Flag("receive-forward-timeout", "Timeout for each forward request.").Default("5s").Hidden())
Expand Down
33 changes: 6 additions & 27 deletions docs/components/receive.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,19 +79,20 @@ With such configuration any receive listens for remote write on `<ip>10908/api/v

## Limiting

Thanos Receive, in Router or RouterIngestor mode, supports limiting tenant active or HEAD series, to maintain stability of the system. It uses any Prometheus Query API compatible meta-monitoring solution to get the current number of active series, and compares that with a configured limit, before ingesting any tenant's remote write request. In case a tenant has gone above the limit, their remote write requests are failed fully.
Thanos Receive, in Router or RouterIngestor mode, supports limiting tenant active (head) series to maintain the system's stability. It uses any Prometheus Query API compatible meta-monitoring solution that consumes the metrics exposed by all receivers in the Thanos system. Such query endpoint allows getting the scrape time seconds old number of all active series per tenant, which is then compared with a configured limit before ingesting any tenant's remote write request. In case a tenant has gone above the limit, their remote write requests fail fully.

Meta-monitoring in this context refers to an external monitoring system scraping all Thanos Receive instances and exposing them in an API compatible with the Prometheus Query API.
Every Receive Router/RouterIngestor node, queries meta-monitoring for active series of all tenants, every 15 seconds, and caches the results in a map. This cached result is used to limit all incoming remote write requests.

To use the feature, one should specify the following flags:
- `--receive.tenant-limits.max-head-series`: Specifies the total number of active or HEAD series for any tenant, across all replicas (including data replication), allowed by Thanos Receive.
To use the feature, one should specify the following (hidden) flags:
- `--receive.tenant-limits.max-head-series`: Specifies the total number of active (head) series for any tenant, across all replicas (including data replication), allowed by Thanos Receive.
- `--receive.tenant-limits.meta-monitoring-url`: Specifies Prometheus Query API compatible meta-monitoring endpoint.
- `--receive.tenant-limits.meta-monitoring-query`: Optional flag to specify PromQL query to execute against meta-monitoring.
- `--receive.tenant-limits.meta-monitoring-client`: Optional YAML file/string specifying HTTP client config for meta-monitoring.

NOTE:
- It is possible that Receive ingests more active series than the specified limit, as it relies on meta-monitoring, which may not have the latest data for current number of active series of a tenant at all times.
- Thanos Receive performs best-effort limiting. In case meta-monitoring is down/unreachable, Thanos Receive will not impose limits.
- Thanos Receive performs best-effort limiting. In case meta-monitoring is down/unreachable, Thanos Receive will not impose limits and only log errors for meta-monitoring being unreachable. Similaly to when one receiver cannot be scraped.
- Support for different limit configuration for different tenants is planned for the future.

## Flags

Expand Down Expand Up @@ -199,28 +200,6 @@ Flags:
--receive.tenant-label-name="tenant_id"
Label name through which the tenant will be
announced.
--receive.tenant-limits.max-head-series=RECEIVE.TENANT-LIMITS.MAX-HEAD-SERIES
The total number of active or HEAD series that
a tenant is allowed to have within a Receive
topology.
--receive.tenant-limits.meta-monitoring-client=<content>
Alternative to
'receive.tenant-limits.meta-monitoring-client-file'
flag (mutually exclusive). Content of YAML file
or string with http client configs for
meta-monitoring.
--receive.tenant-limits.meta-monitoring-client-file=<file-path>
Path to YAML file or string with http client
configs for meta-monitoring.
--receive.tenant-limits.meta-monitoring-query="sum(prometheus_tsdb_head_series) by (tenant)"
PromQL Query to execute against
meta-monitoring, to get the current number of
active series for each tenant, across Receive
replicas.
--receive.tenant-limits.meta-monitoring-url=http://localhost:9090
Meta-monitoring URL which is compatible with
Prometheus Query API for active series
limiting.
--remote-write.address="0.0.0.0:19291"
Address to listen on for remote write requests.
--remote-write.client-server-name=""
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,9 @@ replace (
// Required by Cortex https://github.com/cortexproject/cortex/pull/3051.
github.com/bradfitz/gomemcache => github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab

// TODO(saswatamcode): Remove when https://github.com/efficientgo/tools/pull/14 is merged.
github.com/efficientgo/tools/extkingpin => github.com/saswatamcode/tools/extkingpin v0.0.0-20220723122803-67d37ea96343

github.com/vimeo/galaxycache => github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e

// Override due to https://github.com/weaveworks/common/issues/239
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -302,8 +302,6 @@ github.com/efficientgo/e2e v0.12.2-0.20220714084440-2f5240d8c363/go.mod h1:0Jrqc
github.com/efficientgo/tools/core v0.0.0-20210129205121-421d0828c9a6/go.mod h1:OmVcnJopJL8d3X3sSXTiypGoUSgFq1aDGmlrdi9dn/M=
github.com/efficientgo/tools/core v0.0.0-20220225185207-fe763185946b h1:ZHiD4/yE4idlbqvAO6iYCOYRzOMRpxkW+FKasRA3tsQ=
github.com/efficientgo/tools/core v0.0.0-20220225185207-fe763185946b/go.mod h1:OmVcnJopJL8d3X3sSXTiypGoUSgFq1aDGmlrdi9dn/M=
github.com/efficientgo/tools/extkingpin v0.0.0-20220225185207-fe763185946b h1:rFV4ZGoCKjhOyc4vjrzuCsi9BbrxMJvwmtceN0iR4Zc=
github.com/efficientgo/tools/extkingpin v0.0.0-20220225185207-fe763185946b/go.mod h1:ZV0utlglOczUWv3ih2AbqPSoLoFzdplUYxwV62eZi6Q=
github.com/elastic/go-sysinfo v1.1.1/go.mod h1:i1ZYdU10oLNfRzq4vq62BEwD2fH8KaWh6eh0ikPT9F0=
github.com/elastic/go-sysinfo v1.8.1 h1:4Yhj+HdV6WjbCRgGdZpPJ8lZQlXZLKDAeIkmQ/VRvi4=
github.com/elastic/go-sysinfo v1.8.1/go.mod h1:JfllUnzoQV/JRYymbH3dO1yggI3mV2oTKSXsDHM+uIM=
Expand Down Expand Up @@ -1054,6 +1052,8 @@ github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFo
github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E=
github.com/santhosh-tekuri/jsonschema v1.2.4 h1:hNhW8e7t+H1vgY+1QeEQpveR6D4+OwKPXCfD2aieJis=
github.com/santhosh-tekuri/jsonschema v1.2.4/go.mod h1:TEAUOeZSmIxTTuHatJzrvARHiuO9LYd+cIxzgEHCQI4=
github.com/saswatamcode/tools/extkingpin v0.0.0-20220723122803-67d37ea96343 h1:HWf7gTC91QRN18U/ioIjS0QBJmn5cC6KGrurFcOCygc=
github.com/saswatamcode/tools/extkingpin v0.0.0-20220723122803-67d37ea96343/go.mod h1:ZV0utlglOczUWv3ih2AbqPSoLoFzdplUYxwV62eZi6Q=
github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/scaleway/scaleway-sdk-go v1.0.0-beta.9 h1:0roa6gXKgyta64uqh52AQG3wzZXH21unn+ltzQSXML0=
Expand Down
124 changes: 71 additions & 53 deletions pkg/receive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,21 @@ type Options struct {
MetaMonitoringLimitQuery string
}

// activeSeriesLimit implements active series limiting for web Handler.
type activeSeriesLimit struct {
mtx sync.Mutex
limit uint64
tenantCurrentSeriesMap map[string]float64

metaMonitoringURL *url.URL
metaMonitoringClient *http.Client
metaMonitoringQuery string

configuredTenantLimit prometheus.Gauge
limitedRequests *prometheus.CounterVec
metaMonitoringErr prometheus.Counter
}

// Handler serves a Prometheus remote write receiving HTTP endpoint.
type Handler struct {
logger log.Logger
Expand All @@ -117,21 +132,17 @@ type Handler struct {
options *Options
listener net.Listener

mtx sync.RWMutex
hashring Hashring
peers *peerGroup
expBackoff backoff.Backoff
peerStates map[string]*retryState
receiverMode ReceiverMode
metaMonitoringClient *http.Client
tenantCurrentSeriesMap map[string]float64
mtx sync.RWMutex
hashring Hashring
peers *peerGroup
expBackoff backoff.Backoff
peerStates map[string]*retryState
receiverMode ReceiverMode
ActiveSeriesLimit *activeSeriesLimit

forwardRequests *prometheus.CounterVec
replications *prometheus.CounterVec
replicationFactor prometheus.Gauge
configuredTenantLimit prometheus.Gauge
limitedRequests *prometheus.CounterVec
metaMonitoringErr prometheus.Counter
forwardRequests *prometheus.CounterVec
replications *prometheus.CounterVec
replicationFactor prometheus.Gauge

writeSamplesTotal *prometheus.HistogramVec
writeTimeseriesTotal *prometheus.HistogramVec
Expand Down Expand Up @@ -178,24 +189,29 @@ func NewHandler(logger log.Logger, o *Options) *Handler {
Help: "The number of times to replicate incoming write requests.",
},
),
configuredTenantLimit: promauto.With(registerer).NewGauge(
prometheus.GaugeOpts{
Name: "thanos_receive_tenant_head_series_limit",
Help: "The configured limit for active or HEAD series of tenants.",
},
),
limitedRequests: promauto.With(registerer).NewCounterVec(
prometheus.CounterOpts{
Name: "thanos_receive_head_series_limited_requests_total",
Help: "The total number of remote write requests that have been dropped due to head series limiting.",
}, []string{"tenant"},
),
metaMonitoringErr: promauto.With(registerer).NewCounter(
prometheus.CounterOpts{
Name: "thanos_receive_metamonitoring_failed_queries_total",
Help: "The total number of meta-monitoring queries that failed while limiting.",
},
),
ActiveSeriesLimit: &activeSeriesLimit{
limit: o.MaxPerTenantLimit,
metaMonitoringURL: o.MetaMonitoringUrl,
metaMonitoringQuery: o.MetaMonitoringLimitQuery,
configuredTenantLimit: promauto.With(registerer).NewGauge(
prometheus.GaugeOpts{
Name: "thanos_receive_tenant_head_series_limit",
Help: "The configured limit for active (head) series of tenants.",
},
),
limitedRequests: promauto.With(registerer).NewCounterVec(
prometheus.CounterOpts{
Name: "thanos_receive_head_series_limited_requests_total",
Help: "The total number of remote write requests that have been dropped due to active series limiting.",
}, []string{"tenant"},
),
metaMonitoringErr: promauto.With(registerer).NewCounter(
prometheus.CounterOpts{
Name: "thanos_receive_metamonitoring_failed_queries_total",
Help: "The total number of meta-monitoring queries that failed while limiting.",
},
),
},
writeTimeseriesTotal: promauto.With(registerer).NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "thanos",
Expand Down Expand Up @@ -227,8 +243,8 @@ func NewHandler(logger log.Logger, o *Options) *Handler {
h.replicationFactor.Set(1)
}

h.configuredTenantLimit.Set(float64(o.MaxPerTenantLimit))
h.tenantCurrentSeriesMap = map[string]float64{}
h.ActiveSeriesLimit.configuredTenantLimit.Set(float64(o.MaxPerTenantLimit))
h.ActiveSeriesLimit.tenantCurrentSeriesMap = map[string]float64{}

if (h.receiverMode == RouterOnly || h.receiverMode == RouterIngestor) && h.options.MaxPerTenantLimit != 0 {
// Use specified HTTPConfig to make requests to meta-monitoring.
Expand All @@ -242,7 +258,7 @@ func NewHandler(logger log.Logger, o *Options) *Handler {
level.Error(h.logger).Log("msg", "parsing http config YAML", "err", err.Error())
}

h.metaMonitoringClient, err = httpconfig.NewHTTPClient(*httpClientConfig, "thanos-receive")
h.ActiveSeriesLimit.metaMonitoringClient, err = httpconfig.NewHTTPClient(*httpClientConfig, "meta-mon-for-limit")
if err != nil {
level.Error(h.logger).Log("msg", "improper http client config", "err", err.Error())
}
Expand Down Expand Up @@ -451,7 +467,7 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {

// Impose limits only if Receive is in Router or RouterIngestor mode.
if h.receiverMode == RouterOnly || h.receiverMode == RouterIngestor {
under, err := h.isUnderLimit(tenant, tLogger)
under, err := h.ActiveSeriesLimit.isUnderLimit(tenant, tLogger)
if err != nil {
level.Error(tLogger).Log("msg", "error while limiting", "err", err.Error())
}
Expand Down Expand Up @@ -549,25 +565,27 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
}

// QueryMetaMonitoring queries any Prometheus Query API compatible meta-monitoring
// solution with the configured query for getting current HEAD series of all tenants.
// solution with the configured query for getting current active (head) series of all tenants.
// It then populates tenantCurrentSeries map with result.
func (h *Handler) QueryMetaMonitoring(ctx context.Context) error {
c := promclient.NewWithTracingClient(h.logger, h.metaMonitoringClient, httpconfig.ThanosUserAgent)
func (a *activeSeriesLimit) QueryMetaMonitoring(ctx context.Context, logger log.Logger) error {
a.mtx.Lock()
defer a.mtx.Unlock()
c := promclient.NewWithTracingClient(logger, a.metaMonitoringClient, httpconfig.ThanosUserAgent)

vectorRes, _, err := c.QueryInstant(ctx, h.options.MetaMonitoringUrl, h.options.MetaMonitoringLimitQuery, time.Now(), promclient.QueryOptions{})
vectorRes, _, err := c.QueryInstant(ctx, a.metaMonitoringURL, a.metaMonitoringQuery, time.Now(), promclient.QueryOptions{})
if err != nil {
h.metaMonitoringErr.Inc()
a.metaMonitoringErr.Inc()
return err
}

level.Debug(h.logger).Log("msg", "successfully queried meta-monitoring", "vectors", len(vectorRes))
level.Debug(logger).Log("msg", "successfully queried meta-monitoring", "vectors", len(vectorRes))

// Construct map of tenant name and current HEAD series.
for _, e := range vectorRes {
for k, v := range e.Metric {
if k == "tenant" {
h.tenantCurrentSeriesMap[string(v)] = float64(e.Value)
level.Debug(h.logger).Log("msg", "tenant value queried", "tenant", string(v), "value", e.Value)
a.tenantCurrentSeriesMap[string(v)] = float64(e.Value)
level.Debug(logger).Log("msg", "tenant value queried", "tenant", string(v), "value", e.Value)
}
}
}
Expand All @@ -577,9 +595,11 @@ func (h *Handler) QueryMetaMonitoring(ctx context.Context) error {

// isUnderLimit ensures that the current number of active series for a tenant does not exceed given limit.
// It does so in a best-effort way, i.e, in case meta-monitoring is unreachable, it does not impose limits.
// TODO(saswatamcode): Add capability to configure diff limits for diff tenants.
func (h *Handler) isUnderLimit(tenant string, logger log.Logger) (bool, error) {
if h.options.MaxPerTenantLimit == 0 || h.options.MetaMonitoringUrl.Host == "" {
// TODO(saswatamcode): Add capability to configure different limits for different tenants.
func (a *activeSeriesLimit) isUnderLimit(tenant string, logger log.Logger) (bool, error) {
a.mtx.Lock()
defer a.mtx.Unlock()
if a.limit == 0 || a.metaMonitoringURL.Host == "" {
return true, nil
}

Expand All @@ -588,19 +608,17 @@ func (h *Handler) isUnderLimit(tenant string, logger log.Logger) (bool, error) {
// series. As such metric is updated in intervals, it is possible
// that Receive ingests more series than the limit, before detecting that
// a tenant has exceeded the set limits.
v, ok := h.tenantCurrentSeriesMap[tenant]
v, ok := a.tenantCurrentSeriesMap[tenant]
if !ok {
return true, errors.New("tenant not in current series map")
}

if v >= float64(h.options.MaxPerTenantLimit) {
level.Error(logger).Log("msg", "tenant above limit", "currentSeries", v, "limit", h.options.MaxPerTenantLimit)
h.limitedRequests.WithLabelValues(tenant).Inc()
if v >= float64(a.limit) {
level.Error(logger).Log("msg", "tenant above limit", "currentSeries", v, "limit", a.limit)
a.limitedRequests.WithLabelValues(tenant).Inc()
return false, nil
}

level.Debug(logger).Log("msg", "tenant is under limit", "currentSeries", v)

return true, nil
}

Expand Down
Loading

0 comments on commit a844956

Please sign in to comment.