Skip to content

Commit

Permalink
Add source label to callback duration metric (#178)
Browse files Browse the repository at this point in the history
Add rules count to engine logging
  • Loading branch information
kramvan1 authored Jun 18, 2024
1 parent 6cb84c1 commit a0d2e61
Show file tree
Hide file tree
Showing 8 changed files with 20 additions and 19 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
SHELL = /bin/bash

export
LINT_VERSION="1.59.0"
LINT_VERSION="1.59.1"

.PHONY: all
all: deps lint test
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
go.etcd.io/etcd/api/v3 v3.5.14
go.etcd.io/etcd/client/v3 v3.5.14
go.uber.org/zap v1.27.0
golang.org/x/net v0.25.0
golang.org/x/net v0.26.0
)

require (
Expand All @@ -26,8 +26,8 @@ require (
github.com/prometheus/procfs v0.14.0 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.14 // indirect
go.uber.org/multierr v1.10.0 // indirect
golang.org/x/sys v0.20.0 // indirect
golang.org/x/text v0.15.0 // indirect
golang.org/x/sys v0.21.0 // indirect
golang.org/x/text v0.16.0 // indirect
google.golang.org/genproto v0.0.0-20240123012728-ef4313101c80 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240123012728-ef4313101c80 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 // indirect
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -60,20 +60,20 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac=
golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM=
golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ=
golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk=
golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
Expand Down
6 changes: 3 additions & 3 deletions metrics/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ var (
Namespace: "rules",
Help: "etcd rules engine callback duration in ms",
Buckets: []float64{1, 2, 5, 10, 20, 50, 100, 200, 500, 1000, 2000, 5000, 10000, 30000, 60000, 300000, 600000},
}, []string{"pattern", "rule"})
}, []string{"pattern", "rule", "source"})
rulesEngineKeyProcessBufferCap = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "key_process_buffer_cap",
Subsystem: "etcd",
Expand Down Expand Up @@ -123,8 +123,8 @@ func WorkBufferWaitTime(methodName, pattern string, startTime time.Time) {
}

// CallbackWaitTime records the time elapsed since startTime, in milliseconds; the worker calls it right after a rule callback returns, so it measures callback duration.
func CallbackWaitTime(pattern, ruleID string, startTime time.Time) {
rulesEngineCallbackWaitTime.WithLabelValues(pattern, ruleID).Observe(float64(time.Since(startTime).Nanoseconds() / 1e6))
func CallbackWaitTime(pattern, ruleID, source string, startTime time.Time) {
rulesEngineCallbackWaitTime.WithLabelValues(pattern, ruleID, source).Observe(float64(time.Since(startTime).Nanoseconds() / 1e6))
}

// KeyProcessBufferCap tracks the capacity of the key processor buffer.
Expand Down
2 changes: 1 addition & 1 deletion metrics/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func TestWorkBufferWaitTime(t *testing.T) {
}

func TestCallbackWaitTime(t *testing.T) {
CallbackWaitTime("/desired/key/pattern", "ruleID", time.Now())
CallbackWaitTime("/desired/key/pattern", "ruleID", "source", time.Now())
checkMetrics(t, `rules_etcd_callback_wait_ms_count{pattern="/desired/key/pattern",rule="ruleID"} 1`)
}

Expand Down
2 changes: 1 addition & 1 deletion rules/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ func (e *baseEngine) addRule(rule DynamicRule,
}

func (e *v3Engine) Run() {
e.logger.Info("Rules engine options", zap.Object("options", &e.options))
e.logger.Info("Rules engine options", zap.Object("options", &e.options), zap.Int("rules", len(e.ruleMgr.rules)))
prefixSlice := []string{}
prefixes := e.ruleMgr.prefixes
// This is a map; used to ensure there are no duplicates
Expand Down
2 changes: 1 addition & 1 deletion rules/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (w *watcher) singleRun() {
time.Sleep(delay) // TODO ideally a context should be used for fast shutdown, e.g. select { case <-ctx.Done(); case <-time.After(delay) }
}
w.logger.Debug("Calling process key", zap.String("key", key))
w.kp.processKey(key, value, w.api, w.logger, map[string]string{}, incRuleProcessedCount)
w.kp.processKey(key, value, w.api, w.logger, map[string]string{"source": "watcher"}, incRuleProcessedCount)
}

func incRuleProcessedCount(ruleID string) {
Expand Down
7 changes: 4 additions & 3 deletions rules/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (bw *baseWorker) isStopped() bool {

func (bw *baseWorker) doWork(loggerPtr **zap.Logger,
rulePtr *staticRule, lockTTL int, callback workCallback,
metricsInfo metricsInfo, lockKey string, ruleID string) {
metricsInfo metricsInfo, lockKey string, ruleID string, source string) {
logger := *loggerPtr
logger = logger.With(zap.String("ruleID", ruleID), zap.String("mutex", lockKey))
rule := *rulePtr
Expand Down Expand Up @@ -131,7 +131,7 @@ func (bw *baseWorker) doWork(loggerPtr **zap.Logger,
logger.Info("callback started", zap.Any("attributes", attrMap))
startTime := time.Now()
callback()
metrics.CallbackWaitTime(metricsInfo.keyPattern, ruleID, startTime)
metrics.CallbackWaitTime(metricsInfo.keyPattern, ruleID, source, startTime)
if bw.callbackListener != nil {
bw.callbackListener.callbackDone(ruleID, attributes)
}
Expand Down Expand Up @@ -164,6 +164,7 @@ func (w *v3Worker) singleRun() {
}
w.addWorkerID(newMetadata)
task.Metadata = newMetadata
source := task.Metadata["source"]
task.Logger = task.Logger.With(zap.String("worker", w.workerID))
// Use wait group and go routine to prevent killing of workers
var wg sync.WaitGroup
Expand All @@ -180,7 +181,7 @@ func (w *v3Worker) singleRun() {
task.Context = context
task.cancel = cancelFunc
metricsInfo := newMetricsInfo(context, work.keyPattern, work.metricsStartTime)
w.doWork(&task.Logger, &work.rule, w.engine.getLockTTLForRule(work.ruleIndex), func() { work.ruleTaskCallback(&task) }, metricsInfo, work.lockKey, work.ruleID)
w.doWork(&task.Logger, &work.rule, w.engine.getLockTTLForRule(work.ruleIndex), func() { work.ruleTaskCallback(&task) }, metricsInfo, work.lockKey, work.ruleID, source)
}()
wg.Wait()
}

0 comments on commit a0d2e61

Please sign in to comment.