Skip to content

Commit

Permalink
adding worker count metric'
Browse files Browse the repository at this point in the history
  • Loading branch information
oliviagolden0 committed May 3, 2024
1 parent 55b237f commit 36cd8d4
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 12 deletions.
19 changes: 15 additions & 4 deletions metrics/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ var (
Namespace: "rules",
Help: "etcd rules engine work buffer wait time in ms",
Buckets: []float64{1, 2, 5, 10, 20, 50, 100, 200, 500, 1000, 2000, 5000, 10000, 30000, 60000, 300000, 600000},
}, []string{"method", "pattern", "workers"})
}, []string{"method", "pattern"})
rulesEngineCallbackWaitTime = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "callback_duration_ms",
Subsystem: "etcd",
Expand Down Expand Up @@ -79,13 +79,19 @@ var (
Namespace: "rules",
Help: "etcd rules engine crawler values count",
}, []string{"name"})
rulesEngineWorkerCount = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "worker_count",
Subsystem: "etcd",
Namespace: "rules",
Help: "etcd rules engine worker count",
}, []string{"name"})
)

func init() {
prometheus.MustRegister(rulesEngineLockCount, rulesEngineSatisfiedThenNot, rulesEngineEvaluations,
rulesEngineWorkerQueueWait, rulesEngineWorkBufferWaitTime, rulesEngineCallbackWaitTime,
rulesEngineKeyProcessBufferCap, rulesEngineWatcherErrors, rulesEngineCrawlerQueryTime,
rulesEngineCrawlerEvalTime, rulesEngineCrawlerValues)
rulesEngineCrawlerEvalTime, rulesEngineCrawlerValues, rulesEngineWorkerCount)
}

// IncLockMetric increments the lock count.
Expand All @@ -112,8 +118,8 @@ func WorkerQueueWaitTime(methodName string, startTime time.Time) {
}

// WorkBufferWaitTime tracks the amount of time a work item was in the work buffer.
func WorkBufferWaitTime(methodName, pattern, workers string, startTime time.Time) {
rulesEngineWorkBufferWaitTime.WithLabelValues(methodName, pattern, workers).Observe(float64(time.Since(startTime).Nanoseconds() / 1e6))
func WorkBufferWaitTime(methodName, pattern string, startTime time.Time) {
rulesEngineWorkBufferWaitTime.WithLabelValues(methodName, pattern).Observe(float64(time.Since(startTime).Nanoseconds() / 1e6))
}

// CallbackWaitTime tracks how much time elapsed between when the rule was evaluated and the callback called.
Expand Down Expand Up @@ -145,3 +151,8 @@ func CrawlerEvalTime(name string, startTime time.Time) {
func CrawlerValuesCount(name string, count int) {
rulesEngineCrawlerValues.WithLabelValues(name).Set(float64(count))
}

// WorkerCount tracks the number of workers
func WorkersCount(name string, count int) {
rulesEngineCrawlerValues.WithLabelValues(name).Set(float64(count))
}
4 changes: 2 additions & 2 deletions metrics/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ func TestWokerQueueWaitTime(t *testing.T) {
}

func TestWorkBufferWaitTime(t *testing.T) {
WorkBufferWaitTime("getKey", "/desired/key/pattern", "10", time.Now())
checkMetrics(t, `rules_etcd_work_buffer_wait_ms_count{method="getKey",pattern="/desired/key/pattern",workers="10"} 1`)
WorkBufferWaitTime("getKey", "/desired/key/pattern", time.Now())
checkMetrics(t, `rules_etcd_work_buffer_wait_ms_count{method="getKey",pattern="/desired/key/pattern"} 1`)
}

func TestCallbackWaitTime(t *testing.T) {
Expand Down
4 changes: 3 additions & 1 deletion rules/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"golang.org/x/net/context"

"github.com/IBM-Cloud/go-etcd-rules/concurrency"
"github.com/IBM-Cloud/go-etcd-rules/metrics"
"github.com/IBM-Cloud/go-etcd-rules/rules/lock"
)

Expand Down Expand Up @@ -96,7 +97,7 @@ func newV3Engine(logger *zap.Logger, cl *v3.Client, options ...EngineOption) v3E
ruleMgr := newRuleManager(opts.constraints, opts.enhancedRuleFilter)
channel := make(chan v3RuleWork, opts.ruleWorkBuffer)
kpChannel := make(chan *keyTask, opts.keyProcBuffer)
keyProc := newV3KeyProcessor(channel, &ruleMgr, kpChannel, opts.keyProcConcurrency, opts.concurrency, logger)
keyProc := newV3KeyProcessor(channel, &ruleMgr, kpChannel, opts.keyProcConcurrency, logger)

baseMetrics := opts.metrics()
metrics, ok := baseMetrics.(AdvancedMetricsCollector)
Expand Down Expand Up @@ -326,6 +327,7 @@ func (e *v3Engine) Run() {
go c.run()

e.logger.Info("Starting workers", zap.Int("count", e.options.concurrency))
metrics.WorkersCount(getMethodNameFromProvider(e.options.contextProvider), e.options.concurrency)
for i := 0; i < e.options.concurrency; i++ {
id := fmt.Sprintf("worker%d", i)
w, err := newV3Worker(id, e)
Expand Down
6 changes: 2 additions & 4 deletions rules/key_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package rules

import (
"fmt"
"strconv"
"time"

"github.com/IBM-Cloud/go-etcd-rules/metrics"
Expand Down Expand Up @@ -83,10 +82,10 @@ func (v3kp *v3KeyProcessor) dispatchWork(index int, rule staticRule, logger *zap
start := time.Now()
v3kp.channel <- work
// measures the amount of time work is blocked from being added to the buffer
metrics.WorkBufferWaitTime(getMethodNameFromProvider(work.contextProvider), keyPattern, strconv.Itoa(v3kp.workers), start)
metrics.WorkBufferWaitTime(getMethodNameFromProvider(work.contextProvider), keyPattern, start)
}

func newV3KeyProcessor(channel chan v3RuleWork, rm *ruleManager, kpChannel chan *keyTask, concurrency, workers int, logger *zap.Logger) v3KeyProcessor {
func newV3KeyProcessor(channel chan v3RuleWork, rm *ruleManager, kpChannel chan *keyTask, concurrency int, logger *zap.Logger) v3KeyProcessor {
kp := v3KeyProcessor{
baseKeyProcessor: baseKeyProcessor{
contextProviders: map[int]ContextProvider{},
Expand All @@ -98,7 +97,6 @@ func newV3KeyProcessor(channel chan v3RuleWork, rm *ruleManager, kpChannel chan
channel: channel,
kpChannel: kpChannel,
lastNotified: -1,
workers: workers,
}
logger.Info("Starting key processor workers", zap.Int("concurrency", concurrency))
for i := 0; i < concurrency; i++ {
Expand Down
2 changes: 1 addition & 1 deletion rules/key_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func TestNewV3KeyProcessor(t *testing.T) {
channel := make(chan v3RuleWork)
kpChannel := make(chan *keyTask, 1000)
logger := getTestLogger()
kp := newV3KeyProcessor(channel, &rm, kpChannel, 1, 1, logger)
kp := newV3KeyProcessor(channel, &rm, kpChannel, 1, logger)
kp.setCallback(0, V3RuleTaskCallback(v3DummyCallback))
kp.setContextProvider(0, defaultContextProvider)
kp.setRuleID(0, "testKey")
Expand Down

0 comments on commit 36cd8d4

Please sign in to comment.