Skip to content

Commit

Permalink
worker count
Browse files Browse the repository at this point in the history
  • Loading branch information
oliviagolden0 committed May 2, 2024
1 parent 8170fbc commit 55b237f
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 6 deletions.
2 changes: 1 addition & 1 deletion rules/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func newV3Engine(logger *zap.Logger, cl *v3.Client, options ...EngineOption) v3E
ruleMgr := newRuleManager(opts.constraints, opts.enhancedRuleFilter)
channel := make(chan v3RuleWork, opts.ruleWorkBuffer)
kpChannel := make(chan *keyTask, opts.keyProcBuffer)
keyProc := newV3KeyProcessor(channel, &ruleMgr, kpChannel, opts.keyProcConcurrency, logger)
keyProc := newV3KeyProcessor(channel, &ruleMgr, kpChannel, opts.keyProcConcurrency, opts.concurrency, logger)

baseMetrics := opts.metrics()
metrics, ok := baseMetrics.(AdvancedMetricsCollector)
Expand Down
8 changes: 4 additions & 4 deletions rules/key_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type v3KeyProcessor struct {
channel chan v3RuleWork
kpChannel chan *keyTask
lastNotified int
concurrency int
workers int
}

func (v3kp *v3KeyProcessor) setCallback(index int, callback interface{}) {
Expand Down Expand Up @@ -83,10 +83,10 @@ func (v3kp *v3KeyProcessor) dispatchWork(index int, rule staticRule, logger *zap
start := time.Now()
v3kp.channel <- work
// measures the amount of time work is blocked from being added to the buffer
metrics.WorkBufferWaitTime(getMethodNameFromProvider(work.contextProvider), keyPattern, strconv.Itoa(v3kp.concurrency), start)
metrics.WorkBufferWaitTime(getMethodNameFromProvider(work.contextProvider), keyPattern, strconv.Itoa(v3kp.workers), start)
}

func newV3KeyProcessor(channel chan v3RuleWork, rm *ruleManager, kpChannel chan *keyTask, concurrency int, logger *zap.Logger) v3KeyProcessor {
func newV3KeyProcessor(channel chan v3RuleWork, rm *ruleManager, kpChannel chan *keyTask, concurrency, workers int, logger *zap.Logger) v3KeyProcessor {
kp := v3KeyProcessor{
baseKeyProcessor: baseKeyProcessor{
contextProviders: map[int]ContextProvider{},
Expand All @@ -98,7 +98,7 @@ func newV3KeyProcessor(channel chan v3RuleWork, rm *ruleManager, kpChannel chan
channel: channel,
kpChannel: kpChannel,
lastNotified: -1,
concurrency: concurrency,
workers: workers,
}
logger.Info("Starting key processor workers", zap.Int("concurrency", concurrency))
for i := 0; i < concurrency; i++ {
Expand Down
2 changes: 1 addition & 1 deletion rules/key_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func TestNewV3KeyProcessor(t *testing.T) {
channel := make(chan v3RuleWork)
kpChannel := make(chan *keyTask, 1000)
logger := getTestLogger()
kp := newV3KeyProcessor(channel, &rm, kpChannel, 1, logger)
kp := newV3KeyProcessor(channel, &rm, kpChannel, 1, 1, logger)
kp.setCallback(0, V3RuleTaskCallback(v3DummyCallback))
kp.setContextProvider(0, defaultContextProvider)
kp.setRuleID(0, "testKey")
Expand Down

0 comments on commit 55b237f

Please sign in to comment.