refactor: add more self monitor metrics #2398

Open: wants to merge 19 commits into base branch main
12 changes: 11 additions & 1 deletion alert/astats/stats.go
@@ -23,6 +23,7 @@ type Stats struct {
CounterRuleEvalErrorTotal *prometheus.CounterVec
CounterHeartbeatErrorTotal *prometheus.CounterVec
CounterSubEventTotal *prometheus.CounterVec
CounterQuerySeriesCount *prometheus.CounterVec
}

func NewSyncStats() *Stats {
@@ -103,7 +104,7 @@ func NewSyncStats() *Stats {
Subsystem: subsystem,
Name: "mute_total",
Help: "Number of mute.",
}, []string{"group"})
}, []string{"group", "rule_id", "mute_rule_id", "datasource_id"})

CounterSubEventTotal := prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
@@ -119,6 +120,13 @@ func NewSyncStats() *Stats {
Help: "Number of heartbeat error.",
}, []string{})

CounterQuerySeriesCount := prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "query_series_count",
Help: "Number of curves retrieved from data source after query.",
}, []string{"rule_id", "datasource_id"})

prometheus.MustRegister(
CounterAlertsTotal,
GaugeAlertQueueSize,
@@ -133,6 +141,7 @@ func NewSyncStats() *Stats {
CounterRuleEvalErrorTotal,
CounterHeartbeatErrorTotal,
CounterSubEventTotal,
CounterQuerySeriesCount,
)

return &Stats{
@@ -149,5 +158,6 @@ func NewSyncStats() *Stats {
CounterRuleEvalErrorTotal: CounterRuleEvalErrorTotal,
CounterHeartbeatErrorTotal: CounterHeartbeatErrorTotal,
CounterSubEventTotal: CounterSubEventTotal,
CounterQuerySeriesCount: CounterQuerySeriesCount,
}
}
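
For reference, here is a minimal, self-contained sketch of how a counter shaped like the new CounterQuerySeriesCount can be defined, registered, and exposed with the Prometheus Go client. The metric name, help text, and label set mirror the PR above; the namespace, subsystem, and listen address are placeholder assumptions, not values taken from this repo.

package main

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	// Same metric shape as the new counter in this PR; "n9e", "alert", and
	// the listen address below are placeholders for illustration only.
	querySeriesCount := prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "n9e",
		Subsystem: "alert",
		Name:      "query_series_count",
		Help:      "Number of curves retrieved from data source after query.",
	}, []string{"rule_id", "datasource_id"})

	prometheus.MustRegister(querySeriesCount)

	// Expose the default registry so the new counter can be scraped.
	http.Handle("/metrics", promhttp.Handler())
	_ = http.ListenAndServe(":2112", nil)
}
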
13 changes: 12 additions & 1 deletion alert/eval/eval.go
@@ -304,6 +304,12 @@ func (arw *AlertRuleWorker) GetPromAnomalyPoint(ruleConfig string) ([]models.Ano
lst = append(lst, points...)
}
}

arw.Processor.Stats.CounterQuerySeriesCount.WithLabelValues(
fmt.Sprintf("%v", arw.Rule.Id),
fmt.Sprintf("%v", arw.Processor.DatasourceId()),
).Add(float64(len(lst)))

return lst, nil
}

@@ -645,7 +651,6 @@ func (arw *AlertRuleWorker) GetTdengineAnomalyPoint(rule *models.AlertRule, dsId
for _, query := range ruleQuery.Queries {
seriesTagIndex := make(map[uint64][]uint64)

arw.Processor.Stats.CounterQueryDataTotal.WithLabelValues(fmt.Sprintf("%d", arw.DatasourceId)).Inc()
cli := arw.TdengineClients.GetCli(dsId)
if cli == nil {
logger.Warningf("rule_eval:%d tdengine client is nil", rule.Id)
Expand All @@ -662,6 +667,12 @@ func (arw *AlertRuleWorker) GetTdengineAnomalyPoint(rule *models.AlertRule, dsId
arw.Processor.Stats.CounterRuleEvalErrorTotal.WithLabelValues(fmt.Sprintf("%v", arw.Processor.DatasourceId()), QUERY_DATA, arw.Processor.BusiGroupCache.GetNameByBusiGroupId(arw.Rule.GroupId), fmt.Sprintf("%v", arw.Rule.Id)).Inc()
return points, recoverPoints, err
}

arw.Processor.Stats.CounterQuerySeriesCount.WithLabelValues(
fmt.Sprintf("%v", arw.Rule.Id),
fmt.Sprintf("%v", arw.Processor.DatasourceId()),
).Add(float64(len(series)))

// This log line matters: it records the raw series values the alert decision is based on
logger.Debugf("rule_eval rid:%d req:%+v resp:%+v", rule.Id, query, series)
MakeSeriesMap(series, seriesTagIndex, seriesStore)
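
Both query paths above repeat the same WithLabelValues/Add pattern after a query returns. A hedged sketch of a small helper that could centralize it follows; the helper name and signature are hypothetical and not part of this PR.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// recordQuerySeriesCount mirrors the pattern added in GetPromAnomalyPoint and
// GetTdengineAnomalyPoint: after each query, add the number of returned series
// to the counter, labeled by rule id and datasource id.
func recordQuerySeriesCount(c *prometheus.CounterVec, ruleId, datasourceId int64, seriesCount int) {
	c.WithLabelValues(
		fmt.Sprintf("%v", ruleId),
		fmt.Sprintf("%v", datasourceId),
	).Add(float64(seriesCount))
}

func main() {
	c := prometheus.NewCounterVec(prometheus.CounterOpts{Name: "query_series_count"},
		[]string{"rule_id", "datasource_id"})
	prometheus.MustRegister(c)
	recordQuerySeriesCount(c, 42, 1, 120) // rule 42 on datasource 1 returned 120 series
}
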
25 changes: 13 additions & 12 deletions alert/mute/mute.go
@@ -12,28 +12,29 @@ import (
"github.com/toolkits/pkg/logger"
)

func IsMuted(rule *models.AlertRule, event *models.AlertCurEvent, targetCache *memsto.TargetCacheType, alertMuteCache *memsto.AlertMuteCacheType) (bool, string) {
func IsMuted(rule *models.AlertRule, event *models.AlertCurEvent, targetCache *memsto.TargetCacheType, alertMuteCache *memsto.AlertMuteCacheType) (bool, string, int64) {
if rule.Disabled == 1 {
return true, "rule disabled"
return true, "rule disabled", 0
}

if TimeSpanMuteStrategy(rule, event) {
return true, "rule is not effective for period of time"
return true, "rule is not effective for period of time", 0
}

if IdentNotExistsMuteStrategy(rule, event, targetCache) {
return true, "ident not exists mute"
return true, "ident not exists mute", 0
}

if BgNotMatchMuteStrategy(rule, event, targetCache) {
return true, "bg not match mute"
return true, "bg not match mute", 0
}

if EventMuteStrategy(event, alertMuteCache) {
return true, "match mute rule"
hit, muteId := EventMuteStrategy(event, alertMuteCache)
if hit {
return true, "match mute rule", muteId
}

return false, ""
return false, "", 0
}

// TimeSpanMuteStrategy filters by the effective time span configured on the rule: if the generated alert falls outside that time span, it is not fired, i.e. it is muted
@@ -121,19 +122,19 @@ func BgNotMatchMuteStrategy(rule *models.AlertRule, event *models.AlertCurEvent,
return false
}

func EventMuteStrategy(event *models.AlertCurEvent, alertMuteCache *memsto.AlertMuteCacheType) bool {
func EventMuteStrategy(event *models.AlertCurEvent, alertMuteCache *memsto.AlertMuteCacheType) (bool, int64) {
mutes, has := alertMuteCache.Gets(event.GroupId)
if !has || len(mutes) == 0 {
return false
return false, 0
}

for i := 0; i < len(mutes); i++ {
if matchMute(event, mutes[i]) {
return true
return true, mutes[i].Id
}
}

return false
return false, 0
}

// matchMute uses the time represented by the optional clock parameter when it is passed; otherwise it takes TriggerTime from the event's fields
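
A minimal sketch of the new calling convention for EventMuteStrategy, using simplified stand-in types rather than the real models package, so the (hit, muteId) contract is easy to see in isolation.

package main

import "fmt"

// alertMute is a simplified stand-in for models.AlertMute.
type alertMute struct {
	Id int64
}

// eventMuteStrategy mirrors the new return shape: whether the event is muted
// and, if so, the id of the mute rule that matched (0 means no mute rule hit).
func eventMuteStrategy(mutes []alertMute, matches func(alertMute) bool) (bool, int64) {
	for i := 0; i < len(mutes); i++ {
		if matches(mutes[i]) {
			return true, mutes[i].Id
		}
	}
	return false, 0
}

func main() {
	mutes := []alertMute{{Id: 7}}
	hit, muteId := eventMuteStrategy(mutes, func(alertMute) bool { return true })
	fmt.Println(hit, muteId) // true 7
}
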
16 changes: 13 additions & 3 deletions alert/process/process.go
@@ -157,16 +157,26 @@ func (p *Processor) Handle(anomalyPoints []models.AnomalyPoint, from string, inh
// Even if the event is muted it is essentially still in a firing state, so always add it to alertingKeys here to keep a firing event from auto-recovering
hash := event.Hash
alertingKeys[hash] = struct{}{}
isMuted, detail := mute.IsMuted(cachedRule, event, p.TargetCache, p.alertMuteCache)
isMuted, detail, muteId := mute.IsMuted(cachedRule, event, p.TargetCache, p.alertMuteCache)
if isMuted {
p.Stats.CounterMuteTotal.WithLabelValues(event.GroupName).Inc()
logger.Debugf("rule_eval:%s event:%v is muted, detail:%s", p.Key(), event, detail)
p.Stats.CounterMuteTotal.WithLabelValues(
fmt.Sprintf("%v", event.GroupName),
fmt.Sprintf("%v", p.rule.Id),
fmt.Sprintf("%v", muteId),
fmt.Sprintf("%v", p.datasourceId),
).Inc()
continue
}

if p.EventMuteHook(event) {
p.Stats.CounterMuteTotal.WithLabelValues(event.GroupName).Inc()
logger.Debugf("rule_eval:%s event:%v is muted by hook", p.Key(), event)
p.Stats.CounterMuteTotal.WithLabelValues(
fmt.Sprintf("%v", event.GroupName),
fmt.Sprintf("%v", p.rule.Id),
fmt.Sprintf("%v", 0),
fmt.Sprintf("%v", p.datasourceId),
).Inc()
continue
}

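The two mute paths in Handle now emit the same four labels, with mute_rule_id set to 0 when the event is muted by a hook rather than a mute rule. A hypothetical helper that keeps the label order in one place could look like the sketch below; it is not part of this PR.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// incMuteTotal increments a mute_total-style counter with the expanded label
// set used in this PR: group, rule_id, mute_rule_id, datasource_id.
func incMuteTotal(c *prometheus.CounterVec, group string, ruleId, muteRuleId, datasourceId int64) {
	c.WithLabelValues(
		group,
		fmt.Sprintf("%v", ruleId),
		fmt.Sprintf("%v", muteRuleId),
		fmt.Sprintf("%v", datasourceId),
	).Inc()
}

func main() {
	muteTotal := prometheus.NewCounterVec(prometheus.CounterOpts{Name: "mute_total"},
		[]string{"group", "rule_id", "mute_rule_id", "datasource_id"})
	prometheus.MustRegister(muteTotal)

	incMuteTotal(muteTotal, "infra", 42, 7, 1) // muted by mute rule 7
	incMuteTotal(muteTotal, "infra", 42, 0, 1) // muted by an event hook
}
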
4 changes: 2 additions & 2 deletions alert/router/router_event.go
@@ -40,8 +40,8 @@ func (rt *Router) pushEventToQueue(c *gin.Context) {

event.TagsMap[arr[0]] = arr[1]
}

if mute.EventMuteStrategy(event, rt.AlertMuteCache) {
hit, muteId := mute.EventMuteStrategy(event, rt.AlertMuteCache)
if hit && muteId != 0 {
logger.Infof("event_muted: rule_id=%d %s", event.RuleId, event.Hash)
ginx.NewRender(c).Message(nil)
return