Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Receive: allow unlimited head_series_limit tenants
Browse files Browse the repository at this point in the history
With this commit we now allow to configure tenants with unlimited active
series limit by setting the limit to `0`. Prior to this commit setting a
per tenant limit to `0` would cause the tenant to be unable to write any
metrics at all.

This fixes: #6393

Signed-off-by: Jacob Baungard Hansen <jacobbaungard@redhat.com>
jacobbaungard committed Jun 1, 2023
1 parent eea398e commit 1065519
Showing 5 changed files with 54 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -71,6 +71,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#5548](https://github.com/thanos-io/thanos/pull/5548) Query: Add experimental support for load balancing across multiple Store endpoints.
- [#6148](https://github.com/thanos-io/thanos/pull/6148) Query-frontend: Add `traceID` to slow query detected log line.
- [#6153](https://github.com/thanos-io/thanos/pull/6153) Query-frontend: Add `remote_user` (from http basic auth) and `remote_addr` to slow query detected log line.
- [#6406](https://github.com/thanos-io/thanos/pull/6406) Receive: Allow tenants to be configured with unlimited active series by setting head_series_limit to 0.

### Fixed

2 changes: 1 addition & 1 deletion docs/components/receive.md
Original file line number Diff line number Diff line change
@@ -241,7 +241,7 @@ Under `global`:
- `meta_monitoring_http_client`: Optional YAML field specifying HTTP client config for meta-monitoring.

Under `default` and per `tenant`:
- `head_series_limit`: Specifies the total number of active (head) series for any tenant, across all replicas (including data replication), allowed by Thanos Receive.
- `head_series_limit`: Specifies the total number of active (head) series for any tenant, across all replicas (including data replication), allowed by Thanos Receive. Set to 0 for unlimited.

NOTE:
- It is possible that Receive ingests more active series than the specified limit, as it relies on meta-monitoring, which may not have the latest data for current number of active series of a tenant at all times.
5 changes: 5 additions & 0 deletions pkg/receive/head_series_limiter.go
Original file line number Diff line number Diff line change
@@ -155,6 +155,11 @@ func (h *headSeriesLimit) isUnderLimit(tenant string) (bool, error) {
limit = h.defaultLimit
}

// If tenant limit is 0 we treat it as unlimited.
if limit == 0 {
return true, nil
}

if v >= float64(limit) {
level.Error(h.logger).Log("msg", "tenant above limit", "tenant", tenant, "currentSeries", v, "limit", limit)
h.limitedRequests.WithLabelValues(tenant).Inc()
8 changes: 7 additions & 1 deletion test/e2e/e2ethanos/services.go
Original file line number Diff line number Diff line change
@@ -500,6 +500,7 @@ type ReceiveBuilder struct {
maxExemplars int
ingestion bool
limit int
tenantsLimits receive.TenantsWriteLimitsConfig
metaMonitoring string
metaMonitoringQuery string
hashringConfigs []receive.HashringConfig
@@ -554,9 +555,10 @@ func (r *ReceiveBuilder) WithRelabelConfigs(relabelConfigs []*relabel.Config) *R
return r
}

func (r *ReceiveBuilder) WithValidationEnabled(limit int, metaMonitoring string, query ...string) *ReceiveBuilder {
func (r *ReceiveBuilder) WithValidationEnabled(limit int, metaMonitoring string, tenantsLimits receive.TenantsWriteLimitsConfig, query ...string) *ReceiveBuilder {
r.limit = limit
r.metaMonitoring = metaMonitoring
r.tenantsLimits = tenantsLimits
if len(query) > 0 {
r.metaMonitoringQuery = query[0]
}
@@ -611,6 +613,10 @@ func (r *ReceiveBuilder) Init() *e2emon.InstrumentedRunnable {
},
}

if r.tenantsLimits != nil {
cfg.WriteLimits.TenantsLimits = r.tenantsLimits
}

b, err := yaml.Marshal(cfg)
if err != nil {
return &e2emon.InstrumentedRunnable{Runnable: e2e.NewFailedRunnable(r.Name(), errors.Wrapf(err, "generate limiting file: %v", hashring))}
47 changes: 40 additions & 7 deletions test/e2e/receive_test.go
Original file line number Diff line number Diff line change
@@ -813,9 +813,13 @@ test_metric{a="2", b="2"} 1`)
},
}

i1Runnable := ingestor1.WithRouting(1, h).WithValidationEnabled(10, "http://"+meta.GetMonitoringRunnable().InternalEndpoint(e2edb.AccessPortName)).Init()
i2Runnable := ingestor2.WithRouting(1, h).WithValidationEnabled(10, "http://"+meta.GetMonitoringRunnable().InternalEndpoint(e2edb.AccessPortName)).Init()
i3Runnable := ingestor3.WithRouting(1, h).WithValidationEnabled(10, "http://"+meta.GetMonitoringRunnable().InternalEndpoint(e2edb.AccessPortName)).Init()
tenantsLimits := receive.TenantsWriteLimitsConfig{
"unlimited-tenant": receive.NewEmptyWriteLimitConfig().SetHeadSeriesLimit(0),
}

i1Runnable := ingestor1.WithRouting(1, h).WithValidationEnabled(10, "http://"+meta.GetMonitoringRunnable().InternalEndpoint(e2edb.AccessPortName), tenantsLimits).Init()
i2Runnable := ingestor2.WithRouting(1, h).WithValidationEnabled(10, "http://"+meta.GetMonitoringRunnable().InternalEndpoint(e2edb.AccessPortName), tenantsLimits).Init()
i3Runnable := ingestor3.WithRouting(1, h).WithValidationEnabled(10, "http://"+meta.GetMonitoringRunnable().InternalEndpoint(e2edb.AccessPortName), tenantsLimits).Init()

testutil.Ok(t, e2e.StartAndWaitReady(i1Runnable, i2Runnable, i3Runnable))

@@ -824,7 +828,7 @@ test_metric{a="2", b="2"} 1`)

testutil.Ok(t, querier.WaitSumMetricsWithOptions(e2emon.Equals(3), []string{"thanos_store_nodes_grpc_connections"}, e2emon.WaitMissingMetrics()))

// We run two avalanches, one tenant which exceeds the limit, and one tenant which remains under it.
// We run three avalanches, one tenant which exceeds the limit, one tenant which remains under it, and one for the unlimited tenant.

// Avalanche in this configuration, would send 5 requests each with 10 new timeseries.
// One request always fails due to TSDB not being ready for new tenant.
@@ -864,7 +868,26 @@ test_metric{a="2", b="2"} 1`)
TenantID: "under-tenant",
})

testutil.Ok(t, e2e.StartAndWaitReady(avalanche1, avalanche2))
// Avalanche in this configuration, would send 5 requests each with 10 new timeseries.
// One request always fails due to TSDB not being ready for new tenant.
// So without limiting we end up with 40 timeseries and 40 samples.
avalanche3 := e2ethanos.NewAvalanche(e, "avalanche-3",
e2ethanos.AvalancheOptions{
MetricCount: "10",
SeriesCount: "1",
MetricInterval: "30",
SeriesInterval: "3600",
ValueInterval: "3600",

RemoteURL: e2ethanos.RemoteWriteEndpoint(ingestor1.InternalEndpoint("remote-write")),
RemoteWriteInterval: "30s",
RemoteBatchSize: "10",
RemoteRequestCount: "5",

TenantID: "unlimited-tenant",
})

testutil.Ok(t, e2e.StartAndWaitReady(avalanche1, avalanche2, avalanche3))

// Here, 3/5 requests are failed due to limiting, as one request fails due to TSDB readiness and we ingest one initial request.
// 3 limited requests belong to the exceed-tenant.
@@ -876,7 +899,7 @@ test_metric{a="2", b="2"} 1`)
ingestor1Name := e.Name() + "-" + ingestor1.Name()
// Here for exceed-tenant we go above limit by 10, which results in 0 value.
queryWaitAndAssert(t, ctx, meta.GetMonitoringRunnable().Endpoint(e2edb.AccessPortName), func() string {
return fmt.Sprintf("sum(prometheus_tsdb_head_series{tenant=\"exceed-tenant\"}) - on() thanos_receive_head_series_limit{instance=\"%s:8080\", job=\"receive-i1\"}", ingestor1Name)
return fmt.Sprintf("sum(prometheus_tsdb_head_series{tenant=\"exceed-tenant\"}) - on() thanos_receive_head_series_limit{instance=\"%s:8080\", job=\"receive-i1\", tenant=\"\"}", ingestor1Name)
}, time.Now, promclient.QueryOptions{
Deduplicate: true,
}, model.Vector{
@@ -888,7 +911,7 @@ test_metric{a="2", b="2"} 1`)

// For under-tenant we stay at -5, as we have only pushed 5 series.
queryWaitAndAssert(t, ctx, meta.GetMonitoringRunnable().Endpoint(e2edb.AccessPortName), func() string {
return fmt.Sprintf("sum(prometheus_tsdb_head_series{tenant=\"under-tenant\"}) - on() thanos_receive_head_series_limit{instance=\"%s:8080\", job=\"receive-i1\"}", ingestor1Name)
return fmt.Sprintf("sum(prometheus_tsdb_head_series{tenant=\"under-tenant\"}) - on() thanos_receive_head_series_limit{instance=\"%s:8080\", job=\"receive-i1\", tenant=\"\"}", ingestor1Name)
}, time.Now, promclient.QueryOptions{
Deduplicate: true,
}, model.Vector{
@@ -918,6 +941,16 @@ test_metric{a="2", b="2"} 1`)
},
})

// Query meta-monitoring solution to assert that 40 timeseries have been ingested for unlimited-tenant.
queryWaitAndAssert(t, ctx, meta.GetMonitoringRunnable().Endpoint(e2edb.AccessPortName), func() string { return "sum(prometheus_tsdb_head_series{tenant=\"unlimited-tenant\"})" }, time.Now, promclient.QueryOptions{
Deduplicate: true,
}, model.Vector{
&model.Sample{
Metric: model.Metric{},
Value: model.SampleValue(40),
},
})

// Query meta-monitoring solution to assert that 3 requests were limited for exceed-tenant and none for under-tenant.
queryWaitAndAssert(t, ctx, meta.GetMonitoringRunnable().Endpoint(e2edb.AccessPortName), func() string { return "thanos_receive_head_series_limited_requests_total" }, time.Now, promclient.QueryOptions{
Deduplicate: true,

0 comments on commit 1065519

Please sign in to comment.