Skip to content

Commit

Permalink
rule: Fix bug when rules were out of sync (#2615)
Browse files Browse the repository at this point in the history
Co-authored-by: johncming <johncming@yahoo.com>
Signed-off-by: Lili Cosic <cosiclili@gmail.com>

Co-authored-by: johncming <johncming@yahoo.com>
  • Loading branch information
lilic and johncming authored May 17, 2020
1 parent a3e8b02 commit 334a41b
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
- [#2536](https://github.com/thanos-io/thanos/pull/2536) minio-go: Fixed AWS STS endpoint url to https for Web Identity providers on AWS EKS
- [#2501](https://github.com/thanos-io/thanos/pull/2501) Query: gracefully handle additional fields in `SeriesResponse` protobuf message that may be added in the future.
- [#2568](https://github.com/thanos-io/thanos/pull/2568) Query: does not close the connection of strict, static nodes if establishing a connection had succeeded but Info() call failed
- [#2615](https://github.com/thanos-io/thanos/pull/2615) Rule: Fix bugs where rules were out of sync.

### Added

Expand Down
13 changes: 10 additions & 3 deletions cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -763,8 +763,9 @@ func reloadRules(logger log.Logger,
metrics *RuleMetrics) error {
level.Debug(logger).Log("msg", "configured rule files", "files", strings.Join(ruleFiles, ","))
var (
errs tsdberrors.MultiError
files []string
errs tsdberrors.MultiError
files []string
seenFiles = make(map[string]struct{})
)
for _, pat := range ruleFiles {
fs, err := filepath.Glob(pat)
Expand All @@ -774,7 +775,13 @@ func reloadRules(logger log.Logger,
continue
}

files = append(files, fs...)
for _, fp := range fs {
if _, ok := seenFiles[fp]; ok {
continue
}
files = append(files, fp)
seenFiles[fp] = struct{}{}
}
}

level.Info(logger).Log("msg", "reload rule files", "numFiles", len(files))
Expand Down
21 changes: 18 additions & 3 deletions pkg/rule/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,17 +70,17 @@ func (m *Manager) SetRuleManager(s storepb.PartialResponseStrategy, mgr *rules.M
func (m *Manager) RuleGroups() []Group {
m.mtx.RLock()
defer m.mtx.RUnlock()
var res []Group
var groups []Group
for s, r := range m.mgrs {
for _, group := range r.RuleGroups() {
res = append(res, Group{
groups = append(groups, Group{
Group: group,
PartialResponseStrategy: s,
originalFile: m.ruleFiles[group.File()],
})
}
}
return res
return groups
}

func (m *Manager) AlertingRules() []AlertingRule {
Expand Down Expand Up @@ -216,6 +216,21 @@ func (m *Manager) Update(evalInterval time.Duration, files []string) error {
continue
}
}

// Removes the rules from a manager when a strategy has no more rule.
for s, mgr := range m.mgrs {
if _, ok := filesByStrategy[s]; ok {
continue
}

if len(mgr.RuleGroups()) == 0 {
continue
}

if err := mgr.Update(evalInterval, []string{}, nil); err != nil {
errs = append(errs, err)
}
}
m.ruleFiles = ruleFiles
m.mtx.Unlock()

Expand Down
53 changes: 47 additions & 6 deletions pkg/rule/rule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,17 @@ groups:
Appendable: nopAppendable{},
}
thanosRuleMgr := NewManager(dir)
ruleMgr := rules.NewManager(&opts)
thanosRuleMgr.SetRuleManager(storepb.PartialResponseStrategy_ABORT, ruleMgr)
thanosRuleMgr.SetRuleManager(storepb.PartialResponseStrategy_WARN, ruleMgr)
ruleMgrAbort := rules.NewManager(&opts)
ruleMgrWarn := rules.NewManager(&opts)
thanosRuleMgr.SetRuleManager(storepb.PartialResponseStrategy_ABORT, ruleMgrAbort)
thanosRuleMgr.SetRuleManager(storepb.PartialResponseStrategy_WARN, ruleMgrWarn)

testutil.Ok(t, thanosRuleMgr.Update(10*time.Second, []string{filepath.Join(dir, "rule.yaml")}))
ruleMgrAbort.Run()
ruleMgrWarn.Run()
defer ruleMgrAbort.Stop()
defer ruleMgrWarn.Stop()

ruleMgr.Run()
defer ruleMgr.Stop()
testutil.Ok(t, thanosRuleMgr.Update(10*time.Second, []string{filepath.Join(dir, "rule.yaml")}))

select {
case <-time.After(2 * time.Minute):
Expand Down Expand Up @@ -225,6 +228,44 @@ groups:
}
}

func TestUpdateAfterClear(t *testing.T) {
dir, err := ioutil.TempDir("", "test_rule_rule_groups")
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

testutil.Ok(t, ioutil.WriteFile(filepath.Join(dir, "no_strategy.yaml"), []byte(`
groups:
- name: "something1"
rules:
- alert: "some"
expr: "up"
`), os.ModePerm))

opts := rules.ManagerOptions{
Logger: log.NewLogfmtLogger(os.Stderr),
}
m := NewManager(dir)
ruleMgrAbort := rules.NewManager(&opts)
ruleMgrWarn := rules.NewManager(&opts)
m.SetRuleManager(storepb.PartialResponseStrategy_ABORT, ruleMgrAbort)
m.SetRuleManager(storepb.PartialResponseStrategy_WARN, ruleMgrWarn)

ruleMgrAbort.Run()
ruleMgrWarn.Run()
defer ruleMgrAbort.Stop()
defer ruleMgrWarn.Stop()

err = m.Update(1*time.Second, []string{
filepath.Join(dir, "no_strategy.yaml"),
})
testutil.Ok(t, err)
testutil.Equals(t, 1, len(m.RuleGroups()))

err = m.Update(1*time.Second, []string{})
testutil.Ok(t, err)
testutil.Equals(t, 0, len(m.RuleGroups()))
}

func TestRuleGroupMarshalYAML(t *testing.T) {
const expected = `groups:
- name: something1
Expand Down

0 comments on commit 334a41b

Please sign in to comment.