From 1fe17a26c97f9673b38eefc0c20b88396d130da1 Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Wed, 15 Feb 2023 15:04:03 -0800 Subject: [PATCH 1/8] Removing unused config Signed-off-by: Alan Protasio --- pkg/cortex/runtime_config.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/cortex/runtime_config.go b/pkg/cortex/runtime_config.go index 67bfd928127..0011b6a312d 100644 --- a/pkg/cortex/runtime_config.go +++ b/pkg/cortex/runtime_config.go @@ -26,8 +26,6 @@ type runtimeConfigValues struct { Multi kv.MultiRuntimeConfig `yaml:"multi_kv_config"` - IngesterChunkStreaming *bool `yaml:"ingester_stream_chunks_when_using_blocks"` - IngesterLimits *ingester.InstanceLimits `yaml:"ingester_limits"` } From 4fa56dabaed1a7618f4a8300a9d559d9f6b535a3 Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Thu, 16 Feb 2023 00:16:58 -0800 Subject: [PATCH 2/8] alert manager Signed-off-by: Alan Protasio --- pkg/alertmanager/distributor_test.go | 2 +- pkg/alertmanager/multitenant.go | 7 +-- pkg/compactor/compactor.go | 9 ++-- pkg/cortex/modules.go | 11 +++++ pkg/cortex/runtime_config.go | 6 +++ pkg/ingester/ingester.go | 2 +- pkg/ruler/ruler.go | 7 ++- pkg/util/allowed_tenants.go | 71 ++++++++++++++++++++++++---- pkg/util/allowed_tenants_test.go | 8 ++-- 9 files changed, 95 insertions(+), 28 deletions(-) diff --git a/pkg/alertmanager/distributor_test.go b/pkg/alertmanager/distributor_test.go index 40ef486cfaa..6364a20f4e9 100644 --- a/pkg/alertmanager/distributor_test.go +++ b/pkg/alertmanager/distributor_test.go @@ -276,7 +276,7 @@ func TestDistributor_DistributeRequest(t *testing.T) { req.RequestURI = url var allowedTenants *util.AllowedTenants if c.isTenantDisabled { - allowedTenants = util.NewAllowedTenants(nil, []string{"1"}) + allowedTenants = util.NewAllowedTenants(util.AllowedTenantConfig{DisabledTenants: []string{"1"}}, nil) } w := httptest.NewRecorder() diff --git a/pkg/alertmanager/multitenant.go b/pkg/alertmanager/multitenant.go index e868c0cd51a..6024dfadf2f 100644 --- a/pkg/alertmanager/multitenant.go +++ b/pkg/alertmanager/multitenant.go @@ -86,8 +86,9 @@ type MultitenantAlertmanagerConfig struct { // For the state persister. Persister PersisterConfig `yaml:",inline"` - EnabledTenants flagext.StringSliceCSV `yaml:"enabled_tenants"` - DisabledTenants flagext.StringSliceCSV `yaml:"disabled_tenants"` + util.AllowedTenantConfig + + AllowedTenantConfigFn func() *util.AllowedTenantConfig `yaml:"-"` } type ClusterConfig struct { @@ -366,7 +367,7 @@ func createMultitenantAlertmanager(cfg *MultitenantAlertmanagerConfig, fallbackC logger: log.With(logger, "component", "MultiTenantAlertmanager"), registry: registerer, limits: limits, - allowedTenants: util.NewAllowedTenants(cfg.EnabledTenants, cfg.DisabledTenants), + allowedTenants: util.NewAllowedTenants(cfg.AllowedTenantConfig, nil), ringCheckErrors: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ Name: "cortex_alertmanager_ring_check_errors_total", Help: "Number of errors that have occurred when checking the ring for ownership.", diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index e665d3ec0db..950af81160b 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -30,7 +30,6 @@ import ( "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/backoff" - "github.com/cortexproject/cortex/pkg/util/flagext" util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/cortexproject/cortex/pkg/util/services" "github.com/cortexproject/cortex/pkg/util/validation" @@ -187,9 +186,6 @@ type Config struct { // Whether the migration of block deletion marks to the global markers location is enabled. BlockDeletionMarksMigrationEnabled bool `yaml:"block_deletion_marks_migration_enabled"` - EnabledTenants flagext.StringSliceCSV `yaml:"enabled_tenants"` - DisabledTenants flagext.StringSliceCSV `yaml:"disabled_tenants"` - // Compactors sharding. ShardingEnabled bool `yaml:"sharding_enabled"` ShardingStrategy string `yaml:"sharding_strategy"` @@ -208,6 +204,9 @@ type Config struct { // Block visit marker file config BlockVisitMarkerTimeout time.Duration `yaml:"block_visit_marker_timeout"` BlockVisitMarkerFileUpdateInterval time.Duration `yaml:"block_visit_marker_file_update_interval"` + + // Allowed TenantConfig + util.AllowedTenantConfig } // RegisterFlags registers the Compactor flags. @@ -396,7 +395,7 @@ func newCompactor( bucketClientFactory: bucketClientFactory, blocksGrouperFactory: blocksGrouperFactory, blocksCompactorFactory: blocksCompactorFactory, - allowedTenants: util.NewAllowedTenants(compactorCfg.EnabledTenants, compactorCfg.DisabledTenants), + allowedTenants: util.NewAllowedTenants(compactorCfg.AllowedTenantConfig, nil), compactionRunsStarted: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ Name: "cortex_compactor_runs_started_total", diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 9470b86042b..be0513bf58b 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -6,6 +6,7 @@ import ( "fmt" "net/http" + "github.com/cortexproject/cortex/pkg/util" "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/opentracing-contrib/go-stdlib/nethttp" @@ -627,6 +628,16 @@ func (t *Cortex) initAlertManager() (serv services.Service, err error) { return } + if t.RuntimeConfig != nil { + t.Cfg.Alertmanager.AllowedTenantConfigFn = func() *util.AllowedTenantConfig { + val := t.RuntimeConfig.GetConfig() + if cfg, ok := val.(*runtimeConfigValues); ok && cfg != nil { + return cfg.AllowedTenantConfig.alertManager + } + return nil + } + } + t.Alertmanager, err = alertmanager.NewMultitenantAlertmanager(&t.Cfg.Alertmanager, store, t.Overrides, util_log.Logger, prometheus.DefaultRegisterer) if err != nil { return diff --git a/pkg/cortex/runtime_config.go b/pkg/cortex/runtime_config.go index 0011b6a312d..05966d276fd 100644 --- a/pkg/cortex/runtime_config.go +++ b/pkg/cortex/runtime_config.go @@ -18,6 +18,10 @@ var ( errMultipleDocuments = errors.New("the provided runtime configuration contains multiple documents") ) +type runtimeAllowedTenantConfig struct { + alertManager *util.AllowedTenantConfig `yaml:"alert_manager"` +} + // runtimeConfigValues are values that can be reloaded from configuration file while Cortex is running. // Reloading is done by runtime_config.Manager, which also keeps the currently loaded config. // These values are then pushed to the components that are interested in them. @@ -27,6 +31,8 @@ type runtimeConfigValues struct { Multi kv.MultiRuntimeConfig `yaml:"multi_kv_config"` IngesterLimits *ingester.InstanceLimits `yaml:"ingester_limits"` + + AllowedTenantConfig runtimeAllowedTenantConfig `yaml:"allowed_tenant"` } // runtimeConfigTenantLimits provides per-tenant limit overrides based on a runtimeconfig.Manager diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index dde76d16ef3..988ef027e2d 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -2595,7 +2595,7 @@ func (i *Ingester) flushHandler(w http.ResponseWriter, r *http.Request) { tenants := r.Form[tenantParam] - allowedUsers := util.NewAllowedTenants(tenants, nil) + allowedUsers := util.NewAllowedTenants(util.AllowedTenantConfig{DisabledTenants: nil, EnabledTenants: tenants}, nil) run := func() { ingCtx := i.BasicService.ServiceContext() if ingCtx == nil || ingCtx.Err() != nil { diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go index e932292213e..97c69866454 100644 --- a/pkg/ruler/ruler.go +++ b/pkg/ruler/ruler.go @@ -116,8 +116,7 @@ type Config struct { EnableAPI bool `yaml:"enable_api"` - EnabledTenants flagext.StringSliceCSV `yaml:"enabled_tenants"` - DisabledTenants flagext.StringSliceCSV `yaml:"disabled_tenants"` + util.AllowedTenantConfig RingCheckPeriod time.Duration `yaml:"-"` @@ -266,7 +265,7 @@ func newRuler(cfg Config, manager MultiTenantManager, reg prometheus.Registerer, logger: logger, limits: limits, clientsPool: clientPool, - allowedTenants: util.NewAllowedTenants(cfg.EnabledTenants, cfg.DisabledTenants), + allowedTenants: util.NewAllowedTenants(cfg.AllowedTenantConfig, nil), ringCheckErrors: promauto.With(reg).NewCounter(prometheus.CounterOpts{ Name: "cortex_ruler_ring_check_errors_total", @@ -337,7 +336,7 @@ func (r *Ruler) starting(ctx context.Context) error { if r.cfg.EnableSharding { var err error - if r.subservices, err = services.NewManager(r.lifecycler, r.ring, r.clientsPool); err != nil { + if r.subservices, err = services.NewManager(r.lifecycler, r.ring, r.clientsPool, r.allowedTenants); err != nil { return errors.Wrap(err, "unable to start ruler subservices") } diff --git a/pkg/util/allowed_tenants.go b/pkg/util/allowed_tenants.go index 88c7a6333b8..fd317e20b0a 100644 --- a/pkg/util/allowed_tenants.go +++ b/pkg/util/allowed_tenants.go @@ -1,39 +1,90 @@ package util +import ( + "context" + "sync" + "time" + + "github.com/cortexproject/cortex/pkg/util/flagext" + "github.com/cortexproject/cortex/pkg/util/services" +) + +type AllowedTenantConfig struct { + EnabledTenants flagext.StringSliceCSV `yaml:"enabled_tenants"` + DisabledTenants flagext.StringSliceCSV `yaml:"disabled_tenants"` +} + // AllowedTenants that can answer whether tenant is allowed or not based on configuration. // Default value (nil) allows all tenants. type AllowedTenants struct { + services.Service + // If empty, all tenants are enabled. If not empty, only tenants in the map are enabled. enabled map[string]struct{} // If empty, no tenants are disabled. If not empty, tenants in the map are disabled. disabled map[string]struct{} + + allowedTenantConfigFn func() *AllowedTenantConfig + m sync.RWMutex } // NewAllowedTenants builds new allowed tenants based on enabled and disabled tenants. // If there are any enabled tenants, then only those tenants are allowed. // If there are any disabled tenants, then tenant from that list, that would normally be allowed, is disabled instead. -func NewAllowedTenants(enabled []string, disabled []string) *AllowedTenants { - a := &AllowedTenants{} +func NewAllowedTenants(cfg AllowedTenantConfig, allowedTenantConfigFn func() *AllowedTenantConfig) *AllowedTenants { + a := &AllowedTenants{ + allowedTenantConfigFn: allowedTenantConfigFn, + } + + if allowedTenantConfigFn == nil { + a.setConfig(allowedTenantConfigFn()) + } else { + a.setConfig(&cfg) + } + + a.Service = services.NewBasicService(nil, a.running, nil) - if len(enabled) > 0 { - a.enabled = make(map[string]struct{}, len(enabled)) - for _, u := range enabled { + return a +} + +func (a *AllowedTenants) running(ctx context.Context) error { + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + if a.allowedTenantConfigFn == nil { + a.setConfig(a.allowedTenantConfigFn()) + } + case <-ctx.Done(): + return nil + } + } +} + +func (a *AllowedTenants) setConfig(cfg *AllowedTenantConfig) { + a.m.Lock() + defer a.m.Unlock() + if len(cfg.EnabledTenants) > 0 { + a.enabled = make(map[string]struct{}, len(cfg.EnabledTenants)) + for _, u := range cfg.EnabledTenants { a.enabled[u] = struct{}{} } } - if len(disabled) > 0 { - a.disabled = make(map[string]struct{}, len(disabled)) - for _, u := range disabled { + if len(cfg.DisabledTenants) > 0 { + a.disabled = make(map[string]struct{}, len(cfg.DisabledTenants)) + for _, u := range cfg.DisabledTenants { a.disabled[u] = struct{}{} } } - - return a } func (a *AllowedTenants) IsAllowed(tenantID string) bool { + a.m.RUnlock() + defer a.m.RUnlock() if a == nil { return true } diff --git a/pkg/util/allowed_tenants_test.go b/pkg/util/allowed_tenants_test.go index 221e0a9e603..6ef2497079a 100644 --- a/pkg/util/allowed_tenants_test.go +++ b/pkg/util/allowed_tenants_test.go @@ -7,14 +7,14 @@ import ( ) func TestAllowedTenants_NoConfig(t *testing.T) { - a := NewAllowedTenants(nil, nil) + a := NewAllowedTenants(AllowedTenantConfig{}, nil) require.True(t, a.IsAllowed("all")) require.True(t, a.IsAllowed("tenants")) require.True(t, a.IsAllowed("allowed")) } func TestAllowedTenants_Enabled(t *testing.T) { - a := NewAllowedTenants([]string{"A", "B"}, nil) + a := NewAllowedTenants(AllowedTenantConfig{EnabledTenants: []string{"A", "B"}}, nil) require.True(t, a.IsAllowed("A")) require.True(t, a.IsAllowed("B")) require.False(t, a.IsAllowed("C")) @@ -22,7 +22,7 @@ func TestAllowedTenants_Enabled(t *testing.T) { } func TestAllowedTenants_Disabled(t *testing.T) { - a := NewAllowedTenants(nil, []string{"A", "B"}) + a := NewAllowedTenants(AllowedTenantConfig{DisabledTenants: []string{"A", "B"}}, nil) require.False(t, a.IsAllowed("A")) require.False(t, a.IsAllowed("B")) require.True(t, a.IsAllowed("C")) @@ -30,7 +30,7 @@ func TestAllowedTenants_Disabled(t *testing.T) { } func TestAllowedTenants_Combination(t *testing.T) { - a := NewAllowedTenants([]string{"A", "B"}, []string{"B", "C"}) + a := NewAllowedTenants(AllowedTenantConfig{EnabledTenants: []string{"A", "B"}, DisabledTenants: []string{"B", "C"}}, nil) require.True(t, a.IsAllowed("A")) // enabled, and not disabled require.False(t, a.IsAllowed("B")) // enabled, but also disabled require.False(t, a.IsAllowed("C")) // disabled From 9d9732c90691114bcdf6f40f97e01b64f7a7a8fa Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Thu, 16 Feb 2023 00:25:39 -0800 Subject: [PATCH 3/8] fix lint Signed-off-by: Alan Protasio --- pkg/cortex/modules.go | 2 +- pkg/ruler/ruler_test.go | 6 ++++-- pkg/util/allowed_tenants.go | 9 +++++++-- 3 files changed, 12 insertions(+), 5 deletions(-) diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index be0513bf58b..cc4ddc28c13 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -6,7 +6,6 @@ import ( "fmt" "net/http" - "github.com/cortexproject/cortex/pkg/util" "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/opentracing-contrib/go-stdlib/nethttp" @@ -48,6 +47,7 @@ import ( "github.com/cortexproject/cortex/pkg/ruler" "github.com/cortexproject/cortex/pkg/scheduler" "github.com/cortexproject/cortex/pkg/storegateway" + "github.com/cortexproject/cortex/pkg/util" util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/cortexproject/cortex/pkg/util/modules" "github.com/cortexproject/cortex/pkg/util/runtimeconfig" diff --git a/pkg/ruler/ruler_test.go b/pkg/ruler/ruler_test.go index 2bcb6f813d2..249f092acfb 100644 --- a/pkg/ruler/ruler_test.go +++ b/pkg/ruler/ruler_test.go @@ -899,8 +899,10 @@ func TestSharding(t *testing.T) { HeartbeatTimeout: 1 * time.Minute, }, FlushCheckPeriod: 0, - EnabledTenants: tc.enabledUsers, - DisabledTenants: tc.disabledUsers, + AllowedTenantConfig: util.AllowedTenantConfig{ + EnabledTenants: tc.enabledUsers, + DisabledTenants: tc.disabledUsers, + }, } r := buildRuler(t, cfg, nil, store, nil) diff --git a/pkg/util/allowed_tenants.go b/pkg/util/allowed_tenants.go index fd317e20b0a..cfcbd8a3f23 100644 --- a/pkg/util/allowed_tenants.go +++ b/pkg/util/allowed_tenants.go @@ -65,6 +65,10 @@ func (a *AllowedTenants) running(ctx context.Context) error { } func (a *AllowedTenants) setConfig(cfg *AllowedTenantConfig) { + if a == nil { + return + } + a.m.Lock() defer a.m.Unlock() if len(cfg.EnabledTenants) > 0 { @@ -83,12 +87,13 @@ func (a *AllowedTenants) setConfig(cfg *AllowedTenantConfig) { } func (a *AllowedTenants) IsAllowed(tenantID string) bool { - a.m.RUnlock() - defer a.m.RUnlock() if a == nil { return true } + a.m.RUnlock() + defer a.m.RUnlock() + if len(a.enabled) > 0 { if _, ok := a.enabled[tenantID]; !ok { return false From de5aba16db766e58f6fbb981a21483c99d2a51ed Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Thu, 16 Feb 2023 00:29:04 -0800 Subject: [PATCH 4/8] inline configs Signed-off-by: Alan Protasio --- pkg/alertmanager/multitenant.go | 2 +- pkg/compactor/compactor.go | 6 +++--- pkg/ruler/ruler.go | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/alertmanager/multitenant.go b/pkg/alertmanager/multitenant.go index 6024dfadf2f..84f920368c3 100644 --- a/pkg/alertmanager/multitenant.go +++ b/pkg/alertmanager/multitenant.go @@ -86,7 +86,7 @@ type MultitenantAlertmanagerConfig struct { // For the state persister. Persister PersisterConfig `yaml:",inline"` - util.AllowedTenantConfig + util.AllowedTenantConfig `yaml:",inline"` AllowedTenantConfigFn func() *util.AllowedTenantConfig `yaml:"-"` } diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index 950af81160b..17d2782cd66 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -186,6 +186,9 @@ type Config struct { // Whether the migration of block deletion marks to the global markers location is enabled. BlockDeletionMarksMigrationEnabled bool `yaml:"block_deletion_marks_migration_enabled"` + // Allowed TenantConfig + util.AllowedTenantConfig `yaml:",inline"` + // Compactors sharding. ShardingEnabled bool `yaml:"sharding_enabled"` ShardingStrategy string `yaml:"sharding_strategy"` @@ -204,9 +207,6 @@ type Config struct { // Block visit marker file config BlockVisitMarkerTimeout time.Duration `yaml:"block_visit_marker_timeout"` BlockVisitMarkerFileUpdateInterval time.Duration `yaml:"block_visit_marker_file_update_interval"` - - // Allowed TenantConfig - util.AllowedTenantConfig } // RegisterFlags registers the Compactor flags. diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go index 97c69866454..a86efef16e0 100644 --- a/pkg/ruler/ruler.go +++ b/pkg/ruler/ruler.go @@ -116,7 +116,7 @@ type Config struct { EnableAPI bool `yaml:"enable_api"` - util.AllowedTenantConfig + util.AllowedTenantConfig `yaml:",inline"` RingCheckPeriod time.Duration `yaml:"-"` From d2babe4fce7b64bbb206202c0540bc06a2e82d2b Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Thu, 16 Feb 2023 00:38:33 -0800 Subject: [PATCH 5/8] refactor Signed-off-by: Alan Protasio --- pkg/cortex/modules.go | 12 +----------- pkg/cortex/runtime_config.go | 14 ++++++++++++++ 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index cc4ddc28c13..d9e2fc66a61 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -47,7 +47,6 @@ import ( "github.com/cortexproject/cortex/pkg/ruler" "github.com/cortexproject/cortex/pkg/scheduler" "github.com/cortexproject/cortex/pkg/storegateway" - "github.com/cortexproject/cortex/pkg/util" util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/cortexproject/cortex/pkg/util/modules" "github.com/cortexproject/cortex/pkg/util/runtimeconfig" @@ -627,16 +626,7 @@ func (t *Cortex) initAlertManager() (serv services.Service, err error) { if err != nil { return } - - if t.RuntimeConfig != nil { - t.Cfg.Alertmanager.AllowedTenantConfigFn = func() *util.AllowedTenantConfig { - val := t.RuntimeConfig.GetConfig() - if cfg, ok := val.(*runtimeConfigValues); ok && cfg != nil { - return cfg.AllowedTenantConfig.alertManager - } - return nil - } - } + t.Cfg.Alertmanager.AllowedTenantConfigFn = alertManagerAllowedTenantC(t.RuntimeConfig) t.Alertmanager, err = alertmanager.NewMultitenantAlertmanager(&t.Cfg.Alertmanager, store, t.Overrides, util_log.Logger, prometheus.DefaultRegisterer) if err != nil { diff --git a/pkg/cortex/runtime_config.go b/pkg/cortex/runtime_config.go index 05966d276fd..29720959639 100644 --- a/pkg/cortex/runtime_config.go +++ b/pkg/cortex/runtime_config.go @@ -122,6 +122,20 @@ func ingesterInstanceLimits(manager *runtimeconfig.Manager) func() *ingester.Ins } } +func alertManagerAllowedTenantC(manager *runtimeconfig.Manager) func() *util.AllowedTenantConfig { + if manager == nil { + return nil + } + + return func() *util.AllowedTenantConfig { + val := manager.GetConfig() + if cfg, ok := val.(*runtimeConfigValues); ok && cfg != nil { + return cfg.AllowedTenantConfig.alertManager + } + return nil + } +} + func runtimeConfigHandler(runtimeCfgManager *runtimeconfig.Manager, defaultLimits validation.Limits) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { cfg, ok := runtimeCfgManager.GetConfig().(*runtimeConfigValues) From 0f01424c32b219d571826f2e2c393f9f99459f8a Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Fri, 17 Feb 2023 12:04:26 -0800 Subject: [PATCH 6/8] :P Signed-off-by: Alan Protasio --- pkg/alertmanager/multitenant.go | 2 +- pkg/compactor/compactor.go | 3 ++- pkg/cortex/modules.go | 5 ++++- pkg/cortex/runtime_config.go | 32 +++++++++++++++++++++++++++++++- pkg/ruler/ruler.go | 3 ++- pkg/util/allowed_tenants.go | 20 +++++++++++++++----- 6 files changed, 55 insertions(+), 10 deletions(-) diff --git a/pkg/alertmanager/multitenant.go b/pkg/alertmanager/multitenant.go index 84f920368c3..39a5ebc380c 100644 --- a/pkg/alertmanager/multitenant.go +++ b/pkg/alertmanager/multitenant.go @@ -367,7 +367,7 @@ func createMultitenantAlertmanager(cfg *MultitenantAlertmanagerConfig, fallbackC logger: log.With(logger, "component", "MultiTenantAlertmanager"), registry: registerer, limits: limits, - allowedTenants: util.NewAllowedTenants(cfg.AllowedTenantConfig, nil), + allowedTenants: util.NewAllowedTenants(cfg.AllowedTenantConfig, cfg.AllowedTenantConfigFn), ringCheckErrors: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ Name: "cortex_alertmanager_ring_check_errors_total", Help: "Number of errors that have occurred when checking the ring for ownership.", diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index 17d2782cd66..f556a0ef971 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -188,6 +188,7 @@ type Config struct { // Allowed TenantConfig util.AllowedTenantConfig `yaml:",inline"` + AllowedTenantConfigFn func() *util.AllowedTenantConfig `yaml:"-"` // Compactors sharding. ShardingEnabled bool `yaml:"sharding_enabled"` @@ -395,7 +396,7 @@ func newCompactor( bucketClientFactory: bucketClientFactory, blocksGrouperFactory: blocksGrouperFactory, blocksCompactorFactory: blocksCompactorFactory, - allowedTenants: util.NewAllowedTenants(compactorCfg.AllowedTenantConfig, nil), + allowedTenants: util.NewAllowedTenants(compactorCfg.AllowedTenantConfig, compactorCfg.AllowedTenantConfigFn), compactionRunsStarted: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ Name: "cortex_compactor_runs_started_total", diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index d9e2fc66a61..7f0c648ddcd 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -581,6 +581,8 @@ func (t *Cortex) initRuler() (serv services.Service, err error) { return nil, err } + t.Cfg.Ruler.AllowedTenantConfigFn = rulerAllowedTenant(t.RuntimeConfig) + t.Ruler, err = ruler.NewRuler( t.Cfg.Ruler, manager, @@ -626,7 +628,7 @@ func (t *Cortex) initAlertManager() (serv services.Service, err error) { if err != nil { return } - t.Cfg.Alertmanager.AllowedTenantConfigFn = alertManagerAllowedTenantC(t.RuntimeConfig) + t.Cfg.Alertmanager.AllowedTenantConfigFn = alertManagerAllowedTenant(t.RuntimeConfig) t.Alertmanager, err = alertmanager.NewMultitenantAlertmanager(&t.Cfg.Alertmanager, store, t.Overrides, util_log.Logger, prometheus.DefaultRegisterer) if err != nil { @@ -640,6 +642,7 @@ func (t *Cortex) initAlertManager() (serv services.Service, err error) { func (t *Cortex) initCompactor() (serv services.Service, err error) { t.Cfg.Compactor.ShardingRing.ListenPort = t.Cfg.Server.GRPCListenPort + t.Cfg.Compactor.AllowedTenantConfigFn = compactorAllowedTenant(t.RuntimeConfig) t.Compactor, err = compactor.NewCompactor(t.Cfg.Compactor, t.Cfg.BlocksStorage, t.Overrides, util_log.Logger, prometheus.DefaultRegisterer, t.Overrides) if err != nil { return diff --git a/pkg/cortex/runtime_config.go b/pkg/cortex/runtime_config.go index 29720959639..1ab7c48996d 100644 --- a/pkg/cortex/runtime_config.go +++ b/pkg/cortex/runtime_config.go @@ -20,6 +20,8 @@ var ( type runtimeAllowedTenantConfig struct { alertManager *util.AllowedTenantConfig `yaml:"alert_manager"` + compactor *util.AllowedTenantConfig `yaml:"compactor"` + ruler *util.AllowedTenantConfig `yaml:"ruler"` } // runtimeConfigValues are values that can be reloaded from configuration file while Cortex is running. @@ -122,7 +124,7 @@ func ingesterInstanceLimits(manager *runtimeconfig.Manager) func() *ingester.Ins } } -func alertManagerAllowedTenantC(manager *runtimeconfig.Manager) func() *util.AllowedTenantConfig { +func alertManagerAllowedTenant(manager *runtimeconfig.Manager) func() *util.AllowedTenantConfig { if manager == nil { return nil } @@ -136,6 +138,34 @@ func alertManagerAllowedTenantC(manager *runtimeconfig.Manager) func() *util.All } } +func compactorAllowedTenant(manager *runtimeconfig.Manager) func() *util.AllowedTenantConfig { + if manager == nil { + return nil + } + + return func() *util.AllowedTenantConfig { + val := manager.GetConfig() + if cfg, ok := val.(*runtimeConfigValues); ok && cfg != nil { + return cfg.AllowedTenantConfig.compactor + } + return nil + } +} + +func rulerAllowedTenant(manager *runtimeconfig.Manager) func() *util.AllowedTenantConfig { + if manager == nil { + return nil + } + + return func() *util.AllowedTenantConfig { + val := manager.GetConfig() + if cfg, ok := val.(*runtimeConfigValues); ok && cfg != nil { + return cfg.AllowedTenantConfig.ruler + } + return nil + } +} + func runtimeConfigHandler(runtimeCfgManager *runtimeconfig.Manager, defaultLimits validation.Limits) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { cfg, ok := runtimeCfgManager.GetConfig().(*runtimeConfigValues) diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go index a86efef16e0..6076a26ef23 100644 --- a/pkg/ruler/ruler.go +++ b/pkg/ruler/ruler.go @@ -117,6 +117,7 @@ type Config struct { EnableAPI bool `yaml:"enable_api"` util.AllowedTenantConfig `yaml:",inline"` + AllowedTenantConfigFn func() *util.AllowedTenantConfig `yaml:"-"` RingCheckPeriod time.Duration `yaml:"-"` @@ -265,7 +266,7 @@ func newRuler(cfg Config, manager MultiTenantManager, reg prometheus.Registerer, logger: logger, limits: limits, clientsPool: clientPool, - allowedTenants: util.NewAllowedTenants(cfg.AllowedTenantConfig, nil), + allowedTenants: util.NewAllowedTenants(cfg.AllowedTenantConfig, cfg.AllowedTenantConfigFn), ringCheckErrors: promauto.With(reg).NewCounter(prometheus.CounterOpts{ Name: "cortex_ruler_ring_check_errors_total", diff --git a/pkg/util/allowed_tenants.go b/pkg/util/allowed_tenants.go index cfcbd8a3f23..ed2f6c83d3b 100644 --- a/pkg/util/allowed_tenants.go +++ b/pkg/util/allowed_tenants.go @@ -26,6 +26,7 @@ type AllowedTenants struct { disabled map[string]struct{} allowedTenantConfigFn func() *AllowedTenantConfig + defaultCfg *AllowedTenantConfig m sync.RWMutex } @@ -35,14 +36,15 @@ type AllowedTenants struct { func NewAllowedTenants(cfg AllowedTenantConfig, allowedTenantConfigFn func() *AllowedTenantConfig) *AllowedTenants { a := &AllowedTenants{ allowedTenantConfigFn: allowedTenantConfigFn, + defaultCfg: &cfg, } if allowedTenantConfigFn == nil { - a.setConfig(allowedTenantConfigFn()) - } else { - a.setConfig(&cfg) + allowedTenantConfigFn = func() *AllowedTenantConfig { return a.defaultCfg } } + a.setConfig(allowedTenantConfigFn()) + a.Service = services.NewBasicService(nil, a.running, nil) return a @@ -56,7 +58,11 @@ func (a *AllowedTenants) running(ctx context.Context) error { select { case <-ticker.C: if a.allowedTenantConfigFn == nil { - a.setConfig(a.allowedTenantConfigFn()) + c := a.allowedTenantConfigFn() + if c == nil { + c = a.defaultCfg + } + a.setConfig(c) } case <-ctx.Done(): return nil @@ -76,6 +82,8 @@ func (a *AllowedTenants) setConfig(cfg *AllowedTenantConfig) { for _, u := range cfg.EnabledTenants { a.enabled[u] = struct{}{} } + } else { + cfg.EnabledTenants = nil } if len(cfg.DisabledTenants) > 0 { @@ -83,6 +91,8 @@ func (a *AllowedTenants) setConfig(cfg *AllowedTenantConfig) { for _, u := range cfg.DisabledTenants { a.disabled[u] = struct{}{} } + } else { + a.disabled = nil } } @@ -91,7 +101,7 @@ func (a *AllowedTenants) IsAllowed(tenantID string) bool { return true } - a.m.RUnlock() + a.m.RLock() defer a.m.RUnlock() if len(a.enabled) > 0 { From 81156ec4d940470d12c7ed02aafafc11cd26df9d Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Fri, 17 Feb 2023 12:47:19 -0800 Subject: [PATCH 7/8] Start/stopping services Signed-off-by: Alan Protasio --- pkg/alertmanager/multitenant.go | 5 +++++ pkg/compactor/compactor.go | 7 ++++++- pkg/ruler/ruler.go | 6 ++++++ 3 files changed, 17 insertions(+), 1 deletion(-) diff --git a/pkg/alertmanager/multitenant.go b/pkg/alertmanager/multitenant.go index 39a5ebc380c..80553e7b743 100644 --- a/pkg/alertmanager/multitenant.go +++ b/pkg/alertmanager/multitenant.go @@ -454,6 +454,10 @@ func (h *handlerForGRPCServer) ServeHTTP(w http.ResponseWriter, req *http.Reques } func (am *MultitenantAlertmanager) starting(ctx context.Context) (err error) { + if err := services.StartAndAwaitRunning(ctx, am.allowedTenants); err != nil { + return errors.Wrap(err, "failed to start allowed tenants service") + } + err = am.migrateStateFilesToPerTenantDirectories() if err != nil { return err @@ -734,6 +738,7 @@ func (am *MultitenantAlertmanager) stopping(_ error) error { // subservices manages ring and lifecycler, if sharding was enabled. _ = services.StopManagerAndAwaitStopped(context.Background(), am.subservices) } + services.StopAndAwaitTerminated(context.Background(), am.allowedTenants) //nolint:errcheck return nil } diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index f556a0ef971..ed12d6cc96c 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -482,6 +482,10 @@ func newCompactor( func (c *Compactor) starting(ctx context.Context) error { var err error + if err := services.StartAndAwaitRunning(ctx, c.allowedTenants); err != nil { + return errors.Wrap(err, "failed to start allowed tenants service") + } + // Create bucket client. c.bucketClient, err = c.bucketClientFactory(ctx) if err != nil { @@ -578,7 +582,8 @@ func (c *Compactor) starting(ctx context.Context) error { func (c *Compactor) stopping(_ error) error { ctx := context.Background() - services.StopAndAwaitTerminated(ctx, c.blocksCleaner) //nolint:errcheck + services.StopAndAwaitTerminated(ctx, c.blocksCleaner) //nolint:errcheck + services.StopAndAwaitTerminated(context.Background(), c.allowedTenants) //nolint:errcheck if c.ringSubservices != nil { return services.StopManagerAndAwaitStopped(ctx, c.ringSubservices) } diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go index 6076a26ef23..6e041ca32be 100644 --- a/pkg/ruler/ruler.go +++ b/pkg/ruler/ruler.go @@ -334,6 +334,9 @@ func enableSharding(r *Ruler, ringStore kv.Client) error { func (r *Ruler) starting(ctx context.Context) error { // If sharding is enabled, start the used subservices. + if err := services.StartAndAwaitRunning(ctx, r.allowedTenants); err != nil { + return errors.Wrap(err, "failed to start allowed tenants service") + } if r.cfg.EnableSharding { var err error @@ -361,6 +364,9 @@ func (r *Ruler) stopping(_ error) error { if r.subservices != nil { _ = services.StopManagerAndAwaitStopped(context.Background(), r.subservices) } + + services.StopAndAwaitTerminated(context.Background(), r.allowedTenants) //nolint:errcheck + return nil } From a9012fbb17544709686344a24a81a27ccff40005 Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Fri, 17 Feb 2023 12:57:43 -0800 Subject: [PATCH 8/8] lint Signed-off-by: Alan Protasio --- pkg/compactor/compactor.go | 4 ++-- pkg/ruler/ruler.go | 4 ++-- pkg/util/allowed_tenants.go | 8 ++++---- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index ed12d6cc96c..e360e88b525 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -582,8 +582,8 @@ func (c *Compactor) starting(ctx context.Context) error { func (c *Compactor) stopping(_ error) error { ctx := context.Background() - services.StopAndAwaitTerminated(ctx, c.blocksCleaner) //nolint:errcheck - services.StopAndAwaitTerminated(context.Background(), c.allowedTenants) //nolint:errcheck + services.StopAndAwaitTerminated(ctx, c.blocksCleaner) //nolint:errcheck + services.StopAndAwaitTerminated(ctx, c.allowedTenants) //nolint:errcheck if c.ringSubservices != nil { return services.StopManagerAndAwaitStopped(ctx, c.ringSubservices) } diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go index 6e041ca32be..e2bba6b39c2 100644 --- a/pkg/ruler/ruler.go +++ b/pkg/ruler/ruler.go @@ -340,7 +340,7 @@ func (r *Ruler) starting(ctx context.Context) error { if r.cfg.EnableSharding { var err error - if r.subservices, err = services.NewManager(r.lifecycler, r.ring, r.clientsPool, r.allowedTenants); err != nil { + if r.subservices, err = services.NewManager(r.lifecycler, r.ring, r.clientsPool); err != nil { return errors.Wrap(err, "unable to start ruler subservices") } @@ -366,7 +366,7 @@ func (r *Ruler) stopping(_ error) error { } services.StopAndAwaitTerminated(context.Background(), r.allowedTenants) //nolint:errcheck - + return nil } diff --git a/pkg/util/allowed_tenants.go b/pkg/util/allowed_tenants.go index ed2f6c83d3b..a388162e719 100644 --- a/pkg/util/allowed_tenants.go +++ b/pkg/util/allowed_tenants.go @@ -34,15 +34,15 @@ type AllowedTenants struct { // If there are any enabled tenants, then only those tenants are allowed. // If there are any disabled tenants, then tenant from that list, that would normally be allowed, is disabled instead. func NewAllowedTenants(cfg AllowedTenantConfig, allowedTenantConfigFn func() *AllowedTenantConfig) *AllowedTenants { + if allowedTenantConfigFn == nil { + allowedTenantConfigFn = func() *AllowedTenantConfig { return &cfg } + } + a := &AllowedTenants{ allowedTenantConfigFn: allowedTenantConfigFn, defaultCfg: &cfg, } - if allowedTenantConfigFn == nil { - allowedTenantConfigFn = func() *AllowedTenantConfig { return a.defaultCfg } - } - a.setConfig(allowedTenantConfigFn()) a.Service = services.NewBasicService(nil, a.running, nil)