Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unify direct and caching RuleStores in ruler #9434

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions development/mimir-microservices-mode/docker-compose.jsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ std.manifestYamlDoc({

memcached:: {
memcached: {
image: 'memcached:1.6.19-alpine',
image: 'memcached:1.6.28-alpine',
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated, but I needed to be able to watch deletions, which is only available in a newer version; this version also matches what we run in production.

ports: [
'11211:11211',
],
Expand All @@ -303,7 +303,7 @@ std.manifestYamlDoc({

memcached_exporter:: {
'memcached-exporter': {
image: 'prom/memcached-exporter:v0.6.0',
image: 'prom/memcached-exporter:v0.14.4',
command: ['--memcached.address=memcached:11211', '--web.listen-address=0.0.0.0:9150'],
},
},
Expand Down
4 changes: 2 additions & 2 deletions development/mimir-microservices-mode/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -275,14 +275,14 @@
"ports":
- "9900:9900"
"memcached":
"image": "memcached:1.6.19-alpine"
"image": "memcached:1.6.28-alpine"
"ports":
- "11211:11211"
"memcached-exporter":
"command":
- "--memcached.address=memcached:11211"
- "--web.listen-address=0.0.0.0:9150"
"image": "prom/memcached-exporter:v0.6.0"
"image": "prom/memcached-exporter:v0.14.4"
"minio":
"command":
- "server"
Expand Down
3 changes: 1 addition & 2 deletions pkg/mimir/mimir.go
Original file line number Diff line number Diff line change
Expand Up @@ -723,8 +723,7 @@ type Mimir struct {
QueryFrontendTopicOffsetsReader *ingest.TopicOffsetsReader
QueryFrontendCodec querymiddleware.Codec
Ruler *ruler.Ruler
RulerDirectStorage rulestore.RuleStore
RulerCachedStorage rulestore.RuleStore
RulerStorage rulestore.RuleStore
Alertmanager *alertmanager.MultitenantAlertmanager
Compactor *compactor.MultitenantCompactor
StoreGateway *storegateway.StoreGateway
Expand Down
10 changes: 4 additions & 6 deletions pkg/mimir/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -830,13 +830,12 @@ func (t *Mimir) initRulerStorage() (serv services.Service, err error) {
// we do accept stale data for about a polling interval (2 intervals in the worst
// case scenario due to the jitter applied).
cacheTTL := t.Cfg.Ruler.PollInterval

t.RulerDirectStorage, t.RulerCachedStorage, err = ruler.NewRuleStore(context.Background(), t.Cfg.RulerStorage, t.Overrides, rules.FileLoader{}, cacheTTL, util_log.Logger, t.Registerer)
t.RulerStorage, err = ruler.NewRuleStore(context.Background(), t.Cfg.RulerStorage, t.Overrides, rules.FileLoader{}, cacheTTL, util_log.Logger, t.Registerer)
return
}

func (t *Mimir) initRuler() (serv services.Service, err error) {
if t.RulerDirectStorage == nil {
if t.RulerStorage == nil {
level.Info(util_log.Logger).Log("msg", "The ruler storage has not been configured. Not starting the ruler.")
return nil, nil
}
Expand Down Expand Up @@ -939,8 +938,7 @@ func (t *Mimir) initRuler() (serv services.Service, err error) {
manager,
t.Registerer,
util_log.Logger,
t.RulerDirectStorage,
t.RulerCachedStorage,
t.RulerStorage,
t.Overrides,
)
if err != nil {
Expand All @@ -951,7 +949,7 @@ func (t *Mimir) initRuler() (serv services.Service, err error) {
t.API.RegisterRuler(t.Ruler)

// Expose HTTP configuration and prometheus-compatible Ruler APIs
t.API.RegisterRulerAPI(ruler.NewAPI(t.Ruler, t.RulerDirectStorage, util_log.Logger), t.Cfg.Ruler.EnableAPI, t.BuildInfoHandler)
t.API.RegisterRulerAPI(ruler.NewAPI(t.Ruler, t.RulerStorage, util_log.Logger), t.Cfg.Ruler.EnableAPI, t.BuildInfoHandler)

return t.Ruler, nil
}
Expand Down
6 changes: 2 additions & 4 deletions pkg/mimir/modules_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,11 +159,9 @@ func TestMimir_InitRulerStorage(t *testing.T) {
require.NoError(t, err)

if testData.expectedInit {
assert.NotNil(t, mimir.RulerDirectStorage)
assert.NotNil(t, mimir.RulerCachedStorage)
assert.NotNil(t, mimir.RulerStorage)
} else {
assert.Nil(t, mimir.RulerDirectStorage)
assert.Nil(t, mimir.RulerCachedStorage)
assert.Nil(t, mimir.RulerStorage)
}
})
}
Expand Down
8 changes: 6 additions & 2 deletions pkg/ruler/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,9 @@ func (a *API) ListRules(w http.ResponseWriter, req *http.Request) {
}

level.Debug(logger).Log("msg", "retrieving rule groups with namespace", "userID", userID, "namespace", namespace)
rgs, err := a.store.ListRuleGroupsForUserAndNamespace(ctx, userID, namespace)
// Disable any caching when getting list of all rule groups since listing results
// are cached and not invalidated and this API is expected to be strongly consistent.
rgs, err := a.store.ListRuleGroupsForUserAndNamespace(ctx, userID, namespace, rulestore.WithCacheDisabled())
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
Expand Down Expand Up @@ -606,7 +608,9 @@ func (a *API) CreateRuleGroup(w http.ResponseWriter, req *http.Request) {

// Only list rule groups when enforcing a max number of groups for this tenant and namespace.
if a.ruler.IsMaxRuleGroupsLimited(userID, namespace) {
rgs, err := a.store.ListRuleGroupsForUserAndNamespace(ctx, userID, "")
// Disable any caching when getting list of all rule groups since listing results
// are cached and not invalidated and we need the most up-to-date number.
rgs, err := a.store.ListRuleGroupsForUserAndNamespace(ctx, userID, "", rulestore.WithCacheDisabled())
if err != nil {
level.Error(logger).Log("msg", "unable to fetch current rule groups for validation", "err", err.Error(), "user", userID)
http.Error(w, err.Error(), http.StatusInternalServerError)
Expand Down
20 changes: 10 additions & 10 deletions pkg/ruler/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func TestRuler_ListRules(t *testing.T) {
store.setMissingRuleGroups(tc.missingRules)

r := prepareRuler(t, cfg, store, withStart())
a := NewAPI(r, r.directStore, log.NewNopLogger())
a := NewAPI(r, r.store, log.NewNopLogger())

router := mux.NewRouter()
router.Path("/prometheus/config/v1/rules").Methods("GET").HandlerFunc(a.ListRules)
Expand Down Expand Up @@ -936,7 +936,7 @@ func TestRuler_PrometheusRules(t *testing.T) {
return len(rls.Groups)
})

a := NewAPI(r, r.directStore, log.NewNopLogger())
a := NewAPI(r, r.store, log.NewNopLogger())

req := requestFor(t, http.MethodGet, "https://localhost:8080/prometheus/api/v1/rules"+tc.queryParams, nil, userID)
w := httptest.NewRecorder()
Expand Down Expand Up @@ -993,7 +993,7 @@ func TestRuler_PrometheusAlerts(t *testing.T) {
return len(rls.Groups)
})

a := NewAPI(r, r.directStore, log.NewNopLogger())
a := NewAPI(r, r.store, log.NewNopLogger())

req := requestFor(t, http.MethodGet, "https://localhost:8080/prometheus/api/v1/alerts", nil, "user1")
w := httptest.NewRecorder()
Expand Down Expand Up @@ -1172,7 +1172,7 @@ rules:

reg := prometheus.NewPedanticRegistry()
r := prepareRuler(t, rulerCfg, newMockRuleStore(make(map[string]rulespb.RuleGroupList)), withStart(), withRulerAddrAutomaticMapping(), withPrometheusRegisterer(reg))
a := NewAPI(r, r.directStore, log.NewNopLogger())
a := NewAPI(r, r.store, log.NewNopLogger())

router := mux.NewRouter()
router.Path("/prometheus/config/v1/rules/{namespace}").Methods("POST").HandlerFunc(a.CreateRuleGroup)
Expand Down Expand Up @@ -1237,7 +1237,7 @@ func TestAPI_DeleteNamespace(t *testing.T) {

reg := prometheus.NewPedanticRegistry()
r := prepareRuler(t, cfg, newMockRuleStore(mockRulesNamespaces), withStart(), withRulerAddrAutomaticMapping(), withPrometheusRegisterer(reg))
a := NewAPI(r, r.directStore, log.NewNopLogger())
a := NewAPI(r, r.store, log.NewNopLogger())

router := mux.NewRouter()
router.Path("/prometheus/config/v1/rules/{namespace}").Methods(http.MethodDelete).HandlerFunc(a.DeleteNamespace)
Expand Down Expand Up @@ -1294,7 +1294,7 @@ func TestAPI_DeleteRuleGroup(t *testing.T) {

reg := prometheus.NewPedanticRegistry()
r := prepareRuler(t, cfg, newMockRuleStore(mockRulesNamespaces), withStart(), withRulerAddrAutomaticMapping(), withPrometheusRegisterer(reg))
a := NewAPI(r, r.directStore, log.NewNopLogger())
a := NewAPI(r, r.store, log.NewNopLogger())

router := mux.NewRouter()
router.Path("/prometheus/config/v1/rules/{namespace}/{groupName}").Methods(http.MethodDelete).HandlerFunc(a.DeleteRuleGroup)
Expand Down Expand Up @@ -1336,7 +1336,7 @@ func TestRuler_LimitsPerGroup(t *testing.T) {
defaults.RulerMaxRulesPerRuleGroup = 1
})))

a := NewAPI(r, r.directStore, log.NewNopLogger())
a := NewAPI(r, r.store, log.NewNopLogger())

tc := []struct {
name string
Expand Down Expand Up @@ -1389,7 +1389,7 @@ func TestRuler_RulerGroupLimits(t *testing.T) {
defaults.RulerMaxRulesPerRuleGroup = 1
})))

a := NewAPI(r, r.directStore, log.NewNopLogger())
a := NewAPI(r, r.store, log.NewNopLogger())

tc := []struct {
name string
Expand Down Expand Up @@ -1449,7 +1449,7 @@ func TestRuler_RulerGroupLimitsDisabled(t *testing.T) {
defaults.RulerMaxRulesPerRuleGroup = 0
})))

a := NewAPI(r, r.directStore, log.NewNopLogger())
a := NewAPI(r, r.store, log.NewNopLogger())

tc := []struct {
name string
Expand Down Expand Up @@ -1551,7 +1551,7 @@ func TestAPIRoutesCorrectlyHandleInvalidOrgID(t *testing.T) {

r := prepareRuler(t, cfg, newMockRuleStore(map[string]rulespb.RuleGroupList{}), withStart())

a := NewAPI(r, r.directStore, log.NewNopLogger())
a := NewAPI(r, r.store, log.NewNopLogger())

router := mux.NewRouter()
router.Path("/api/v1/rules").Methods(http.MethodGet).HandlerFunc(a.PrometheusRules)
Expand Down
56 changes: 30 additions & 26 deletions pkg/ruler/ruler.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/ruler/rulespb"
"github.com/grafana/mimir/pkg/ruler/rulestore"
"github.com/grafana/mimir/pkg/storage/tsdb/bucketcache"
"github.com/grafana/mimir/pkg/util"
"github.com/grafana/mimir/pkg/util/grpcencoding/s2"
util_log "github.com/grafana/mimir/pkg/util/log"
Expand Down Expand Up @@ -314,13 +313,12 @@ type MultiTenantManager interface {
type Ruler struct {
services.Service

cfg Config
lifecycler *ring.BasicLifecycler
ring *ring.Ring
directStore rulestore.RuleStore
cachedStore rulestore.RuleStore
manager MultiTenantManager
limits RulesLimits
cfg Config
lifecycler *ring.BasicLifecycler
ring *ring.Ring
store rulestore.RuleStore
manager MultiTenantManager
limits RulesLimits

metrics *rulerMetrics

Expand All @@ -346,20 +344,14 @@ type Ruler struct {
}

// NewRuler creates a new ruler from a distributor and chunk store.
func NewRuler(cfg Config, manager MultiTenantManager, reg prometheus.Registerer, logger log.Logger, directStore, cachedStore rulestore.RuleStore, limits RulesLimits) (*Ruler, error) {
// If the cached store is not configured, just fallback to the direct one.
if cachedStore == nil {
cachedStore = directStore
}

return newRuler(cfg, manager, reg, logger, directStore, cachedStore, limits, newRulerClientPool(cfg.ClientTLSConfig, logger, reg))
func NewRuler(cfg Config, manager MultiTenantManager, reg prometheus.Registerer, logger log.Logger, store rulestore.RuleStore, limits RulesLimits) (*Ruler, error) {
return newRuler(cfg, manager, reg, logger, store, limits, newRulerClientPool(cfg.ClientTLSConfig, logger, reg))
}

func newRuler(cfg Config, manager MultiTenantManager, reg prometheus.Registerer, logger log.Logger, directStore, cachedStore rulestore.RuleStore, limits RulesLimits, clientPool ClientsPool) (*Ruler, error) {
func newRuler(cfg Config, manager MultiTenantManager, reg prometheus.Registerer, logger log.Logger, store rulestore.RuleStore, limits RulesLimits, clientPool ClientsPool) (*Ruler, error) {
ruler := &Ruler{
cfg: cfg,
directStore: directStore,
cachedStore: cachedStore,
store: store,
manager: manager,
registry: reg,
logger: logger,
Expand Down Expand Up @@ -633,7 +625,7 @@ func (r *Ruler) syncRules(ctx context.Context, userIDs []string, reason rulesSyn
func (r *Ruler) loadRuleGroupsToSync(ctx context.Context, configs map[string]rulespb.RuleGroupList) (map[string]rulespb.RuleGroupList, error) {
// Load rule groups.
start := time.Now()
missing, err := r.directStore.LoadRuleGroups(ctx, configs)
missing, err := r.store.LoadRuleGroups(ctx, configs)
r.metrics.loadRuleGroups.Observe(time.Since(start).Seconds())

if err != nil {
Expand All @@ -660,7 +652,12 @@ func (r *Ruler) listRuleGroupsToSyncForAllUsers(ctx context.Context, reason rule

// In order to reduce API calls to the object storage among all ruler replicas,
// we support lookup of stale data for a short period.
users, err := r.cachedStore.ListAllUsers(bucketcache.WithCacheLookupEnabled(ctx, cacheLookupEnabled))
var opts []rulestore.Option
if !cacheLookupEnabled {
opts = append(opts, rulestore.WithCacheDisabled())
}

users, err := r.store.ListAllUsers(ctx, opts...)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Passing the cache/don't-cache option here doesn't actually affect anything, since we specifically configure tenant listing calls not to be cached, but it doesn't hurt to make that explicit here in case we change the config in the future.

if err != nil {
return nil, errors.Wrap(err, "unable to list users of ruler")
}
Expand Down Expand Up @@ -711,11 +708,16 @@ func (r *Ruler) listRuleGroupsToSyncForUsers(ctx context.Context, userIDs []stri
concurrency = len(userRings)
}

var opts []rulestore.Option
if !cacheLookupEnabled {
opts = append(opts, rulestore.WithCacheDisabled())
}

g, gctx := errgroup.WithContext(ctx)
for i := 0; i < concurrency; i++ {
g.Go(func() error {
for userID := range userCh {
groups, err := r.cachedStore.ListRuleGroupsForUserAndNamespace(bucketcache.WithCacheLookupEnabled(gctx, cacheLookupEnabled), userID, "")
groups, err := r.store.ListRuleGroupsForUserAndNamespace(gctx, userID, "", opts...)
if err != nil {
return errors.Wrapf(err, "failed to fetch rule groups for user %s", userID)
}
Expand Down Expand Up @@ -1224,7 +1226,7 @@ func (r *Ruler) DeleteTenantConfiguration(w http.ResponseWriter, req *http.Reque
return
}

err = r.directStore.DeleteNamespace(req.Context(), userID, "") // Empty namespace = delete all rule groups.
err = r.store.DeleteNamespace(req.Context(), userID, "") // Empty namespace = delete all rule groups.
if err != nil && !errors.Is(err, rulestore.ErrGroupNamespaceNotFound) {
respondServerError(logger, w, err.Error())
return
Expand All @@ -1238,8 +1240,8 @@ func (r *Ruler) DeleteTenantConfiguration(w http.ResponseWriter, req *http.Reque

func (r *Ruler) ListAllRules(w http.ResponseWriter, req *http.Request) {
logger := util_log.WithContext(req.Context(), r.logger)

userIDs, err := r.directStore.ListAllUsers(req.Context())
// Disable caching when getting a list of users since this API is expected to be strongly consistent.
userIDs, err := r.store.ListAllUsers(req.Context(), rulestore.WithCacheDisabled())
if err != nil {
level.Error(logger).Log("msg", errListAllUser, "err", err)
http.Error(w, fmt.Sprintf("%s: %s", errListAllUser, err.Error()), http.StatusInternalServerError)
Expand All @@ -1255,12 +1257,14 @@ func (r *Ruler) ListAllRules(w http.ResponseWriter, req *http.Request) {
}()

err = concurrency.ForEachUser(req.Context(), userIDs, fetchRulesConcurrency, func(ctx context.Context, userID string) error {
rg, err := r.directStore.ListRuleGroupsForUserAndNamespace(ctx, userID, "")
// Disable any caching when getting list of all rule groups since listing results
// are cached and not invalidated and this API is expected to be strongly consistent.
rg, err := r.store.ListRuleGroupsForUserAndNamespace(ctx, userID, "", rulestore.WithCacheDisabled())
if err != nil {
return errors.Wrapf(err, "failed to fetch ruler config for user %s", userID)
}
userRules := map[string]rulespb.RuleGroupList{userID: rg}
if missing, err := r.directStore.LoadRuleGroups(ctx, userRules); err != nil {
if missing, err := r.store.LoadRuleGroups(ctx, userRules); err != nil {
return errors.Wrapf(err, "failed to load ruler config for user %s", userID)
} else if len(missing) > 0 {
// This API is expected to be strongly consistent, so it's an error if any rule group was missing.
Expand Down
8 changes: 4 additions & 4 deletions pkg/ruler/ruler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,12 @@ import (
"github.com/grafana/mimir/pkg/storage/bucket"
"github.com/grafana/mimir/pkg/storage/bucket/filesystem"
"github.com/grafana/mimir/pkg/util"
util_test "github.com/grafana/mimir/pkg/util/test"
utiltest "github.com/grafana/mimir/pkg/util/test"
"github.com/grafana/mimir/pkg/util/validation"
)

func TestMain(m *testing.M) {
util_test.VerifyNoLeakTestMain(m)
utiltest.VerifyNoLeakTestMain(m)
}

func defaultRulerConfig(t testing.TB) Config {
Expand Down Expand Up @@ -210,7 +210,7 @@ func prepareRuler(t *testing.T, cfg Config, storage rulestore.RuleStore, opts ..
options := applyPrepareOptions(t, cfg.Ring.Common.InstanceID, opts...)
manager := prepareRulerManager(t, cfg, opts...)

ruler, err := newRuler(cfg, manager, options.registerer, options.logger, storage, storage, options.limits, newMockClientsPool(cfg, options.logger, options.registerer, options.rulerAddrMap))
ruler, err := newRuler(cfg, manager, options.registerer, options.logger, storage, options.limits, newMockClientsPool(cfg, options.logger, options.registerer, options.rulerAddrMap))
require.NoError(t, err)

if options.rulerAddrAutoMap {
Expand Down Expand Up @@ -1571,7 +1571,7 @@ func verifyExpectedDeletedRuleGroupsForUser(t *testing.T, r *Ruler, userID strin
ctx := context.Background()

t.Run("ListRuleGroupsForUserAndNamespace()", func(t *testing.T) {
list, err := r.directStore.ListRuleGroupsForUserAndNamespace(ctx, userID, "")
list, err := r.store.ListRuleGroupsForUserAndNamespace(ctx, userID, "")
require.NoError(t, err)

if expectedDeleted {
Expand Down
Loading
Loading