From 3d266cd4f11bbcddf1de3b5cbf80793a99e3cb77 Mon Sep 17 00:00:00 2001 From: ShocOne <62835948+ShocOne@users.noreply.github.com> Date: Tue, 23 Apr 2024 14:46:36 +0100 Subject: [PATCH 1/2] Adjust concurrency logic and metrics in the concurrency package, and improve response time monitoring --- concurrency/metrics.go | 48 ++++++++++++++++++++++++++---------------- 1 file changed, 30 insertions(+), 18 deletions(-) diff --git a/concurrency/metrics.go b/concurrency/metrics.go index f3e8f26..de5cc13 100644 --- a/concurrency/metrics.go +++ b/concurrency/metrics.go @@ -195,47 +195,59 @@ func (ch *ConcurrencyHandler) MonitorServerResponseCodes(resp *http.Response) in // A slice to hold the last n response times for averaging var responseTimes []time.Duration +// MonitorResponseTimeVariability monitors the response time variability and suggests a concurrency adjustment. // MonitorResponseTimeVariability monitors the response time variability and suggests a concurrency adjustment. func (ch *ConcurrencyHandler) MonitorResponseTimeVariability(responseTime time.Duration) int { + ch.Metrics.Lock.Lock() // Ensure thread safety when accessing shared metrics + defer ch.Metrics.Lock.Unlock() + // Append the latest response time responseTimes = append(responseTimes, responseTime) if len(responseTimes) > 10 { // Use the last 10 measurements for a smoother average responseTimes = responseTimes[1:] } - // Calculate moving average response time from the slice - var sum time.Duration - for _, rt := range responseTimes { - sum += rt - } - averageResponseTime := sum / time.Duration(len(responseTimes)) - - // Calculate standard deviation based on the moving average - var varianceSum float64 - for _, rt := range responseTimes { - varianceSum += math.Pow(rt.Seconds()-averageResponseTime.Seconds(), 2) - } - variance := varianceSum / float64(len(responseTimes)) - stdDev := math.Sqrt(variance) + stdDev := calculateStdDev(responseTimes) // Action determination with debounce effect - // Requires keeping track of how often the stdDev threshold has been exceeded - const debounceCount = 3 // e.g., threshold must be exceeded in 3 consecutive checks to act + // Debounce mechanism for scaling down + const debounceCount = 3 // Threshold must be exceeded in 3 consecutive checks to act if stdDev > (ch.Metrics.ResponseTimeVariability.StdDevThreshold * 1.5) { ch.Metrics.ResponseTimeVariability.DebounceScaleDownCount++ + ch.logger.Info("Increased debounce counter", zap.Int("counter", ch.Metrics.ResponseTimeVariability.DebounceScaleDownCount)) if ch.Metrics.ResponseTimeVariability.DebounceScaleDownCount >= debounceCount { ch.Metrics.ResponseTimeVariability.DebounceScaleDownCount = 0 // reset counter after action - return -1 // Suggest decrease concurrency + ch.logger.Info("Concurrent requests scaling down due to high response time variability") + return -1 // Suggest decrease concurrency } } else { ch.Metrics.ResponseTimeVariability.DebounceScaleDownCount = 0 // reset counter if condition not met - if stdDev <= ch.Metrics.ResponseTimeVariability.StdDevThreshold && len(ch.sem) < MaxConcurrency { + if stdDev <= ch.Metrics.ResponseTimeVariability.StdDevThreshold && len(ch.sem) < cap(ch.sem) { + ch.logger.Info("Concurrent requests scaling up as conditions are favorable") return 1 // Suggest increase concurrency if there is capacity } } return 0 } +// calculateStdDev computes the standard deviation of response times. +func calculateStdDev(times []time.Duration) float64 { + var sum time.Duration + for _, t := range times { + sum += t + } + avg := sum / time.Duration(len(times)) + + var varianceSum float64 + for _, t := range times { + varianceSum += math.Pow(t.Seconds()-avg.Seconds(), 2) + } + variance := varianceSum / float64(len(times)) + stdDev := math.Sqrt(variance) + + return stdDev +} + // calculateVariance calculates the variance of response times. func (ch *ConcurrencyHandler) calculateVariance(averageResponseTime time.Duration, responseTime time.Duration) float64 { // Convert time.Duration values to seconds From 5f49f63e1dfb3eda8a5c4bd067138824be88ac50 Mon Sep 17 00:00:00 2001 From: ShocOne <62835948+ShocOne@users.noreply.github.com> Date: Tue, 23 Apr 2024 14:47:09 +0100 Subject: [PATCH 2/2] Refactor concurrency logic and metrics in the concurrency package --- concurrency/metrics.go | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/concurrency/metrics.go b/concurrency/metrics.go index de5cc13..6188d07 100644 --- a/concurrency/metrics.go +++ b/concurrency/metrics.go @@ -195,7 +195,6 @@ func (ch *ConcurrencyHandler) MonitorServerResponseCodes(resp *http.Response) in // A slice to hold the last n response times for averaging var responseTimes []time.Duration -// MonitorResponseTimeVariability monitors the response time variability and suggests a concurrency adjustment. // MonitorResponseTimeVariability monitors the response time variability and suggests a concurrency adjustment. func (ch *ConcurrencyHandler) MonitorResponseTimeVariability(responseTime time.Duration) int { ch.Metrics.Lock.Lock() // Ensure thread safety when accessing shared metrics @@ -247,15 +246,3 @@ func calculateStdDev(times []time.Duration) float64 { return stdDev } - -// calculateVariance calculates the variance of response times. -func (ch *ConcurrencyHandler) calculateVariance(averageResponseTime time.Duration, responseTime time.Duration) float64 { - // Convert time.Duration values to seconds - averageSeconds := averageResponseTime.Seconds() - responseSeconds := responseTime.Seconds() - - // Calculate variance - variance := (float64(ch.Metrics.ResponseTimeVariability.Count-1)*math.Pow(averageSeconds-responseSeconds, 2) + ch.Metrics.ResponseTimeVariability.Variance) / float64(ch.Metrics.ResponseTimeVariability.Count) - ch.Metrics.ResponseTimeVariability.Variance = variance - return variance -}