Skip to content

Commit c9670be

Browse files
Restructure mutex such that manager is not holding one mutex for the entirety of sync rules
Signed-off-by: Anand Rajagopal <anrajag@amazon.com>
1 parent 2cd304b commit c9670be

File tree

2 files changed

+223
-36
lines changed

2 files changed

+223
-36
lines changed

pkg/ruler/manager.go

Lines changed: 72 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ type DefaultMultiTenantManager struct {
3535

3636
// Structs for holding per-user Prometheus rules Managers
3737
// and a corresponding metrics struct
38-
userManagerMtx sync.Mutex
38+
userManagerMtx sync.RWMutex
3939
userManagers map[string]RulesManager
4040
userManagerMetrics *ManagerMetrics
4141

@@ -50,6 +50,10 @@ type DefaultMultiTenantManager struct {
5050
configUpdatesTotal *prometheus.CounterVec
5151
registry prometheus.Registerer
5252
logger log.Logger
53+
54+
ruleCache map[string][]*promRules.Group
55+
ruleCacheMtx sync.RWMutex
56+
syncRuleMtx sync.Mutex
5357
}
5458

5559
func NewDefaultMultiTenantManager(cfg Config, managerFactory ManagerFactory, evalMetrics *RuleEvalMetrics, reg prometheus.Registerer, logger log.Logger) (*DefaultMultiTenantManager, error) {
@@ -85,6 +89,7 @@ func NewDefaultMultiTenantManager(cfg Config, managerFactory ManagerFactory, eva
8589
mapper: newMapper(cfg.RulePath, logger),
8690
userManagers: map[string]RulesManager{},
8791
userManagerMetrics: userManagerMetrics,
92+
ruleCache: map[string][]*promRules.Group{},
8893
managersTotal: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
8994
Namespace: "cortex",
9095
Name: "ruler_managers_total",
@@ -111,15 +116,17 @@ func NewDefaultMultiTenantManager(cfg Config, managerFactory ManagerFactory, eva
111116
}
112117

113118
func (r *DefaultMultiTenantManager) SyncRuleGroups(ctx context.Context, ruleGroups map[string]rulespb.RuleGroupList) {
114-
// A lock is taken to ensure if this function is called concurrently, then each call
115-
// returns after the call map files and check for updates
116-
r.userManagerMtx.Lock()
117-
defer r.userManagerMtx.Unlock()
119+
// this is a safety lock to ensure this method is executed sequentially
120+
r.syncRuleMtx.Lock()
121+
defer r.syncRuleMtx.Unlock()
118122

119123
for userID, ruleGroup := range ruleGroups {
120124
r.syncRulesToManager(ctx, userID, ruleGroup)
121125
}
122126

127+
r.userManagerMtx.Lock()
128+
defer r.userManagerMtx.Unlock()
129+
123130
// Check for deleted users and remove them
124131
for userID, mngr := range r.userManagers {
125132
if _, exists := ruleGroups[userID]; !exists {
@@ -142,6 +149,18 @@ func (r *DefaultMultiTenantManager) SyncRuleGroups(ctx context.Context, ruleGrou
142149
r.managersTotal.Set(float64(len(r.userManagers)))
143150
}
144151

152+
func (r *DefaultMultiTenantManager) updateRuleCache(user string, rules []*promRules.Group) {
153+
r.ruleCacheMtx.Lock()
154+
defer r.ruleCacheMtx.Unlock()
155+
r.ruleCache[user] = rules
156+
}
157+
158+
func (r *DefaultMultiTenantManager) deleteRuleCache(user string) {
159+
r.ruleCacheMtx.Lock()
160+
defer r.ruleCacheMtx.Unlock()
161+
delete(r.ruleCache, user)
162+
}
163+
145164
// syncRulesToManager maps the rule files to disk, detects any changes and will create/update
146165
// the user's Prometheus Rules Manager.
147166
func (r *DefaultMultiTenantManager) syncRulesToManager(ctx context.Context, user string, groups rulespb.RuleGroupList) {
@@ -154,25 +173,20 @@ func (r *DefaultMultiTenantManager) syncRulesToManager(ctx context.Context, user
154173
return
155174
}
156175

157-
manager, exists := r.userManagers[user]
158-
if !exists || update {
176+
manager, existing := r.getRulesManager(user, ctx)
177+
178+
if manager == nil {
179+
return
180+
}
181+
182+
if !existing || update {
159183
level.Debug(r.logger).Log("msg", "updating rules", "user", user)
160184
r.configUpdatesTotal.WithLabelValues(user).Inc()
161-
if !exists {
162-
level.Debug(r.logger).Log("msg", "creating rule manager for user", "user", user)
163-
manager, err = r.newManager(ctx, user)
164-
if err != nil {
165-
r.lastReloadSuccessful.WithLabelValues(user).Set(0)
166-
level.Error(r.logger).Log("msg", "unable to create rule manager", "user", user, "err", err)
167-
return
168-
}
169-
// manager.Run() starts running the manager and blocks until Stop() is called.
170-
// Hence run it as another goroutine.
171-
go manager.Run()
172-
r.userManagers[user] = manager
185+
if update && existing {
186+
r.updateRuleCache(user, manager.RuleGroups())
173187
}
174-
175188
err = manager.Update(r.cfg.EvaluationInterval, files, r.cfg.ExternalLabels, r.cfg.ExternalURL.String(), ruleGroupIterationFunc)
189+
r.deleteRuleCache(user)
176190
if err != nil {
177191
r.lastReloadSuccessful.WithLabelValues(user).Set(0)
178192
level.Error(r.logger).Log("msg", "unable to update rule manager", "user", user, "err", err)
@@ -184,6 +198,29 @@ func (r *DefaultMultiTenantManager) syncRulesToManager(ctx context.Context, user
184198
}
185199
}
186200

201+
func (r *DefaultMultiTenantManager) getRulesManager(user string, ctx context.Context) (RulesManager, bool) {
202+
r.userManagerMtx.RLock()
203+
manager, exists := r.userManagers[user]
204+
r.userManagerMtx.RUnlock()
205+
if exists {
206+
return manager, true
207+
}
208+
r.userManagerMtx.Lock()
209+
defer r.userManagerMtx.Unlock()
210+
211+
manager, err := r.newManager(ctx, user)
212+
if err != nil {
213+
r.lastReloadSuccessful.WithLabelValues(user).Set(0)
214+
level.Error(r.logger).Log("msg", "unable to create rule manager", "user", user, "err", err)
215+
return nil, false
216+
}
217+
// manager.Run() starts running the manager and blocks until Stop() is called.
218+
// Hence run it as another goroutine.
219+
go manager.Run()
220+
r.userManagers[user] = manager
221+
return manager, false
222+
}
223+
187224
func ruleGroupIterationFunc(ctx context.Context, g *promRules.Group, evalTimestamp time.Time) {
188225
logMessage := []interface{}{
189226
"msg", "evaluating rule group",
@@ -269,13 +306,25 @@ func (r *DefaultMultiTenantManager) getOrCreateNotifier(userID string, userManag
269306
return n.notifier, nil
270307
}
271308

309+
func (r *DefaultMultiTenantManager) getCachedRules(userID string) ([]*promRules.Group, bool) {
310+
r.ruleCacheMtx.RLock()
311+
defer r.ruleCacheMtx.RUnlock()
312+
groups, exists := r.ruleCache[userID]
313+
return groups, exists
314+
}
315+
272316
func (r *DefaultMultiTenantManager) GetRules(userID string) []*promRules.Group {
273317
var groups []*promRules.Group
274-
r.userManagerMtx.Lock()
275-
if mngr, exists := r.userManagers[userID]; exists {
318+
groups, cached := r.getCachedRules(userID)
319+
if cached {
320+
return groups
321+
}
322+
r.userManagerMtx.RLock()
323+
mngr, exists := r.userManagers[userID]
324+
r.userManagerMtx.RUnlock()
325+
if exists {
276326
groups = mngr.RuleGroups()
277327
}
278-
r.userManagerMtx.Unlock()
279328
return groups
280329
}
281330

pkg/ruler/manager_test.go

Lines changed: 151 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package ruler
22

33
import (
44
"context"
5+
"sync"
56
"testing"
67
"time"
78

@@ -21,7 +22,14 @@ import (
2122
func TestSyncRuleGroups(t *testing.T) {
2223
dir := t.TempDir()
2324

24-
m, err := NewDefaultMultiTenantManager(Config{RulePath: dir}, factory, nil, nil, log.NewNopLogger())
25+
waitDurations := []time.Duration{
26+
1 * time.Millisecond,
27+
1 * time.Millisecond,
28+
}
29+
30+
ruleManagerFactory := RuleManagerFactory(nil, waitDurations)
31+
32+
m, err := NewDefaultMultiTenantManager(Config{RulePath: dir}, ruleManagerFactory, nil, nil, log.NewNopLogger())
2533
require.NoError(t, err)
2634

2735
const user = "testUser"
@@ -97,6 +105,107 @@ func TestSyncRuleGroups(t *testing.T) {
97105
})
98106
}
99107

108+
func TestSlowRuleGroupSyncDoesNotSlowdownListRules(t *testing.T) {
109+
dir := t.TempDir()
110+
const user = "testUser"
111+
userRules := map[string]rulespb.RuleGroupList{
112+
user: {
113+
&rulespb.RuleGroupDesc{
114+
Name: "group1",
115+
Namespace: "ns",
116+
Interval: 1 * time.Minute,
117+
User: user,
118+
},
119+
},
120+
}
121+
122+
groupsToReturn := [][]*promRules.Group{
123+
{
124+
promRules.NewGroup(promRules.GroupOptions{
125+
Name: "group1",
126+
File: "ns",
127+
Interval: 60,
128+
Limit: 0,
129+
Opts: &promRules.ManagerOptions{},
130+
}),
131+
},
132+
{
133+
promRules.NewGroup(promRules.GroupOptions{
134+
Name: "group1",
135+
File: "ns",
136+
Interval: 60,
137+
Limit: 0,
138+
Opts: &promRules.ManagerOptions{},
139+
}),
140+
promRules.NewGroup(promRules.GroupOptions{
141+
Name: "group2",
142+
File: "ns",
143+
Interval: 60,
144+
Limit: 0,
145+
Opts: &promRules.ManagerOptions{},
146+
}),
147+
},
148+
}
149+
150+
waitDurations := []time.Duration{
151+
5 * time.Millisecond,
152+
1 * time.Second,
153+
}
154+
155+
ruleManagerFactory := RuleManagerFactory(groupsToReturn, waitDurations)
156+
157+
m, err := NewDefaultMultiTenantManager(Config{RulePath: dir}, ruleManagerFactory, nil, nil, log.NewNopLogger())
158+
require.NoError(t, err)
159+
160+
m.SyncRuleGroups(context.Background(), userRules)
161+
162+
mgr := getManager(m, user)
163+
require.NotNil(t, mgr)
164+
165+
test.Poll(t, 1*time.Second, true, func() interface{} {
166+
return mgr.(*mockRulesManager).running.Load()
167+
})
168+
groups := m.GetRules(user)
169+
require.Len(t, groups, len(groupsToReturn[0]), "expected %d but got %d", len(groupsToReturn[0]), len(groups))
170+
171+
// update rules and call list rules concurrently
172+
userRules = map[string]rulespb.RuleGroupList{
173+
user: {
174+
&rulespb.RuleGroupDesc{
175+
Name: "group1",
176+
Namespace: "ns",
177+
Interval: 1 * time.Minute,
178+
User: user,
179+
},
180+
&rulespb.RuleGroupDesc{
181+
Name: "group2",
182+
Namespace: "ns",
183+
Interval: 1 * time.Minute,
184+
User: user,
185+
},
186+
},
187+
}
188+
go m.SyncRuleGroups(context.Background(), userRules)
189+
190+
groups = m.GetRules(user)
191+
require.Len(t, groups, len(groupsToReturn[0]), "expected %d but got %d", len(groupsToReturn[0]), len(groups))
192+
193+
test.Poll(t, 5*time.Second, len(groupsToReturn[1]), func() interface{} {
194+
groups = m.GetRules(user)
195+
return len(groups)
196+
})
197+
198+
test.Poll(t, 1*time.Second, true, func() interface{} {
199+
return mgr.(*mockRulesManager).running.Load()
200+
})
201+
202+
m.Stop()
203+
204+
test.Poll(t, 1*time.Second, false, func() interface{} {
205+
return mgr.(*mockRulesManager).running.Load()
206+
})
207+
}
208+
100209
func TestSyncRuleGroupsCleanUpPerUserMetrics(t *testing.T) {
101210
dir := t.TempDir()
102211
reg := prometheus.NewPedanticRegistry()
@@ -139,8 +248,8 @@ func TestSyncRuleGroupsCleanUpPerUserMetrics(t *testing.T) {
139248
}
140249

141250
func getManager(m *DefaultMultiTenantManager, user string) RulesManager {
142-
m.userManagerMtx.Lock()
143-
defer m.userManagerMtx.Unlock()
251+
m.userManagerMtx.RLock()
252+
defer m.userManagerMtx.RUnlock()
144253

145254
return m.userManagers[user]
146255
}
@@ -149,9 +258,46 @@ func factory(_ context.Context, _ string, _ *notifier.Manager, _ log.Logger, _ p
149258
return &mockRulesManager{done: make(chan struct{})}
150259
}
151260

261+
func RuleManagerFactory(groupsToReturn [][]*promRules.Group, waitDurations []time.Duration) ManagerFactory {
262+
return func(_ context.Context, _ string, _ *notifier.Manager, _ log.Logger, _ prometheus.Registerer) RulesManager {
263+
return &mockRulesManager{
264+
done: make(chan struct{}),
265+
groupsToReturn: groupsToReturn,
266+
waitDurations: waitDurations,
267+
iteration: -1,
268+
}
269+
}
270+
}
271+
152272
type mockRulesManager struct {
153-
running atomic.Bool
154-
done chan struct{}
273+
mtx sync.Mutex
274+
groupsToReturn [][]*promRules.Group
275+
iteration int
276+
waitDurations []time.Duration
277+
running atomic.Bool
278+
done chan struct{}
279+
}
280+
281+
func (m *mockRulesManager) Update(_ time.Duration, _ []string, _ labels.Labels, _ string, _ promRules.GroupEvalIterationFunc) error {
282+
m.mtx.Lock()
283+
defer m.mtx.Unlock()
284+
ticker := time.NewTicker(m.waitDurations[m.iteration+1])
285+
select {
286+
case <-ticker.C:
287+
m.iteration = m.iteration + 1
288+
return nil
289+
case <-m.done:
290+
return nil
291+
}
292+
}
293+
294+
func (m *mockRulesManager) RuleGroups() []*promRules.Group {
295+
m.mtx.Lock()
296+
m.mtx.Unlock()
297+
if m.iteration < 0 {
298+
return nil
299+
}
300+
return m.groupsToReturn[m.iteration]
155301
}
156302

157303
func (m *mockRulesManager) Run() {
@@ -163,11 +309,3 @@ func (m *mockRulesManager) Stop() {
163309
m.running.Store(false)
164310
close(m.done)
165311
}
166-
167-
func (m *mockRulesManager) Update(_ time.Duration, _ []string, _ labels.Labels, _ string, _ promRules.GroupEvalIterationFunc) error {
168-
return nil
169-
}
170-
171-
func (m *mockRulesManager) RuleGroups() []*promRules.Group {
172-
return nil
173-
}

0 commit comments

Comments
 (0)