From 36cd8d4725ad35f589ebf086de8e34de40bab801 Mon Sep 17 00:00:00 2001 From: Olivia Golden Date: Fri, 3 May 2024 12:12:16 -0500 Subject: [PATCH] adding worker count metric' --- metrics/prometheus.go | 19 +++++++++++++++---- metrics/prometheus_test.go | 4 ++-- rules/engine.go | 4 +++- rules/key_processor.go | 6 ++---- rules/key_processor_test.go | 2 +- 5 files changed, 23 insertions(+), 12 deletions(-) diff --git a/metrics/prometheus.go b/metrics/prometheus.go index 9fdece4..61efbcd 100644 --- a/metrics/prometheus.go +++ b/metrics/prometheus.go @@ -39,7 +39,7 @@ var ( Namespace: "rules", Help: "etcd rules engine work buffer wait time in ms", Buckets: []float64{1, 2, 5, 10, 20, 50, 100, 200, 500, 1000, 2000, 5000, 10000, 30000, 60000, 300000, 600000}, - }, []string{"method", "pattern", "workers"}) + }, []string{"method", "pattern"}) rulesEngineCallbackWaitTime = prometheus.NewHistogramVec(prometheus.HistogramOpts{ Name: "callback_duration_ms", Subsystem: "etcd", @@ -79,13 +79,19 @@ var ( Namespace: "rules", Help: "etcd rules engine crawler values count", }, []string{"name"}) + rulesEngineWorkerCount = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "worker_count", + Subsystem: "etcd", + Namespace: "rules", + Help: "etcd rules engine worker count", + }, []string{"name"}) ) func init() { prometheus.MustRegister(rulesEngineLockCount, rulesEngineSatisfiedThenNot, rulesEngineEvaluations, rulesEngineWorkerQueueWait, rulesEngineWorkBufferWaitTime, rulesEngineCallbackWaitTime, rulesEngineKeyProcessBufferCap, rulesEngineWatcherErrors, rulesEngineCrawlerQueryTime, - rulesEngineCrawlerEvalTime, rulesEngineCrawlerValues) + rulesEngineCrawlerEvalTime, rulesEngineCrawlerValues, rulesEngineWorkerCount) } // IncLockMetric increments the lock count. @@ -112,8 +118,8 @@ func WorkerQueueWaitTime(methodName string, startTime time.Time) { } // WorkBufferWaitTime tracks the amount of time a work item was in the work buffer. -func WorkBufferWaitTime(methodName, pattern, workers string, startTime time.Time) { - rulesEngineWorkBufferWaitTime.WithLabelValues(methodName, pattern, workers).Observe(float64(time.Since(startTime).Nanoseconds() / 1e6)) +func WorkBufferWaitTime(methodName, pattern string, startTime time.Time) { + rulesEngineWorkBufferWaitTime.WithLabelValues(methodName, pattern).Observe(float64(time.Since(startTime).Nanoseconds() / 1e6)) } // CallbackWaitTime tracks how much time elapsed between when the rule was evaluated and the callback called. @@ -145,3 +151,8 @@ func CrawlerEvalTime(name string, startTime time.Time) { func CrawlerValuesCount(name string, count int) { rulesEngineCrawlerValues.WithLabelValues(name).Set(float64(count)) } + +// WorkerCount tracks the number of workers +func WorkersCount(name string, count int) { + rulesEngineCrawlerValues.WithLabelValues(name).Set(float64(count)) +} diff --git a/metrics/prometheus_test.go b/metrics/prometheus_test.go index 6b6c7d2..d0c7145 100644 --- a/metrics/prometheus_test.go +++ b/metrics/prometheus_test.go @@ -64,8 +64,8 @@ func TestWokerQueueWaitTime(t *testing.T) { } func TestWorkBufferWaitTime(t *testing.T) { - WorkBufferWaitTime("getKey", "/desired/key/pattern", "10", time.Now()) - checkMetrics(t, `rules_etcd_work_buffer_wait_ms_count{method="getKey",pattern="/desired/key/pattern",workers="10"} 1`) + WorkBufferWaitTime("getKey", "/desired/key/pattern", time.Now()) + checkMetrics(t, `rules_etcd_work_buffer_wait_ms_count{method="getKey",pattern="/desired/key/pattern"} 1`) } func TestCallbackWaitTime(t *testing.T) { diff --git a/rules/engine.go b/rules/engine.go index d351402..4ab964f 100644 --- a/rules/engine.go +++ b/rules/engine.go @@ -11,6 +11,7 @@ import ( "golang.org/x/net/context" "github.com/IBM-Cloud/go-etcd-rules/concurrency" + "github.com/IBM-Cloud/go-etcd-rules/metrics" "github.com/IBM-Cloud/go-etcd-rules/rules/lock" ) @@ -96,7 +97,7 @@ func newV3Engine(logger *zap.Logger, cl *v3.Client, options ...EngineOption) v3E ruleMgr := newRuleManager(opts.constraints, opts.enhancedRuleFilter) channel := make(chan v3RuleWork, opts.ruleWorkBuffer) kpChannel := make(chan *keyTask, opts.keyProcBuffer) - keyProc := newV3KeyProcessor(channel, &ruleMgr, kpChannel, opts.keyProcConcurrency, opts.concurrency, logger) + keyProc := newV3KeyProcessor(channel, &ruleMgr, kpChannel, opts.keyProcConcurrency, logger) baseMetrics := opts.metrics() metrics, ok := baseMetrics.(AdvancedMetricsCollector) @@ -326,6 +327,7 @@ func (e *v3Engine) Run() { go c.run() e.logger.Info("Starting workers", zap.Int("count", e.options.concurrency)) + metrics.WorkersCount(getMethodNameFromProvider(e.options.contextProvider), e.options.concurrency) for i := 0; i < e.options.concurrency; i++ { id := fmt.Sprintf("worker%d", i) w, err := newV3Worker(id, e) diff --git a/rules/key_processor.go b/rules/key_processor.go index cfe6a78..5e7e617 100644 --- a/rules/key_processor.go +++ b/rules/key_processor.go @@ -2,7 +2,6 @@ package rules import ( "fmt" - "strconv" "time" "github.com/IBM-Cloud/go-etcd-rules/metrics" @@ -83,10 +82,10 @@ func (v3kp *v3KeyProcessor) dispatchWork(index int, rule staticRule, logger *zap start := time.Now() v3kp.channel <- work // measures the amount of time work is blocked from being added to the buffer - metrics.WorkBufferWaitTime(getMethodNameFromProvider(work.contextProvider), keyPattern, strconv.Itoa(v3kp.workers), start) + metrics.WorkBufferWaitTime(getMethodNameFromProvider(work.contextProvider), keyPattern, start) } -func newV3KeyProcessor(channel chan v3RuleWork, rm *ruleManager, kpChannel chan *keyTask, concurrency, workers int, logger *zap.Logger) v3KeyProcessor { +func newV3KeyProcessor(channel chan v3RuleWork, rm *ruleManager, kpChannel chan *keyTask, concurrency int, logger *zap.Logger) v3KeyProcessor { kp := v3KeyProcessor{ baseKeyProcessor: baseKeyProcessor{ contextProviders: map[int]ContextProvider{}, @@ -98,7 +97,6 @@ func newV3KeyProcessor(channel chan v3RuleWork, rm *ruleManager, kpChannel chan channel: channel, kpChannel: kpChannel, lastNotified: -1, - workers: workers, } logger.Info("Starting key processor workers", zap.Int("concurrency", concurrency)) for i := 0; i < concurrency; i++ { diff --git a/rules/key_processor_test.go b/rules/key_processor_test.go index 8d6c3c2..974a5fb 100644 --- a/rules/key_processor_test.go +++ b/rules/key_processor_test.go @@ -91,7 +91,7 @@ func TestNewV3KeyProcessor(t *testing.T) { channel := make(chan v3RuleWork) kpChannel := make(chan *keyTask, 1000) logger := getTestLogger() - kp := newV3KeyProcessor(channel, &rm, kpChannel, 1, 1, logger) + kp := newV3KeyProcessor(channel, &rm, kpChannel, 1, logger) kp.setCallback(0, V3RuleTaskCallback(v3DummyCallback)) kp.setContextProvider(0, defaultContextProvider) kp.setRuleID(0, "testKey")