diff --git a/Makefile b/Makefile index 62f55a0..4d8b870 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,7 @@ SHELL = /bin/bash export -LINT_VERSION="1.59.0" +LINT_VERSION="1.59.1" .PHONY: all all: deps lint test diff --git a/go.mod b/go.mod index 9846c72..0a287e2 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( go.etcd.io/etcd/api/v3 v3.5.14 go.etcd.io/etcd/client/v3 v3.5.14 go.uber.org/zap v1.27.0 - golang.org/x/net v0.25.0 + golang.org/x/net v0.26.0 ) require ( @@ -26,8 +26,8 @@ require ( github.com/prometheus/procfs v0.14.0 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.14 // indirect go.uber.org/multierr v1.10.0 // indirect - golang.org/x/sys v0.20.0 // indirect - golang.org/x/text v0.15.0 // indirect + golang.org/x/sys v0.21.0 // indirect + golang.org/x/text v0.16.0 // indirect google.golang.org/genproto v0.0.0-20240123012728-ef4313101c80 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240123012728-ef4313101c80 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 // indirect diff --git a/go.sum b/go.sum index e1c8e87..bf54f7e 100644 --- a/go.sum +++ b/go.sum @@ -60,20 +60,20 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac= -golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= +golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ= +golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= -golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws= +golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk= -golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= +golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= diff --git a/metrics/prometheus.go b/metrics/prometheus.go index d3f7de8..a39d351 100644 --- a/metrics/prometheus.go +++ b/metrics/prometheus.go @@ -46,7 +46,7 @@ var ( Namespace: "rules", Help: "etcd rules engine callback duration in ms", Buckets: []float64{1, 2, 5, 10, 20, 50, 100, 200, 500, 1000, 2000, 5000, 10000, 30000, 60000, 300000, 600000}, - }, []string{"pattern", "rule"}) + }, []string{"pattern", "rule", "source"}) rulesEngineKeyProcessBufferCap = prometheus.NewGauge(prometheus.GaugeOpts{ Name: "key_process_buffer_cap", Subsystem: "etcd", @@ -123,8 +123,8 @@ func WorkBufferWaitTime(methodName, pattern string, startTime time.Time) { } // CallbackWaitTime tracks how much time elapsed between when the rule was evaluated and the callback called. -func CallbackWaitTime(pattern, ruleID string, startTime time.Time) { - rulesEngineCallbackWaitTime.WithLabelValues(pattern, ruleID).Observe(float64(time.Since(startTime).Nanoseconds() / 1e6)) +func CallbackWaitTime(pattern, ruleID, source string, startTime time.Time) { + rulesEngineCallbackWaitTime.WithLabelValues(pattern, ruleID, source).Observe(float64(time.Since(startTime).Nanoseconds() / 1e6)) } // KeyProcessBufferCap tracks the capacity of the key processor buffer. diff --git a/metrics/prometheus_test.go b/metrics/prometheus_test.go index ba5e87e..e274e7e 100644 --- a/metrics/prometheus_test.go +++ b/metrics/prometheus_test.go @@ -69,7 +69,7 @@ func TestWorkBufferWaitTime(t *testing.T) { } func TestCallbackWaitTime(t *testing.T) { - CallbackWaitTime("/desired/key/pattern", "ruleID", time.Now()) + CallbackWaitTime("/desired/key/pattern", "ruleID", "source", time.Now()) checkMetrics(t, `rules_etcd_callback_wait_ms_count{pattern="/desired/key/pattern",rule="ruleID"} 1`) } diff --git a/rules/engine.go b/rules/engine.go index 2f0e2f4..eae15bc 100644 --- a/rules/engine.go +++ b/rules/engine.go @@ -292,7 +292,7 @@ func (e *baseEngine) addRule(rule DynamicRule, } func (e *v3Engine) Run() { - e.logger.Info("Rules engine options", zap.Object("options", &e.options)) + e.logger.Info("Rules engine options", zap.Object("options", &e.options), zap.Int("rules", len(e.ruleMgr.rules))) prefixSlice := []string{} prefixes := e.ruleMgr.prefixes // This is a map; used to ensure there are no duplicates diff --git a/rules/watcher.go b/rules/watcher.go index c09df46..bc6bfeb 100644 --- a/rules/watcher.go +++ b/rules/watcher.go @@ -70,7 +70,7 @@ func (w *watcher) singleRun() { time.Sleep(delay) // TODO ideally a context should be used for fast shutdown, e.g. select { case <-ctx.Done(); case <-time.After(delay) } } w.logger.Debug("Calling process key", zap.String("key", key)) - w.kp.processKey(key, value, w.api, w.logger, map[string]string{}, incRuleProcessedCount) + w.kp.processKey(key, value, w.api, w.logger, map[string]string{"source": "watcher"}, incRuleProcessedCount) } func incRuleProcessedCount(ruleID string) { diff --git a/rules/worker.go b/rules/worker.go index d28d11a..dc2ec9f 100644 --- a/rules/worker.go +++ b/rules/worker.go @@ -72,7 +72,7 @@ func (bw *baseWorker) isStopped() bool { func (bw *baseWorker) doWork(loggerPtr **zap.Logger, rulePtr *staticRule, lockTTL int, callback workCallback, - metricsInfo metricsInfo, lockKey string, ruleID string) { + metricsInfo metricsInfo, lockKey string, ruleID string, source string) { logger := *loggerPtr logger = logger.With(zap.String("ruleID", ruleID), zap.String("mutex", lockKey)) rule := *rulePtr @@ -131,7 +131,7 @@ func (bw *baseWorker) doWork(loggerPtr **zap.Logger, logger.Info("callback started", zap.Any("attributes", attrMap)) startTime := time.Now() callback() - metrics.CallbackWaitTime(metricsInfo.keyPattern, ruleID, startTime) + metrics.CallbackWaitTime(metricsInfo.keyPattern, ruleID, source, startTime) if bw.callbackListener != nil { bw.callbackListener.callbackDone(ruleID, attributes) } @@ -164,6 +164,7 @@ func (w *v3Worker) singleRun() { } w.addWorkerID(newMetadata) task.Metadata = newMetadata + source := task.Metadata["source"] task.Logger = task.Logger.With(zap.String("worker", w.workerID)) // Use wait group and go routine to prevent killing of workers var wg sync.WaitGroup @@ -180,7 +181,7 @@ func (w *v3Worker) singleRun() { task.Context = context task.cancel = cancelFunc metricsInfo := newMetricsInfo(context, work.keyPattern, work.metricsStartTime) - w.doWork(&task.Logger, &work.rule, w.engine.getLockTTLForRule(work.ruleIndex), func() { work.ruleTaskCallback(&task) }, metricsInfo, work.lockKey, work.ruleID) + w.doWork(&task.Logger, &work.rule, w.engine.getLockTTLForRule(work.ruleIndex), func() { work.ruleTaskCallback(&task) }, metricsInfo, work.lockKey, work.ruleID, source) }() wg.Wait() }