diff --git a/CHANGELOG.md b/CHANGELOG.md index 6bdc480d88..46ca69f5b1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,9 @@ * [FEATURE] Ingester: Support out-of-order native histogram ingestion. It automatically enabled when `-ingester.out-of-order-time-window > 0` and `-blocks-storage.tsdb.enable-native-histograms=true`. #6626 #6663 * [FEATURE] Ruler: Add support for percentage based sharding for rulers. #6680 * [FEATURE] Ruler: Add support for group labels. #6665 +* [FEATURE] Query federation: Introduce a regex tenant resolver to allow a regex in the `X-Scope-OrgID` header value. #6713 +- Add a `-tenant-federation.regex-matcher-enabled` flag. If it is enabled, users can pass a regex in `X-Scope-OrgID`, and the matched tenantIDs are automatically involved. +- Add a `-tenant-federation.user-sync-interval` flag that specifies how frequently to scan users. The scanned users are used to calculate the matched tenantIDs. * [FEATURE] Support Parquet format: Implement parquet converter service to convert a TSDB block into Parquet. #6716 * [ENHANCEMENT] Query Frontend: Change to return 400 when the tenant resolving fail. #6715 * [ENHANCEMENT] Querier: Support query parameters to metadata api (/api/v1/metadata) to allow user to limit metadata to return. #6681 diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 93293edf82..f2763c09ae 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -171,6 +171,18 @@ tenant_federation: # CLI flag: -tenant-federation.max-tenant [max_tenant: | default = 0] + # [Experimental] If enabled, the `X-Scope-OrgID` header value can accept a + # regex and the matched tenantIDs are automatically involved. The regex + # matching rule follows the Prometheus, see the detail: + # https://prometheus.io/docs/prometheus/latest/querying/basics/#regular-expressions. 
+ # CLI flag: -tenant-federation.regex-matcher-enabled + [regex_matcher_enabled: | default = false] + + # If the regex matcher is enabled, it specifies how frequently to scan users. + # The scanned users are used to calculate matched tenantIDs. + # CLI flag: -tenant-federation.user-sync-interval + [user_sync_interval: | default = 5m] + # The ruler_config configures the Cortex ruler. [ruler: ] diff --git a/docs/configuration/v1-guarantees.md b/docs/configuration/v1-guarantees.md index 0922b47859..9ac75b34cc 100644 --- a/docs/configuration/v1-guarantees.md +++ b/docs/configuration/v1-guarantees.md @@ -64,7 +64,9 @@ Currently experimental features are: - Blocks storage bucket index - The bucket index support in the querier and store-gateway (enabled via `-blocks-storage.bucket-store.bucket-index.enabled=true`) is experimental - The block deletion marks migration support in the compactor (`-compactor.block-deletion-marks-migration-enabled`) is temporarily and will be removed in future versions -- Querier: tenant federation +- Querier: + - Tenant federation (`-tenant-federation.enabled`) + - Enable regex matcher when the tenant federation is enabled (`-tenant-federation.regex-matcher-enabled`) - The thanosconvert tool for converting Thanos block metadata to Cortex - HA Tracker: cleanup of old replicas from KV Store. 
- Instance limits in ingester and distributor diff --git a/integration/querier_tenant_federation_test.go b/integration/querier_tenant_federation_test.go index 11c4e545fc..69988b1a01 100644 --- a/integration/querier_tenant_federation_test.go +++ b/integration/querier_tenant_federation_test.go @@ -28,18 +28,25 @@ type querierTenantFederationConfig struct { func TestQuerierTenantFederation(t *testing.T) { runQuerierTenantFederationTest(t, querierTenantFederationConfig{}) + runQuerierTenantFederationTest_UseRegexResolver(t, querierTenantFederationConfig{}) } func TestQuerierTenantFederationWithQueryScheduler(t *testing.T) { runQuerierTenantFederationTest(t, querierTenantFederationConfig{ querySchedulerEnabled: true, }) + runQuerierTenantFederationTest_UseRegexResolver(t, querierTenantFederationConfig{ + querySchedulerEnabled: true, + }) } func TestQuerierTenantFederationWithShuffleSharding(t *testing.T) { runQuerierTenantFederationTest(t, querierTenantFederationConfig{ shuffleShardingEnabled: true, }) + runQuerierTenantFederationTest_UseRegexResolver(t, querierTenantFederationConfig{ + shuffleShardingEnabled: true, + }) } func TestQuerierTenantFederationWithQuerySchedulerAndShuffleSharding(t *testing.T) { @@ -47,6 +54,138 @@ func TestQuerierTenantFederationWithQuerySchedulerAndShuffleSharding(t *testing. 
querySchedulerEnabled: true, shuffleShardingEnabled: true, }) + runQuerierTenantFederationTest_UseRegexResolver(t, querierTenantFederationConfig{ + querySchedulerEnabled: true, + shuffleShardingEnabled: true, + }) +} + +func runQuerierTenantFederationTest_UseRegexResolver(t *testing.T, cfg querierTenantFederationConfig) { + const numUsers = 10 + + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + memcached := e2ecache.NewMemcached() + consul := e2edb.NewConsul() + require.NoError(t, s.StartAndWaitReady(consul, memcached)) + + flags := mergeFlags(BlocksStorageFlags(), map[string]string{ + "-querier.cache-results": "true", + "-querier.split-queries-by-interval": "24h", + "-querier.query-ingesters-within": "12h", // Required by the test on query /series out of ingesters time range + "-frontend.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort), + "-tenant-federation.enabled": "true", + "-tenant-federation.regex-matcher-enabled": "true", + "-tenant-federation.user-sync-interval": "1s", + }) + + // Start the query-scheduler if enabled. + var queryScheduler *e2ecortex.CortexService + if cfg.querySchedulerEnabled { + queryScheduler = e2ecortex.NewQueryScheduler("query-scheduler", flags, "") + require.NoError(t, s.StartAndWaitReady(queryScheduler)) + flags["-frontend.scheduler-address"] = queryScheduler.NetworkGRPCEndpoint() + flags["-querier.scheduler-address"] = queryScheduler.NetworkGRPCEndpoint() + } + + if cfg.shuffleShardingEnabled { + // Use only single querier for each user. + flags["-frontend.max-queriers-per-tenant"] = "1" + } + + minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"]) + require.NoError(t, s.StartAndWaitReady(minio)) + + // Start ingester and distributor. 
+ ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + require.NoError(t, s.StartAndWaitReady(ingester, distributor)) + + // Wait until distributor have updated the ring. + require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) + + // Push a series for each user to Cortex. + now := time.Now() + expectedVectors := make([]model.Vector, numUsers) + tenantIDs := make([]string, numUsers) + + for u := 0; u < numUsers; u++ { + tenantIDs[u] = fmt.Sprintf("user-%d", u) + c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", tenantIDs[u]) + require.NoError(t, err) + + var series []prompb.TimeSeries + series, expectedVectors[u] = generateSeries("series_1", now) + + res, err := c.Push(series) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + } + + // Start the query-frontend. + queryFrontend := e2ecortex.NewQueryFrontend("query-frontend", flags, "") + require.NoError(t, s.Start(queryFrontend)) + + if !cfg.querySchedulerEnabled { + flags["-querier.frontend-address"] = queryFrontend.NetworkGRPCEndpoint() + } + querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + + var querier2 *e2ecortex.CortexService + if cfg.shuffleShardingEnabled { + querier2 = e2ecortex.NewQuerier("querier-2", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + } + + // Start queriers. 
+ require.NoError(t, s.StartAndWaitReady(querier)) + require.NoError(t, s.WaitReady(queryFrontend)) + if cfg.shuffleShardingEnabled { + require.NoError(t, s.StartAndWaitReady(querier2)) + } + + require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) + if cfg.shuffleShardingEnabled { + require.NoError(t, querier2.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) + } + + // Wait until each querier's regex resolver has updated knownUsers at least once. WaitMissingMetrics must be an argument of WaitSumMetricsWithOptions; outside that call it is only a require.NoError message argument and has no effect. + require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_regex_resolver_last_update_run_timestamp_seconds"}, e2e.WaitMissingMetrics)) + if cfg.shuffleShardingEnabled { + require.NoError(t, querier2.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_regex_resolver_last_update_run_timestamp_seconds"}, e2e.WaitMissingMetrics)) + } + + // query all tenants + c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), queryFrontend.HTTPEndpoint(), "", "", "user-.+") + require.NoError(t, err) + + result, err := c.Query("series_1", now) + require.NoError(t, err) + + assert.Equal(t, mergeResults(tenantIDs, expectedVectors), result.(model.Vector)) + + // ensure a push to multiple tenants is failing + series, _ := generateSeries("series_1", now) + res, err := c.Push(series) + require.NoError(t, err) + + require.Equal(t, 500, res.StatusCode) + + // check metric label values for total queries in the query frontend + require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_query_frontend_queries_total"}, e2e.WithLabelMatchers( + labels.MustNewMatcher(labels.MatchEqual, "user", "user-.+"), + labels.MustNewMatcher(labels.MatchEqual, "op", "query")))) + + // check metric label values for query queue length in either query frontend or query scheduler + queueComponent := queryFrontend + queueMetricName := "cortex_query_frontend_queue_length" + if cfg.querySchedulerEnabled { + queueComponent = queryScheduler + queueMetricName = "cortex_query_scheduler_queue_length" + } + 
require.NoError(t, queueComponent.WaitSumMetricsWithOptions(e2e.Equals(0), []string{queueMetricName}, e2e.WithLabelMatchers( + labels.MustNewMatcher(labels.MatchEqual, "user", "user-.+")))) } func runQuerierTenantFederationTest(t *testing.T, cfg querierTenantFederationConfig) { diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 3002e64667..fc2fdfab81 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -51,6 +51,7 @@ import ( "github.com/cortexproject/cortex/pkg/scheduler" "github.com/cortexproject/cortex/pkg/storage/bucket" "github.com/cortexproject/cortex/pkg/storegateway" + "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util/grpcclient" util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/cortexproject/cortex/pkg/util/modules" @@ -282,6 +283,14 @@ func (t *Cortex) initTenantFederation() (serv services.Service, err error) { // single tenant. This allows for a less impactful enabling of tenant // federation. byPassForSingleQuerier := true + if t.Cfg.TenantFederation.RegexMatcherEnabled { + util_log.WarnExperimentalUse("tenant-federation.regex-matcher-enabled") + // If regex matcher enabled, we should set the byPassForSingleQuerier as false + // because if the # of matched tenantIDs is only one, `X-Scope-OrgID` header is + // set to input regex. 
+ byPassForSingleQuerier = false + tenant.WithDefaultResolver(tenantfederation.NewRegexResolver(prometheus.DefaultRegisterer, t.Cfg.TenantFederation.UserSyncInterval, util_log.Logger, t.Distributor.AllUserStats)) + } t.QuerierQueryable = querier.NewSampleAndChunkQueryable(tenantfederation.NewQueryable(t.QuerierQueryable, t.Cfg.TenantFederation.MaxConcurrent, byPassForSingleQuerier, prometheus.DefaultRegisterer)) t.MetadataQuerier = tenantfederation.NewMetadataQuerier(t.MetadataQuerier, t.Cfg.TenantFederation.MaxConcurrent, prometheus.DefaultRegisterer) t.ExemplarQueryable = tenantfederation.NewExemplarQueryable(t.ExemplarQueryable, t.Cfg.TenantFederation.MaxConcurrent, byPassForSingleQuerier, prometheus.DefaultRegisterer) @@ -488,6 +497,11 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro shardedPrometheusCodec := queryrange.NewPrometheusCodec(true, t.Cfg.Querier.ResponseCompression, t.Cfg.API.QuerierDefaultCodec) instantQueryCodec := instantquery.NewInstantQueryCodec(t.Cfg.Querier.ResponseCompression, t.Cfg.API.QuerierDefaultCodec) + if t.Cfg.TenantFederation.Enabled && t.Cfg.TenantFederation.RegexMatcherEnabled { + // If regex matcher enabled, we use regex validator to pass regex to the querier + tenant.WithDefaultResolver(tenantfederation.NewRegexValidator()) + } + queryRangeMiddlewares, cache, err := queryrange.Middlewares( t.Cfg.QueryRange, util_log.Logger, @@ -767,6 +781,11 @@ func (t *Cortex) initTenantDeletionAPI() (services.Service, error) { } func (t *Cortex) initQueryScheduler() (services.Service, error) { + if t.Cfg.TenantFederation.Enabled && t.Cfg.TenantFederation.RegexMatcherEnabled { + // If regex matcher enabled, we use regex validator to pass regex to the querier + tenant.WithDefaultResolver(tenantfederation.NewRegexValidator()) + } + s, err := scheduler.NewScheduler(t.Cfg.QueryScheduler, t.Overrides, util_log.Logger, prometheus.DefaultRegisterer) if err != nil { return nil, errors.Wrap(err, "query-scheduler 
init") diff --git a/pkg/querier/tenantfederation/exemplar_merge_queryable_test.go b/pkg/querier/tenantfederation/exemplar_merge_queryable_test.go index 309f2dea53..0fbc76d8d4 100644 --- a/pkg/querier/tenantfederation/exemplar_merge_queryable_test.go +++ b/pkg/querier/tenantfederation/exemplar_merge_queryable_test.go @@ -5,7 +5,9 @@ import ( "errors" "strings" "testing" + "time" + "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/prometheus/model/exemplar" @@ -14,7 +16,9 @@ import ( "github.com/stretchr/testify/require" "github.com/weaveworks/common/user" + "github.com/cortexproject/cortex/pkg/ingester" "github.com/cortexproject/cortex/pkg/tenant" + "github.com/cortexproject/cortex/pkg/util/test" ) var ( @@ -311,6 +315,112 @@ func Test_MergeExemplarQuerier_Select(t *testing.T) { } } +func Test_MergeExemplarQuerier_Select_WhenUseRegexResolver(t *testing.T) { + // set a regex tenant resolver + reg := prometheus.NewRegistry() + userStat := ingester.UserStats{} + regexResolver := NewRegexResolver(reg, time.Second, log.NewNopLogger(), func(ctx context.Context) ([]ingester.UserIDStats, error) { + return []ingester.UserIDStats{ + {UserID: "user-1", UserStats: userStat}, + {UserID: "user-2", UserStats: userStat}, + }, nil + }) + tenant.WithDefaultResolver(regexResolver) + + // wait update knownUsers + test.Poll(t, time.Second*10, true, func() interface{} { + return testutil.ToFloat64(regexResolver.lastUpdateUserRun) > 0 + }) + + tests := []struct { + name string + upstream mockExemplarQueryable + matcher [][]*labels.Matcher + orgId string + expectedResult []exemplar.QueryResult + expectedErr error + expectedMetrics string + }{ + { + name: "result labels should contains __tenant_id__ even if one tenant is queried", + upstream: mockExemplarQueryable{exemplarQueriers: map[string]storage.ExemplarQuerier{ + "user-1": &mockExemplarQuerier{res: getFixtureExemplarResult1()}, + 
"user-2": &mockExemplarQuerier{res: getFixtureExemplarResult2()}, + }}, + matcher: [][]*labels.Matcher{{ + labels.MustNewMatcher(labels.MatchEqual, "__name__", "exemplar_series"), + }}, + orgId: ".+-1", + expectedResult: []exemplar.QueryResult{ + { + SeriesLabels: labels.FromStrings("__name__", "exemplar_series", "__tenant_id__", "user-1"), + Exemplars: []exemplar.Exemplar{ + { + Labels: labels.FromStrings("traceID", "123"), + Value: 123, + Ts: 1734942337900, + }, + }, + }, + }, + expectedMetrics: expectedSingleTenantsExemplarMetrics, + }, + { + name: "two tenants results should be aggregated", + upstream: mockExemplarQueryable{exemplarQueriers: map[string]storage.ExemplarQuerier{ + "user-1": &mockExemplarQuerier{res: getFixtureExemplarResult1()}, + "user-2": &mockExemplarQuerier{res: getFixtureExemplarResult2()}, + }}, + matcher: [][]*labels.Matcher{{ + labels.MustNewMatcher(labels.MatchEqual, "__name__", "exemplar_series"), + }}, + orgId: "user-.+", + expectedResult: []exemplar.QueryResult{ + { + SeriesLabels: labels.FromStrings("__name__", "exemplar_series", "__tenant_id__", "user-1"), + Exemplars: []exemplar.Exemplar{ + { + Labels: labels.FromStrings("traceID", "123"), + Value: 123, + Ts: 1734942337900, + }, + }, + }, + { + SeriesLabels: labels.FromStrings("__name__", "exemplar_series", "__tenant_id__", "user-2"), + Exemplars: []exemplar.Exemplar{ + { + Labels: labels.FromStrings("traceID", "456"), + Value: 456, + Ts: 1734942338000, + }, + }, + }, + }, + expectedMetrics: expectedTwoTenantsExemplarMetrics, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + reg := prometheus.NewPedanticRegistry() + exemplarQueryable := NewExemplarQueryable(&test.upstream, defaultMaxConcurrency, false, reg) + ctx := user.InjectOrgID(context.Background(), test.orgId) + q, err := exemplarQueryable.ExemplarQuerier(ctx) + require.NoError(t, err) + + result, err := q.Select(mint, maxt, test.matcher...) 
+ if test.expectedErr != nil { + require.Error(t, err) + } else { + require.NoError(t, err) + require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(test.expectedMetrics), "cortex_querier_federated_tenants_per_exemplar_query")) + require.Equal(t, test.expectedResult, result) + } + }) + } +} + func Test_filterAllTenantsAndMatchers(t *testing.T) { idLabelName := defaultTenantLabel diff --git a/pkg/querier/tenantfederation/merge_queryable_test.go b/pkg/querier/tenantfederation/merge_queryable_test.go index 595ef572fa..09f6f4cea5 100644 --- a/pkg/querier/tenantfederation/merge_queryable_test.go +++ b/pkg/querier/tenantfederation/merge_queryable_test.go @@ -10,6 +10,7 @@ import ( "testing" "time" + "github.com/go-kit/log" "github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go/mocktracer" "github.com/prometheus/client_golang/prometheus" @@ -22,9 +23,11 @@ import ( "github.com/stretchr/testify/require" "github.com/weaveworks/common/user" + "github.com/cortexproject/cortex/pkg/ingester" "github.com/cortexproject/cortex/pkg/querier/series" "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util/spanlogger" + "github.com/cortexproject/cortex/pkg/util/test" ) const ( @@ -472,9 +475,6 @@ cortex_querier_federated_tenants_per_query_count 1 ) func TestMergeQueryable_Select(t *testing.T) { - // Set a multi tenant resolver. 
- tenant.WithDefaultResolver(tenant.NewMultiResolver()) - for _, scenario := range []selectScenario{ { mergeQueryableScenario: threeTenantsScenario, @@ -653,51 +653,77 @@ func TestMergeQueryable_Select(t *testing.T) { } { scenario := scenario t.Run(scenario.name, func(t *testing.T) { - for _, tc := range scenario.selectTestCases { - tc := tc - t.Run(tc.name, func(t *testing.T) { - t.Parallel() - querier, reg, err := scenario.init() - require.NoError(t, err) - - // inject tenants into context - ctx := context.Background() - if len(scenario.tenants) > 0 { - ctx = user.InjectOrgID(ctx, strings.Join(scenario.tenants, "|")) - } + for _, useRegexResolver := range []bool{true, false} { + for _, tc := range scenario.selectTestCases { + tc := tc + t.Run(fmt.Sprintf("%s, useRegexResolver: %v", tc.name, useRegexResolver), func(t *testing.T) { + ctx := context.Background() + if useRegexResolver { + reg := prometheus.NewRegistry() + userStat := ingester.UserStats{} + + var userIDStats []ingester.UserIDStats + + for _, tenant := range scenario.tenants { + userIDStats = append(userIDStats, ingester.UserIDStats{UserID: tenant, UserStats: userStat}) + } + + regexResolver := NewRegexResolver(reg, time.Second, log.NewNopLogger(), func(ctx context.Context) ([]ingester.UserIDStats, error) { + return userIDStats, nil + }) + + // set a regex tenant resolver + tenant.WithDefaultResolver(regexResolver) + + // wait update knownUsers + test.Poll(t, time.Second*10, true, func() interface{} { + return testutil.ToFloat64(regexResolver.lastUpdateUserRun) > 0 + }) + + ctx = user.InjectOrgID(ctx, "team-.+") + } else { + // Set a multi tenant resolver. + tenant.WithDefaultResolver(tenant.NewMultiResolver()) + + // inject tenants into context + if len(scenario.tenants) > 0 { + ctx = user.InjectOrgID(ctx, strings.Join(scenario.tenants, "|")) + } + } - seriesSet := querier.Select(ctx, true, &storage.SelectHints{Start: mint, End: maxt}, tc.matchers...) 
+ querier, reg, err := scenario.init() + require.NoError(t, err) - if tc.expectedQueryErr != nil { - require.EqualError(t, seriesSet.Err(), tc.expectedQueryErr.Error()) - } else { - require.NoError(t, seriesSet.Err()) - assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(tc.expectedMetrics), "cortex_querier_federated_tenants_per_query")) - assertEqualWarnings(t, tc.expectedWarnings, seriesSet.Warnings()) - } + seriesSet := querier.Select(ctx, true, &storage.SelectHints{Start: mint, End: maxt}, tc.matchers...) - if tc.expectedLabels != nil { - require.Equal(t, len(tc.expectedLabels), tc.expectedSeriesCount) - } + if tc.expectedQueryErr != nil { + require.EqualError(t, seriesSet.Err(), tc.expectedQueryErr.Error()) + } else { + require.NoError(t, seriesSet.Err()) + assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(tc.expectedMetrics), "cortex_querier_federated_tenants_per_query")) + assertEqualWarnings(t, tc.expectedWarnings, seriesSet.Warnings()) + } - count := 0 - for i := 0; seriesSet.Next(); i++ { - count++ if tc.expectedLabels != nil { - require.Equal(t, tc.expectedLabels[i], seriesSet.At().Labels(), fmt.Sprintf("labels index: %d", i)) + require.Equal(t, len(tc.expectedLabels), tc.expectedSeriesCount) } - } - require.Equal(t, tc.expectedSeriesCount, count) - }) + + count := 0 + for i := 0; seriesSet.Next(); i++ { + count++ + if tc.expectedLabels != nil { + require.Equal(t, tc.expectedLabels[i], seriesSet.At().Labels(), fmt.Sprintf("labels index: %d", i)) + } + } + require.Equal(t, tc.expectedSeriesCount, count) + }) + } } }) } } func TestMergeQueryable_LabelNames(t *testing.T) { - // set a multi tenant resolver - tenant.WithDefaultResolver(tenant.NewMultiResolver()) - for _, scenario := range []labelNamesScenario{ { mergeQueryableScenario: singleTenantScenario, @@ -822,38 +848,63 @@ func TestMergeQueryable_LabelNames(t *testing.T) { }, } { scenario := scenario - t.Run(scenario.mergeQueryableScenario.name, func(t *testing.T) { - 
t.Parallel() - querier, reg, err := scenario.init() - require.NoError(t, err) - - // inject tenants into context - ctx := context.Background() - if len(scenario.tenants) > 0 { - ctx = user.InjectOrgID(ctx, strings.Join(scenario.tenants, "|")) - } + for _, useRegexResolver := range []bool{true, false} { + t.Run(fmt.Sprintf("%s, useRegexResolver: %v", scenario.mergeQueryableScenario.name, useRegexResolver), func(t *testing.T) { + ctx := context.Background() + if useRegexResolver { + reg := prometheus.NewRegistry() + userStat := ingester.UserStats{} - t.Run(scenario.labelNamesTestCase.name, func(t *testing.T) { - t.Parallel() - labelNames, warnings, err := querier.LabelNames(ctx, nil, scenario.labelNamesTestCase.matchers...) - if scenario.labelNamesTestCase.expectedQueryErr != nil { - require.EqualError(t, err, scenario.labelNamesTestCase.expectedQueryErr.Error()) + var userIDStats []ingester.UserIDStats + + for _, tenant := range scenario.tenants { + userIDStats = append(userIDStats, ingester.UserIDStats{UserID: tenant, UserStats: userStat}) + } + + regexResolver := NewRegexResolver(reg, time.Second, log.NewNopLogger(), func(ctx context.Context) ([]ingester.UserIDStats, error) { + return userIDStats, nil + }) + + // set a regex tenant resolver + tenant.WithDefaultResolver(regexResolver) + + // wait update knownUsers + test.Poll(t, time.Second*10, true, func() interface{} { + return testutil.ToFloat64(regexResolver.lastUpdateUserRun) > 0 + }) + + ctx = user.InjectOrgID(ctx, "team-.+") } else { - require.NoError(t, err) - assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(scenario.expectedMetrics), "cortex_querier_federated_tenants_per_query")) - assert.Equal(t, scenario.labelNamesTestCase.expectedLabelNames, labelNames) - assertEqualWarnings(t, scenario.labelNamesTestCase.expectedWarnings, warnings) + // Set a multi tenant resolver. 
+ tenant.WithDefaultResolver(tenant.NewMultiResolver()) + + // inject tenants into context + if len(scenario.tenants) > 0 { + ctx = user.InjectOrgID(ctx, strings.Join(scenario.tenants, "|")) + } } + + querier, reg, err := scenario.init() + require.NoError(t, err) + + t.Run(scenario.labelNamesTestCase.name, func(t *testing.T) { + t.Parallel() + labelNames, warnings, err := querier.LabelNames(ctx, nil, scenario.labelNamesTestCase.matchers...) + if scenario.labelNamesTestCase.expectedQueryErr != nil { + require.EqualError(t, err, scenario.labelNamesTestCase.expectedQueryErr.Error()) + } else { + require.NoError(t, err) + assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(scenario.expectedMetrics), "cortex_querier_federated_tenants_per_query")) + assert.Equal(t, scenario.labelNamesTestCase.expectedLabelNames, labelNames) + assertEqualWarnings(t, scenario.labelNamesTestCase.expectedWarnings, warnings) + } + }) }) - }) + } } } func TestMergeQueryable_LabelValues(t *testing.T) { - t.Parallel() - // set a multi tenant resolver - tenant.WithDefaultResolver(tenant.NewMultiResolver()) - for _, scenario := range []labelValuesScenario{ { mergeQueryableScenario: singleTenantScenario, @@ -1028,29 +1079,57 @@ func TestMergeQueryable_LabelValues(t *testing.T) { } { scenario := scenario t.Run(scenario.name, func(t *testing.T) { - for _, tc := range scenario.labelValuesTestCases { - tc := tc - t.Run(tc.name, func(t *testing.T) { - t.Parallel() - querier, reg, err := scenario.init() - require.NoError(t, err) - - // inject tenants into context - ctx := context.Background() - if len(scenario.tenants) > 0 { - ctx = user.InjectOrgID(ctx, strings.Join(scenario.tenants, "|")) - } + for _, useRegexResolver := range []bool{true, false} { + for _, tc := range scenario.labelValuesTestCases { + t.Run(fmt.Sprintf("%s, useRegexResolver: %v", tc.name, useRegexResolver), func(t *testing.T) { + ctx := context.Background() + if useRegexResolver { + reg := prometheus.NewRegistry() + 
userStat := ingester.UserStats{} + + var userIDStats []ingester.UserIDStats + + for _, tenant := range scenario.tenants { + userIDStats = append(userIDStats, ingester.UserIDStats{UserID: tenant, UserStats: userStat}) + } + + regexResolver := NewRegexResolver(reg, time.Second, log.NewNopLogger(), func(ctx context.Context) ([]ingester.UserIDStats, error) { + return userIDStats, nil + }) + + // set a regex tenant resolver + tenant.WithDefaultResolver(regexResolver) + + // wait update knownUsers + test.Poll(t, time.Second*10, true, func() interface{} { + return testutil.ToFloat64(regexResolver.lastUpdateUserRun) > 0 + }) + + ctx = user.InjectOrgID(ctx, "team-.+") + } else { + // Set a multi tenant resolver. + tenant.WithDefaultResolver(tenant.NewMultiResolver()) + + // inject tenants into context + if len(scenario.tenants) > 0 { + ctx = user.InjectOrgID(ctx, strings.Join(scenario.tenants, "|")) + } + } - actLabelValues, warnings, err := querier.LabelValues(ctx, tc.labelName, nil, tc.matchers...) - if tc.expectedQueryErr != nil { - require.EqualError(t, err, tc.expectedQueryErr.Error()) - } else { + querier, reg, err := scenario.init() require.NoError(t, err) - assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(tc.expectedMetrics), "cortex_querier_federated_tenants_per_query")) - assert.Equal(t, tc.expectedLabelValues, actLabelValues, fmt.Sprintf("unexpected values for label '%s'", tc.labelName)) - assertEqualWarnings(t, tc.expectedWarnings, warnings) - } - }) + + actLabelValues, warnings, err := querier.LabelValues(ctx, tc.labelName, nil, tc.matchers...) 
+ if tc.expectedQueryErr != nil { + require.EqualError(t, err, tc.expectedQueryErr.Error()) + } else { + require.NoError(t, err) + assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(tc.expectedMetrics), "cortex_querier_federated_tenants_per_query")) + assert.Equal(t, tc.expectedLabelValues, actLabelValues, fmt.Sprintf("unexpected values for label '%s'", tc.labelName)) + assertEqualWarnings(t, tc.expectedWarnings, warnings) + } + }) + } } }) } diff --git a/pkg/querier/tenantfederation/metadata_merge_querier_test.go b/pkg/querier/tenantfederation/metadata_merge_querier_test.go index a9a9314733..59e2ba1fc6 100644 --- a/pkg/querier/tenantfederation/metadata_merge_querier_test.go +++ b/pkg/querier/tenantfederation/metadata_merge_querier_test.go @@ -5,15 +5,19 @@ import ( "fmt" "strings" "testing" + "time" + "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/prometheus/scrape" "github.com/stretchr/testify/require" "github.com/weaveworks/common/user" + "github.com/cortexproject/cortex/pkg/ingester" "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/cortexproject/cortex/pkg/tenant" + "github.com/cortexproject/cortex/pkg/util/test" ) var ( @@ -145,3 +149,77 @@ func Test_mergeMetadataQuerier_MetricsMetadata(t *testing.T) { }) } } + +func Test_mergeMetadataQuerier_MetricsMetadata_WhenUseRegexResolver(t *testing.T) { + // set a regex tenant resolver + reg := prometheus.NewRegistry() + userStat := ingester.UserStats{} + regexResolver := NewRegexResolver(reg, time.Second, log.NewNopLogger(), func(ctx context.Context) ([]ingester.UserIDStats, error) { + return []ingester.UserIDStats{ + {UserID: "user-1", UserStats: userStat}, + {UserID: "user-2", UserStats: userStat}, + }, nil + }) + tenant.WithDefaultResolver(regexResolver) + + // wait update knownUsers + test.Poll(t, time.Second*10, true, func() interface{} { + return 
testutil.ToFloat64(regexResolver.lastUpdateUserRun) > 0 + }) + + tests := []struct { + name string + tenantIdToMetadata map[string][]scrape.MetricMetadata + orgId string + expectedResults []scrape.MetricMetadata + expectedMetrics string + }{ + { + name: "single tenant", + tenantIdToMetadata: map[string][]scrape.MetricMetadata{ + "user-1": { + {MetricFamily: "metadata1", Help: "metadata1 help", Type: "gauge", Unit: ""}, + }, + }, + orgId: "user-1", + expectedResults: []scrape.MetricMetadata{ + {MetricFamily: "metadata1", Help: "metadata1 help", Type: "gauge", Unit: ""}, + }, + expectedMetrics: expectedSingleTenantsMetadataMetrics, + }, + { + name: "should be merged two tenants results", + tenantIdToMetadata: map[string][]scrape.MetricMetadata{ + "user-1": { + {MetricFamily: "metadata1", Help: "metadata1 help", Type: "gauge", Unit: ""}, + }, + "user-2": { + {MetricFamily: "metadata2", Help: "metadata2 help", Type: "counter", Unit: ""}, + {MetricFamily: "metadata3", Help: "metadata3 help", Type: "gauge", Unit: ""}, + }, + }, + orgId: "user-.+", + expectedResults: []scrape.MetricMetadata{ + {MetricFamily: "metadata1", Help: "metadata1 help", Type: "gauge", Unit: ""}, + {MetricFamily: "metadata2", Help: "metadata2 help", Type: "counter", Unit: ""}, + {MetricFamily: "metadata3", Help: "metadata3 help", Type: "gauge", Unit: ""}, + }, + expectedMetrics: expectedTwoTenantsMetadataMetrics, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + reg = prometheus.NewPedanticRegistry() + upstream := mockMetadataQuerier{ + tenantIdToMetadata: test.tenantIdToMetadata, + } + + mergeMetadataQuerier := NewMetadataQuerier(&upstream, defaultMaxConcurrency, reg) + metadata, err := mergeMetadataQuerier.MetricsMetadata(user.InjectOrgID(context.Background(), test.orgId), &client.MetricsMetadataRequest{Limit: -1, LimitPerMetric: -1, Metric: ""}) + require.NoError(t, err) + require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(test.expectedMetrics), 
"cortex_querier_federated_tenants_per_metadata_query")) + require.Equal(t, test.expectedResults, metadata) + }) + } +} diff --git a/pkg/querier/tenantfederation/regex_resolver.go b/pkg/querier/tenantfederation/regex_resolver.go new file mode 100644 index 0000000000..7b5db56ba7 --- /dev/null +++ b/pkg/querier/tenantfederation/regex_resolver.go @@ -0,0 +1,199 @@
+package tenantfederation
+
+import (
+	"context"
+	"errors"
+	"sort"
+	"sync"
+	"time"
+
+	"github.com/go-kit/log"
+	"github.com/go-kit/log/level"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+	"github.com/prometheus/prometheus/model/labels"
+	"github.com/weaveworks/common/user"
+
+	"github.com/cortexproject/cortex/pkg/ingester"
+	"github.com/cortexproject/cortex/pkg/tenant"
+)
+
+var (
+	// errInvalidRegex is returned when the org ID does not compile as a regex.
+	errInvalidRegex = errors.New("invalid regex present")
+
+	// allUserStatTimeout bounds each allUserStatFunc call made by the sync loop.
+	allUserStatTimeout = time.Second * 5
+)
+
+// RegexResolver resolves the org ID carried in the request context by treating
+// it as a regex and matching it against a periodically refreshed list of
+// known users.
+type RegexResolver struct {
+	allUserStatFunc func(ctx context.Context) ([]ingester.UserIDStats, error)
+	// knownUsers is the sorted user list from the last successful sync. It is
+	// guarded by the embedded mutex and replaced wholesale, never edited in place.
+	knownUsers []string
+	logger     log.Logger
+	sync.Mutex
+
+	// lastUpdateUserRun stores the timestamp of the latest successful update user loop run
+	lastUpdateUserRun prometheus.Gauge
+}
+
+// NewRegexResolver builds a RegexResolver, registers its gauge on reg and
+// starts the background loop that refreshes the known users through
+// allUserStatFunc every userSyncInterval.
+//
+// NOTE(review): the loop goroutine has no stop signal, so it runs until the
+// process exits.
+func NewRegexResolver(reg prometheus.Registerer, userSyncInterval time.Duration, logger log.Logger, allUserStatFunc func(ctx context.Context) ([]ingester.UserIDStats, error)) *RegexResolver {
+	r := &RegexResolver{
+		allUserStatFunc: allUserStatFunc,
+		logger:          logger,
+	}
+
+	r.lastUpdateUserRun = promauto.With(reg).NewGauge(prometheus.GaugeOpts{
+		Name: "cortex_regex_resolver_last_update_run_timestamp_seconds",
+		Help: "Unix timestamp of the last successful regex resolver update user run.",
+	})
+
+	go r.updateUsersLoop(userSyncInterval, allUserStatTimeout)
+	return r
+}
+
+// updateUsersLoop refreshes knownUsers once immediately and then on every tick
+// of interval. Each refresh gives allUserStatFunc at most timeout to answer.
+func (r *RegexResolver) updateUsersLoop(interval time.Duration, timeout time.Duration) {
+	ticker := time.NewTicker(interval)
+	defer ticker.Stop()
+
+	updateFunc := func() {
+		ctx, cancel := context.WithTimeout(context.Background(), timeout)
+		defer cancel()
+		stats, err := r.allUserStatFunc(ctx)
+		if err != nil {
+			// Keep the previous list; the gauge is left untouched on failure.
+			level.Error(r.logger).Log("msg", "error getting user stats", "err", err)
+			return
+		}
+
+		users := make([]string, 0, len(stats))
+		for _, stat := range stats {
+			users = append(users, stat.UserID)
+		}
+		// Sort before publishing so the lock is held only for the swap and the
+		// published slice is never modified afterwards.
+		sort.Strings(users)
+
+		r.Lock()
+		r.knownUsers = users
+		r.Unlock()
+		r.lastUpdateUserRun.SetToCurrentTime()
+	}
+
+	updateFunc()
+	for range ticker.C {
+		updateFunc()
+	}
+}
+
+// TenantID returns the single tenant ID the request resolves to, or
+// user.ErrTooManyOrgIDs when more than one tenant ID is resolved.
+func (r *RegexResolver) TenantID(ctx context.Context) (string, error) {
+	orgIDs, err := r.TenantIDs(ctx)
+	if err != nil {
+		return "", err
+	}
+
+	if len(orgIDs) > 1 {
+		return "", user.ErrTooManyOrgIDs
+	}
+
+	return orgIDs[0], nil
+}
+
+// TenantIDs extracts the org ID from ctx, expands it as a regex over the known
+// users and returns the result of tenant.ValidateOrgIDs on the matches.
+func (r *RegexResolver) TenantIDs(ctx context.Context) ([]string, error) {
+	//lint:ignore faillint wrapper around upstream method
+	orgID, err := user.ExtractOrgID(ctx)
+	if err != nil {
+		return nil, err
+	}
+
+	orgIDs, err := r.getRegexMatchedOrgIds(orgID)
+	if err != nil {
+		return nil, err
+	}
+
+	return tenant.ValidateOrgIDs(orgIDs)
+}
+
+// getRegexMatchedOrgIds returns the known users matched by orgID, compiled with
+// the Prometheus FastRegexMatcher. It returns errInvalidRegex when orgID does
+// not compile and []string{"fake"} when no known user matches.
+func (r *RegexResolver) getRegexMatchedOrgIds(orgID string) ([]string, error) {
+	m, err := labels.NewFastRegexMatcher(orgID)
+	if err != nil {
+		return nil, errInvalidRegex
+	}
+
+	// Only the slice header is read under the lock. updateUsersLoop swaps in a
+	// fresh slice instead of editing the published one, so matching can run
+	// without blocking the sync loop or other requests.
+	r.Lock()
+	known := r.knownUsers
+	r.Unlock()
+
+	var matched []string
+	for _, id := range known {
+		if m.MatchString(id) {
+			matched = append(matched, id)
+		}
+	}
+
+	if len(matched) == 0 {
+		// The entered regex could be an invalid tenantID, so if there is
+		// nothing to match, resolve to the `fake` tenant instead.
+		return []string{"fake"}, nil
+	}
+
+	return matched, nil
+}
+
+// RegexValidator only checks that the org ID in the request context compiles
+// as a regex; it returns the value as is, without matching it against users.
+type RegexValidator struct{}
+
+// NewRegexValidator returns a RegexValidator.
+func NewRegexValidator() *RegexValidator {
+	return &RegexValidator{}
+}
+
+// TenantID returns the org ID from ctx unchanged, or errInvalidRegex when it
+// is not a valid regex.
+func (r *RegexValidator) TenantID(ctx context.Context) (string, error) {
+	//lint:ignore faillint wrapper around upstream method
+	id, err := user.ExtractOrgID(ctx)
+	if err != nil {
+		return "", err
+	}
+
+	_, err = labels.NewFastRegexMatcher(id)
+	if err != nil {
+		return "", errInvalidRegex
+	}
+
+	return id, nil
+}
+
+// TenantIDs returns the validated org ID from ctx as a one-element slice.
+func (r *RegexValidator) TenantIDs(ctx context.Context) ([]string, error) {
+	orgID, err := r.TenantID(ctx)
+	if err != nil {
+		return nil, err
+	}
+
+	return []string{orgID}, nil
+}
diff --git a/pkg/querier/tenantfederation/regex_resolver_test.go b/pkg/querier/tenantfederation/regex_resolver_test.go new file mode 100644 index 0000000000..d6a69dd0b2 --- /dev/null +++ b/pkg/querier/tenantfederation/regex_resolver_test.go @@ -0,0 +1,125 @@ +package tenantfederation + +import ( + "context" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/require" + "github.com/weaveworks/common/user" + + "github.com/cortexproject/cortex/pkg/ingester" + "github.com/cortexproject/cortex/pkg/util/test" +) + +func Test_RegexResolver(t *testing.T) { + userStat := ingester.UserStats{} + + tests := []struct { + description string + allUserStatFunc func(ctx context.Context) ([]ingester.UserIDStats, error) + orgID string + expectedErr error + expectedOrgIDs []string + }{ + { + description: "invalid regex", + allUserStatFunc: func(ctx context.Context) ([]ingester.UserIDStats, error) { + return []ingester.UserIDStats{}, nil + }, + orgID: "[a-z", + expectedErr: errInvalidRegex, + }, + { + description: "no matched tenantID", + allUserStatFunc: func(ctx context.Context) ([]ingester.UserIDStats, error) { + return []ingester.UserIDStats{ + {UserID: "user-1", UserStats: 
userStat}, + {UserID: "user-2", UserStats: userStat}, + {UserID: "user-3", UserStats: userStat}, + }, nil + }, + orgID: "user-[4-6]", + expectedOrgIDs: []string{"fake"}, + }, + { + description: "use tenantIDsLabelSeparator", + allUserStatFunc: func(ctx context.Context) ([]ingester.UserIDStats, error) { + return []ingester.UserIDStats{ + {UserID: "user-1", UserStats: userStat}, + {UserID: "user-2", UserStats: userStat}, + {UserID: "user-3", UserStats: userStat}, + }, nil + }, + orgID: "user-1|user-2|user-3", + expectedOrgIDs: []string{"user-1", "user-2", "user-3"}, + }, + { + description: "use regex", + allUserStatFunc: func(ctx context.Context) ([]ingester.UserIDStats, error) { + return []ingester.UserIDStats{ + {UserID: "user-1", UserStats: userStat}, + {UserID: "user-2", UserStats: userStat}, + {UserID: "user-3", UserStats: userStat}, + }, nil + }, + orgID: "user-.+", + expectedOrgIDs: []string{"user-1", "user-2", "user-3"}, + }, + } + + for _, tc := range tests { + t.Run(tc.description, func(t *testing.T) { + reg := prometheus.NewRegistry() + regexResolver := NewRegexResolver(reg, time.Second, log.NewNopLogger(), tc.allUserStatFunc) + + // wait until the first sync has run, so knownUsers is populated + test.Poll(t, time.Second*10, true, func() interface{} { + return testutil.ToFloat64(regexResolver.lastUpdateUserRun) > 0 + }) + + // inject the regex as the org ID and resolve it + ctx := context.Background() + ctx = user.InjectOrgID(ctx, tc.orgID) + orgIDs, err := regexResolver.TenantIDs(ctx) + + if tc.expectedErr != nil { + require.Equal(t, tc.expectedErr, err) + } else { + require.NoError(t, err) + require.Equal(t, tc.expectedOrgIDs, orgIDs) + } + }) + } +} + +func Test_RegexValidator(t *testing.T) { + tests := []struct { + description string + orgID string + expectedErr error + }{ + { + description: "valid regex", + orgID: "user-.*", + expectedErr: nil, + }, + { + description: "invalid regex", + orgID: "[a-z", + expectedErr: errInvalidRegex, + }, + } + + for _, tc := range tests { + t.Run(tc.description, func(t *testing.T) { + regexValidator := 
NewRegexValidator() + ctx := user.InjectOrgID(context.Background(), tc.orgID) + _, err := regexValidator.TenantID(ctx) + require.Equal(t, tc.expectedErr, err) + }) + } +} diff --git a/pkg/querier/tenantfederation/tenant_federation.go b/pkg/querier/tenantfederation/tenant_federation.go index 4b161ab732..b3ceb1239e 100644 --- a/pkg/querier/tenantfederation/tenant_federation.go +++ b/pkg/querier/tenantfederation/tenant_federation.go @@ -2,6 +2,7 @@ package tenantfederation import ( "flag" + "time" ) type Config struct { @@ -11,10 +12,16 @@ type Config struct { MaxConcurrent int `yaml:"max_concurrent"` // MaxTenant A maximum number of tenants to query at once. MaxTenant int `yaml:"max_tenant"` + // RegexMatcherEnabled If true, the `X-Scope-OrgID` header value may be a regex, and the tenantIDs it matches are automatically involved. + RegexMatcherEnabled bool `yaml:"regex_matcher_enabled"` + // UserSyncInterval How frequently to scan users. The scanned users are used to calculate the matched tenantIDs when the regex matcher is enabled. + UserSyncInterval time.Duration `yaml:"user_sync_interval"` } func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.Enabled, "tenant-federation.enabled", false, "If enabled on all Cortex services, queries can be federated across multiple tenants. The tenant IDs involved need to be specified separated by a `|` character in the `X-Scope-OrgID` header (experimental).") f.IntVar(&cfg.MaxConcurrent, "tenant-federation.max-concurrent", defaultMaxConcurrency, "The number of workers used to process each federated query.") f.IntVar(&cfg.MaxTenant, "tenant-federation.max-tenant", 0, "A maximum number of tenants to query at once. 0 means no limit.") + f.BoolVar(&cfg.RegexMatcherEnabled, "tenant-federation.regex-matcher-enabled", false, "[Experimental] If enabled, the `X-Scope-OrgID` header value can accept a regex and the matched tenantIDs are automatically involved. 
The regex matching rule follows the Prometheus, see the detail: https://prometheus.io/docs/prometheus/latest/querying/basics/#regular-expressions.") + f.DurationVar(&cfg.UserSyncInterval, "tenant-federation.user-sync-interval", time.Minute*5, "If the regex matcher is enabled, it specifies how frequently to scan users. The scanned users are used to calculate matched tenantIDs.") } diff --git a/pkg/tenant/resolver.go b/pkg/tenant/resolver.go index f0fd8abfea..a0f8c1b9bf 100644 --- a/pkg/tenant/resolver.go +++ b/pkg/tenant/resolver.go @@ -130,6 +130,11 @@ func (t *MultiResolver) TenantIDs(ctx context.Context) ([]string, error) { } orgIDs := strings.Split(orgID, tenantIDsLabelSeparator) + + return ValidateOrgIDs(orgIDs) +} + +func ValidateOrgIDs(orgIDs []string) ([]string, error) { for _, orgID := range orgIDs { if err := ValidTenantID(orgID); err != nil { return nil, err diff --git a/pkg/tenant/tenant.go b/pkg/tenant/tenant.go index c7c772648c..f3f0240472 100644 --- a/pkg/tenant/tenant.go +++ b/pkg/tenant/tenant.go @@ -29,7 +29,7 @@ func (e *errTenantIDUnsupportedCharacter) Error() string { const tenantIDsLabelSeparator = "|" -// NormalizeTenantIDs is creating a normalized form by sortiing and de-duplicating the list of tenantIDs +// NormalizeTenantIDs is creating a normalized form by sorting and de-duplicating the list of tenantIDs func NormalizeTenantIDs(tenantIDs []string) []string { sort.Strings(tenantIDs)